Java bridge for seamless integration between Flink's Table/SQL API and DataStream API, enabling conversion between streams and tables with unified batch and stream processing.
—
Context for stored procedure execution with access to StreamExecutionEnvironment. Procedures enable encapsulation of complex data processing logic that can be called from SQL and integrated with the streaming execution environment.
Core interface providing context for stored procedure execution.
/**
 * Context object handed to a stored procedure when it is invoked.
 *
 * <p>Gives the procedure access to the {@code StreamExecutionEnvironment}
 * it is executed in.
 */
public interface ProcedureContext {
/**
 * Returns the StreamExecutionEnvironment in which the procedure is called.
 *
 * <p>Flink creates a fresh StreamExecutionEnvironment based on the current
 * configuration for every procedure call, so the procedure may modify the
 * returned environment safely — it is not leaked outside the call.
 *
 * @return the StreamExecutionEnvironment for this procedure invocation
 */
StreamExecutionEnvironment getExecutionEnvironment();
}Default implementation of ProcedureContext for standard procedure execution.
/**
 * Default implementation of {@link ProcedureContext} for standard procedure
 * execution.
 */
public class DefaultProcedureContext implements ProcedureContext {

    /**
     * Returns the StreamExecutionEnvironment for this procedure context.
     *
     * @return the environment resolved from the current execution context
     *     (local or cluster)
     */
    @Override
    public StreamExecutionEnvironment getExecutionEnvironment() {
        // BUGFIX: the method was declared with no body, which is invalid
        // for a concrete class. Resolve the environment from the current
        // execution context, matching the no-arg construction used in the
        // usage examples.
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }
}
Usage Examples:
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedure.DefaultProcedureContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
/**
 * Example procedure that reads a text file, upper-cases every line, keeps
 * only lines longer than 10 characters, and writes the result.
 */
public class DataProcessingProcedure {

    /**
     * Runs the file-processing pipeline on the environment supplied by the
     * procedure context.
     *
     * @param context procedure context supplying the execution environment
     * @param inputPath text file to read
     * @param outputPath path the processed lines are written to
     */
    public void processData(ProcedureContext context, String inputPath, String outputPath) {
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        // The environment is private to this procedure call, so tuning it is safe.
        env.setParallelism(4);
        env.enableCheckpointing(60000); // 1-minute checkpoints
        DataStream<String> lines = env.readTextFile(inputPath);
        DataStream<String> longUppercaseLines =
                lines.map(String::toUpperCase)
                        .filter(line -> line.length() > 10);
        longUppercaseLines.writeAsText(outputPath);
        // env.execute(...) is invoked by the procedure framework, not here.
    }
}
// Using default context
// NOTE(review): in a deployed procedure the context is supplied by the
// framework per call; constructing DefaultProcedureContext directly appears
// intended for standalone/programmatic usage — confirm.
ProcedureContext context = new DefaultProcedureContext();
DataProcessingProcedure procedure = new DataProcessingProcedure();
procedure.processData(context, "/input/data.txt", "/output/processed.txt");Implement procedures that maintain state across invocations.
/**
 * Example procedure that maintains keyed state (a per-key counter) across the
 * records it processes.
 */
public class StatefulAnalyticsProcedure {
    // Descriptor for the per-key counter state; ValueStateDescriptor is Serializable.
    private final ValueStateDescriptor<Long> counterDescriptor;

    public StatefulAnalyticsProcedure() {
        this.counterDescriptor = new ValueStateDescriptor<>("procedure-counter", Long.class);
    }

    /**
     * Runs a stateful analysis over {@code tableName}, writing rows enriched
     * with a per-key running count to {@code outputTable}.
     *
     * @param context procedure context supplying the execution environment
     * @param tableName source table to analyze
     * @param outputTable table receiving the enriched rows
     * @return the table result (placeholder in this example)
     * @throws Exception if pipeline construction fails
     */
    public TableResult analyzeWithState(
            ProcedureContext context,
            String tableName,
            String outputTable) throws Exception {
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        // Checkpoint every 30s so the keyed state survives failures.
        env.enableCheckpointing(30000);
        env.setStateBackend(new HashMapStateBackend());
        // BUGFIX: a SourceFunction is attached with addSource(); fromSource()
        // expects the newer Source interface and would not compile here.
        DataStream<Row> analysisStream = env
                .addSource(createTableSource(tableName))
                .name("analysis-source")
                .keyBy(row -> row.getField(0))
                // BUGFIX: use a static nested function instead of an anonymous
                // inner class — the anonymous class captured the enclosing
                // (non-Serializable) procedure instance, so the job graph
                // would fail to serialize at submission time.
                .process(new CountingFunction(counterDescriptor));
        analysisStream.addSink(createTableSink(outputTable));
        return null; // NOTE(review): return an appropriate TableResult
    }

    /** Enriches each row with a running per-key count held in ValueState. */
    private static class CountingFunction extends KeyedProcessFunction<Object, Row, Row> {
        private final ValueStateDescriptor<Long> descriptor;
        private transient ValueState<Long> counter;

        CountingFunction(ValueStateDescriptor<Long> descriptor) {
            this.descriptor = descriptor;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            counter = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
            Long currentCount = counter.value();
            // First record for this key starts the counter at zero.
            long nextCount = (currentCount == null ? 0L : currentCount) + 1;
            counter.update(nextCount);
            // Emit the original first two fields plus the updated count.
            out.collect(Row.of(row.getField(0), row.getField(1), nextCount));
        }
    }

    private SourceFunction<Row> createTableSource(String tableName) {
        return null; // Placeholder: implement table source creation.
    }

    private SinkFunction<Row> createTableSink(String tableName) {
        return null; // Placeholder: implement table sink creation.
    }
}
Create procedures that accept configuration parameters and customize execution.
/**
 * Example procedure whose execution is customized through a string-keyed
 * configuration map (parallelism, checkpoint interval, processing mode,
 * operation type).
 *
 * <p>NOTE(review): timeWindow()/TimeCharacteristic are pre-Flink-1.13 APIs;
 * this matches the Flink version the surrounding examples target — confirm.
 */
public class ConfigurableProcedure {

    /**
     * Builds a pipeline from {@code inputTable} to {@code outputTable},
     * configured by the entries of {@code config}.
     *
     * @param context procedure context supplying the execution environment
     * @param config configuration map; recognized keys: "parallelism",
     *     "checkpoint.interval", "processing.mode", "operation.type"
     * @param inputTable source table name
     * @param outputTable target table name
     */
    public void processWithConfig(
            ProcedureContext context,
            Map<String, String> config,
            String inputTable,
            String outputTable) {
        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        final int parallelism = Integer.parseInt(config.getOrDefault("parallelism", "4"));
        final long checkpointMillis = Long.parseLong(config.getOrDefault("checkpoint.interval", "60000"));
        final String mode = config.getOrDefault("processing.mode", "event-time");
        env.setParallelism(parallelism);
        env.enableCheckpointing(checkpointMillis);
        // Choose the time semantics the windows below will use.
        env.setStreamTimeCharacteristic(
                "event-time".equals(mode)
                        ? TimeCharacteristic.EventTime
                        : TimeCharacteristic.ProcessingTime);
        DataStream<Row> source = createInputStream(env, inputTable);
        final DataStream<Row> result;
        if ("aggregation".equals(config.get("operation.type"))) {
            // Windowed aggregation path: 5-minute windows per key.
            result = source
                    .keyBy(row -> row.getField(0))
                    .timeWindow(Time.minutes(5))
                    .aggregate(new CustomAggregateFunction());
        } else {
            // Plain record-at-a-time transformation path.
            result = source.map(new CustomMapFunction(config));
        }
        result.addSink(createOutputStream(outputTable));
    }

    private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String tableName) {
        return null; // Placeholder: create the input stream from the table.
    }

    private SinkFunction<Row> createOutputStream(String tableName) {
        return null; // Placeholder: create the output sink for the table.
    }
}
Implement procedures that handle asynchronous operations.
/**
 * Example procedure that runs its Flink job on a background thread and
 * performs per-record asynchronous I/O via Flink's AsyncDataStream API.
 */
public class AsyncProcedure {

    /**
     * Builds and executes the pipeline asynchronously.
     *
     * @param context procedure context supplying the execution environment
     * @param inputPath text file to read
     * @param outputPath path for the processed output
     * @return future completing with the table result once the job finishes
     */
    public CompletableFuture<TableResult> processAsync(
            ProcedureContext context,
            String inputPath,
            String outputPath) {
        // NOTE(review): supplyAsync without an executor runs the blocking
        // execute() on ForkJoinPool.commonPool(); consider a dedicated executor.
        return CompletableFuture.supplyAsync(() -> {
            try {
                StreamExecutionEnvironment env = context.getExecutionEnvironment();
                env.setParallelism(8);
                env.setBufferTimeout(100);
                DataStream<String> source = env.readTextFile(inputPath);
                // BUGFIX: per-record async calls must be wired through
                // AsyncDataStream with an AsyncFunction; DataStream.map() only
                // accepts a synchronous MapFunction (there is no AsyncMapFunction).
                DataStream<String> asyncStream = AsyncDataStream.unorderedWait(
                        source,
                        new AsyncFunction<String, String>() {
                            @Override
                            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                                CompletableFuture
                                        .supplyAsync(() -> {
                                            try {
                                                // Simulate an async operation (e.g. external API call).
                                                Thread.sleep(100);
                                                return "processed_" + input;
                                            } catch (InterruptedException e) {
                                                Thread.currentThread().interrupt();
                                                throw new RuntimeException(e);
                                            }
                                        })
                                        .whenComplete((result, error) -> {
                                            if (error != null) {
                                                resultFuture.completeExceptionally(error);
                                            } else {
                                                resultFuture.complete(Collections.singleton(result));
                                            }
                                        });
                            }
                        },
                        30, TimeUnit.SECONDS, // per-element timeout
                        100);                 // max concurrent in-flight requests
                asyncStream.writeAsText(outputPath);
                // Execute and return result.
                JobExecutionResult jobResult = env.execute("Async Procedure");
                return TableResult.OK(); // NOTE(review): convert jobResult to an appropriate TableResult
            } catch (Exception e) {
                // Preserve the cause so the future completes exceptionally with context.
                throw new RuntimeException("Async procedure execution failed", e);
            }
        });
    }
}
Register and execute procedures within the table environment.
// Register procedure for SQL usage
// NOTE(review): createTemporarySystemFunction registers a *function*; confirm
// whether stored procedures instead require catalog-based registration.
tableEnv.createTemporarySystemFunction("process_data", DataProcessingProcedure.class);
// Execute procedure via SQL. CALL passes two arguments while processData takes
// three — presumably the ProcedureContext is injected by Flink; verify.
tableEnv.executeSql("CALL process_data('/input/data.txt', '/output/result.txt')");
// Programmatic procedure execution
ProcedureContext context = new DefaultProcedureContext();
DataProcessingProcedure procedure = new DataProcessingProcedure();
procedure.processData(context, "/input/data.txt", "/output/result.txt");Combine procedures with table operations for complex workflows.
public class TableProcedure {
public TableResult processTable(
ProcedureContext context,
String sourceTable,
String targetTable) throws Exception {
StreamExecutionEnvironment env = context.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Configure table environment
tableEnv.getConfig().getConfiguration().setString("parallelism.default", "4");
// Create complex table processing pipeline
Table sourceData = tableEnv.from(sourceTable);
// Apply transformations
Table processedData = sourceData
.select($("user_id"), $("event_time"), $("event_data"))
.where($("event_time").isGreater(lit("2023-01-01 00:00:00")))
.groupBy($("user_id"))
.select($("user_id"), $("event_data").count().as("event_count"));
// Create statement set for efficient execution
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert(targetTable, processedData);
return statementSet.execute();
}
}Implement robust error handling for procedure execution.
/**
 * Example procedure demonstrating error handling: rows that fail processing
 * are routed to a dedicated error table via a side output instead of failing
 * the whole job.
 */
public class RobustProcedure {

    /**
     * Processes {@code inputTable} into {@code outputTable}, diverting rows
     * that throw during processing to {@code errorTable}.
     *
     * @param context procedure context supplying the execution environment
     * @param inputTable source table name
     * @param outputTable target table for successfully processed rows
     * @param errorTable target table for rows that failed processing
     * @return a success TableResult when the job completes
     * @throws RuntimeException if the job itself fails to execute
     */
    public TableResult processWithErrorHandling(
            ProcedureContext context,
            String inputTable,
            String outputTable,
            String errorTable) {
        try {
            StreamExecutionEnvironment env = context.getExecutionEnvironment();
            // Retry the job up to 3 times with a 10s delay between attempts.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
            DataStream<Row> inputStream = createInputStream(env, inputTable);
            // BUGFIX: a ProcessFunction must be applied with process(), not
            // map() — map() only accepts a MapFunction, so the original did
            // not compile.
            SingleOutputStreamOperator<Row> processedStream = inputStream
                    .process(new ProcessingMapFunction())
                    .name("data-processing");
            // Successfully processed rows.
            processedStream.getSideOutput(ProcessingMapFunction.SUCCESS_TAG)
                    .addSink(createTableSink(outputTable));
            // Rows that threw during processing.
            processedStream.getSideOutput(ProcessingMapFunction.ERROR_TAG)
                    .addSink(createTableSink(errorTable));
            env.execute("Robust Procedure Execution");
            return TableResult.OK(); // NOTE(review): confirm how a success TableResult is built
        } catch (Exception e) {
            // BUGFIX: previously logged and returned OK, silently masking the
            // failure; surface it to the caller with the cause preserved.
            throw new RuntimeException(
                    "Procedure execution failed for table " + inputTable, e);
        }
    }

    /** Routes each row to SUCCESS_TAG or, if processing throws, to ERROR_TAG. */
    private static class ProcessingMapFunction extends ProcessFunction<Row, Row> {
        static final OutputTag<Row> SUCCESS_TAG = new OutputTag<Row>("success") {};
        static final OutputTag<Row> ERROR_TAG = new OutputTag<Row>("error") {};

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
            try {
                ctx.output(SUCCESS_TAG, processRow(row));
            } catch (Exception e) {
                // Failed rows are enriched with the error message and a timestamp.
                ctx.output(ERROR_TAG, Row.of(row.toString(), e.getMessage(), System.currentTimeMillis()));
            }
        }

        private Row processRow(Row row) throws Exception {
            // Placeholder for real row-processing logic.
            return row;
        }
    }

    private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String table) {
        return null; // Placeholder implementation.
    }

    private SinkFunction<Row> createTableSink(String table) {
        return null; // Placeholder implementation.
    }
}
Test procedures in isolation with mock contexts.
/**
 * Unit test showing how to exercise a procedure in isolation against a local
 * environment through a hand-rolled ProcedureContext.
 */
public class ProcedureTest {

    @Test
    public void testDataProcessingProcedure() throws Exception {
        // A single-threaded local environment keeps the test deterministic.
        StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
        localEnv.setParallelism(1);
        // Mock context that simply hands back the test environment.
        ProcedureContext mockContext = new TestProcedureContext(localEnv);
        DataProcessingProcedure procedureUnderTest = new DataProcessingProcedure();
        String inputFile = "test-input.txt";
        String outputFile = "test-output.txt";
        procedureUnderTest.processData(mockContext, inputFile, outputFile);
        // Verify results: add assertions based on the expected output.
    }

    /** Minimal ProcedureContext returning a pre-built environment. */
    private static class TestProcedureContext implements ProcedureContext {
        private final StreamExecutionEnvironment env;

        public TestProcedureContext(StreamExecutionEnvironment env) {
            this.env = env;
        }

        @Override
        public StreamExecutionEnvironment getExecutionEnvironment() {
            return env;
        }
    }
}
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-api-java-bridge