CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-api-java-bridge

Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.

Pending
Overview
Eval results
Files

procedures.mddocs/

Procedures

Context for stored procedure execution with access to StreamExecutionEnvironment. Procedures enable encapsulation of complex data processing logic that can be called from SQL and integrated with the streaming execution environment.

Capabilities

Procedure Context Interface

Core interface providing context for stored procedure execution.

/**
 * A context to provide necessary context used by stored procedure
 * Provides access to execution environment for procedure implementation
 */
public interface ProcedureContext {
    
    /**
     * Return the StreamExecutionEnvironment where the procedure is called
     * Flink creates a new StreamExecutionEnvironment based on current configuration
     * and passes it to the procedure for every procedure call
     * The procedure can modify the passed StreamExecutionEnvironment safely
     * as it won't be leaked outside
     * @return StreamExecutionEnvironment for procedure execution
     */
    StreamExecutionEnvironment getExecutionEnvironment();
}

Default Procedure Context

Default implementation of ProcedureContext for standard procedure execution.

/**
 * Default implementation of ProcedureContext
 * Provides standard procedure execution context
 */
public class DefaultProcedureContext implements ProcedureContext {
    
    /**
     * Get the StreamExecutionEnvironment for this procedure context
     * @return StreamExecutionEnvironment instance configured for procedure execution
     */
    @Override
    public StreamExecutionEnvironment getExecutionEnvironment();
}

Usage Examples:

import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedure.DefaultProcedureContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

// Custom procedure implementation
public class DataProcessingProcedure {
    
    public void processData(ProcedureContext context, String inputPath, String outputPath) {
        // Get execution environment from context
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        
        // Procedure can safely modify the environment
        env.setParallelism(4);
        env.enableCheckpointing(60000); // 1-minute checkpoints
        
        // Implement data processing logic
        DataStream<String> inputStream = env.readTextFile(inputPath);
        DataStream<String> processedStream = inputStream
            .map(line -> line.toUpperCase())
            .filter(line -> line.length() > 10);
        
        // Write results
        processedStream.writeAsText(outputPath);
        
        // Environment execution will be handled by the procedure framework
    }
}

// Using default context
ProcedureContext context = new DefaultProcedureContext();
DataProcessingProcedure procedure = new DataProcessingProcedure();
procedure.processData(context, "/input/data.txt", "/output/processed.txt");

Advanced Procedure Patterns

Stateful Procedure Implementation

Implement procedures that maintain state across invocations.

public class StatefulAnalyticsProcedure {
    private final ValueStateDescriptor<Long> counterDescriptor;
    
    public StatefulAnalyticsProcedure() {
        this.counterDescriptor = new ValueStateDescriptor<>("procedure-counter", Long.class);
    }
    
    public TableResult analyzeWithState(
            ProcedureContext context,
            String tableName,
            String outputTable) throws Exception {
        
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        
        // Configure environment for stateful processing
        env.enableCheckpointing(30000);
        env.setStateBackend(new HashMapStateBackend());
        
        // Create stateful processing stream
        DataStream<Row> analysisStream = env
            .fromSource(createTableSource(tableName), WatermarkStrategy.noWatermarks(), "analysis-source")
            .keyBy(row -> row.getField(0))
            .process(new KeyedProcessFunction<Object, Row, Row>() {
                private ValueState<Long> counter;
                
                @Override
                public void open(Configuration parameters) throws Exception {
                    counter = getRuntimeContext().getState(counterDescriptor);
                }
                
                @Override
                public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
                    Long currentCount = counter.value();
                    if (currentCount == null) {
                        currentCount = 0L;
                    }
                    counter.update(currentCount + 1);
                    
                    // Emit enriched row with count
                    Row enrichedRow = Row.of(row.getField(0), row.getField(1), currentCount + 1);
                    out.collect(enrichedRow);
                }
            });
        
        // Sink to output table
        analysisStream.addSink(createTableSink(outputTable));
        
        return null; // Return appropriate TableResult
    }
    
    private SourceFunction<Row> createTableSource(String tableName) {
        // Implement table source creation
        return null;
    }
    
    private SinkFunction<Row> createTableSink(String tableName) {
        // Implement table sink creation
        return null;
    }
}

Procedure with Custom Configuration

Create procedures that accept configuration parameters and customize execution.

public class ConfigurableProcedure {
    
    public void processWithConfig(
            ProcedureContext context,
            Map<String, String> config,
            String inputTable,
            String outputTable) {
        
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        
        // Apply configuration from parameters
        int parallelism = Integer.parseInt(config.getOrDefault("parallelism", "4"));
        long checkpointInterval = Long.parseLong(config.getOrDefault("checkpoint.interval", "60000"));
        String processingMode = config.getOrDefault("processing.mode", "event-time");
        
        env.setParallelism(parallelism);
        env.enableCheckpointing(checkpointInterval);
        
        // Configure based on processing mode
        if ("event-time".equals(processingMode)) {
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        } else {
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }
        
        // Implement processing logic based on configuration
        DataStream<Row> inputStream = createInputStream(env, inputTable);
        DataStream<Row> processedStream;
        
        if ("aggregation".equals(config.get("operation.type"))) {
            processedStream = inputStream
                .keyBy(row -> row.getField(0))
                .timeWindow(Time.minutes(5))
                .aggregate(new CustomAggregateFunction());
        } else {
            processedStream = inputStream
                .map(new CustomMapFunction(config));
        }
        
        processedStream.addSink(createOutputStream(outputTable));
    }
    
    private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String tableName) {
        // Implementation for creating input stream from table
        return null;
    }
    
    private SinkFunction<Row> createOutputStream(String tableName) {
        // Implementation for creating output sink to table
        return null;
    }
}

Async Procedure Execution

Implement procedures that handle asynchronous operations.

public class AsyncProcedure {
    
    public CompletableFuture<TableResult> processAsync(
            ProcedureContext context,
            String inputPath,
            String outputPath) {
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                StreamExecutionEnvironment env = context.getExecutionEnvironment();
                
                // Configure for async execution
                env.setParallelism(8);
                env.setBufferTimeout(100);
                
                // Create async processing pipeline
                DataStream<String> asyncStream = env
                    .readTextFile(inputPath)
                    .map(new AsyncMapFunction<String, String>() {
                        @Override
                        public CompletableFuture<String> asyncMap(String input) throws Exception {
                            return CompletableFuture.supplyAsync(() -> {
                                // Simulate async operation (e.g., external API call)
                                try {
                                    Thread.sleep(100);
                                    return "processed_" + input;
                                } catch (InterruptedException e) {
                                    throw new RuntimeException(e);
                                }
                            });
                        }
                    });
                
                asyncStream.writeAsText(outputPath);
                
                // Execute and return result
                JobExecutionResult jobResult = env.execute("Async Procedure");
                return TableResult.OK(); // Convert to appropriate TableResult
                
            } catch (Exception e) {
                throw new RuntimeException("Async procedure execution failed", e);
            }
        });
    }
}

Integration with Table API

Procedure Registration and Execution

Register and execute procedures within the table environment.

// Register procedure for SQL usage
tableEnv.createTemporarySystemFunction("process_data", DataProcessingProcedure.class);

// Execute procedure via SQL
tableEnv.executeSql("CALL process_data('/input/data.txt', '/output/result.txt')");

// Programmatic procedure execution
ProcedureContext context = new DefaultProcedureContext();
DataProcessingProcedure procedure = new DataProcessingProcedure();
procedure.processData(context, "/input/data.txt", "/output/result.txt");

Procedure with Table Operations

Combine procedures with table operations for complex workflows.

public class TableProcedure {
    
    public TableResult processTable(
            ProcedureContext context,
            String sourceTable,
            String targetTable) throws Exception {
        
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        // Configure table environment
        tableEnv.getConfig().getConfiguration().setString("parallelism.default", "4");
        
        // Create complex table processing pipeline
        Table sourceData = tableEnv.from(sourceTable);
        
        // Apply transformations
        Table processedData = sourceData
            .select($("user_id"), $("event_time"), $("event_data"))
            .where($("event_time").isGreater(lit("2023-01-01 00:00:00")))
            .groupBy($("user_id"))
            .select($("user_id"), $("event_data").count().as("event_count"));
        
        // Create statement set for efficient execution
        StreamStatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsert(targetTable, processedData);
        
        return statementSet.execute();
    }
}

Error Handling in Procedures

Implement robust error handling for procedure execution.

public class RobustProcedure {
    
    public TableResult processWithErrorHandling(
            ProcedureContext context,
            String inputTable,
            String outputTable,
            String errorTable) {
        
        try {
            StreamExecutionEnvironment env = context.getExecutionEnvironment();
            
            // Configure error handling
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
            
            DataStream<Row> inputStream = createInputStream(env, inputTable);
            
            // Split stream for success and error handling
            SingleOutputStreamOperator<Row> processedStream = inputStream
                .map(new ProcessingMapFunction())
                .name("data-processing");
            
            // Main processing output
            processedStream.getSideOutput(ProcessingMapFunction.SUCCESS_TAG)
                .addSink(createTableSink(outputTable));
            
            // Error output
            processedStream.getSideOutput(ProcessingMapFunction.ERROR_TAG)
                .addSink(createTableSink(errorTable));
            
            env.execute("Robust Procedure Execution");
            return TableResult.OK();
            
        } catch (Exception e) {
            // Log error and return failure result
            System.err.println("Procedure execution failed: " + e.getMessage());
            return TableResult.OK(); // Return appropriate error result
        }
    }
    
    private static class ProcessingMapFunction extends ProcessFunction<Row, Row> {
        static final OutputTag<Row> SUCCESS_TAG = new OutputTag<Row>("success") {};
        static final OutputTag<Row> ERROR_TAG = new OutputTag<Row>("error") {};
        
        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
            try {
                // Process row
                Row processedRow = processRow(row);
                ctx.output(SUCCESS_TAG, processedRow);
            } catch (Exception e) {
                // Send to error stream
                Row errorRow = Row.of(row.toString(), e.getMessage(), System.currentTimeMillis());
                ctx.output(ERROR_TAG, errorRow);
            }
        }
        
        private Row processRow(Row row) throws Exception {
            // Implementation of row processing logic
            return row;
        }
    }
    
    private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String table) {
        return null; // Implementation
    }
    
    private SinkFunction<Row> createTableSink(String table) {
        return null; // Implementation
    }
}

Testing Procedures

Unit Testing Procedures

Test procedures in isolation with mock contexts.

public class ProcedureTest {
    
    @Test
    public void testDataProcessingProcedure() throws Exception {
        // Create test execution environment
        StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.createLocalEnvironment();
        testEnv.setParallelism(1);
        
        // Create mock context
        ProcedureContext testContext = new TestProcedureContext(testEnv);
        
        // Create procedure instance
        DataProcessingProcedure procedure = new DataProcessingProcedure();
        
        // Execute procedure with test data
        String testInput = "test-input.txt";
        String testOutput = "test-output.txt";
        
        procedure.processData(testContext, testInput, testOutput);
        
        // Verify results
        // Add assertions based on expected output
    }
    
    private static class TestProcedureContext implements ProcedureContext {
        private final StreamExecutionEnvironment env;
        
        public TestProcedureContext(StreamExecutionEnvironment env) {
            this.env = env;
        }
        
        @Override
        public StreamExecutionEnvironment getExecutionEnvironment() {
            return env;
        }
    }
}

Types

Core Procedure Types

import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedure.DefaultProcedureContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Table Integration Types

import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.Table;

Execution and Configuration Types

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;

Async Processing Types

import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge

docs

builtin-connectors.md

changelog-processing.md

datastream-connectors.md

index.md

procedures.md

statement-sets.md

stream-table-environment.md

watermark-strategies.md

tile.json