0
# Procedures
1
2
`ProcedureContext` is the context passed to a stored procedure when it is called; it gives the procedure access to a `StreamExecutionEnvironment`. Procedures encapsulate complex data-processing logic that can be invoked from SQL with `CALL` and that builds and runs jobs on that execution environment.
3
4
## Capabilities
5
6
### Procedure Context Interface
7
8
Core interface providing context for stored procedure execution.
9
10
```java { .api }
11
/**
12
* A context to provide necessary context used by stored procedure
13
* Provides access to execution environment for procedure implementation
14
*/
15
public interface ProcedureContext {
16
17
/**
18
* Return the StreamExecutionEnvironment where the procedure is called
19
* Flink creates a new StreamExecutionEnvironment based on current configuration
20
* and passes it to the procedure for every procedure call
21
* The procedure can modify the passed StreamExecutionEnvironment safely
22
* as it won't be leaked outside
23
* @return StreamExecutionEnvironment for procedure execution
24
*/
25
StreamExecutionEnvironment getExecutionEnvironment();
26
}
27
```
28
29
### Default Procedure Context
30
31
Default implementation of `ProcedureContext`, which returns the `StreamExecutionEnvironment` it was constructed with.
32
33
```java { .api }
34
/**
35
* Default implementation of ProcedureContext
36
* Provides standard procedure execution context
37
*/
38
public class DefaultProcedureContext implements ProcedureContext {
39
40
/**
41
* Get the StreamExecutionEnvironment for this procedure context
42
* @return StreamExecutionEnvironment instance configured for procedure execution
43
*/
44
@Override
45
public StreamExecutionEnvironment getExecutionEnvironment();
46
}
47
```
48
49
**Usage Examples:**
50
51
```java
52
import org.apache.flink.table.procedure.ProcedureContext;
53
import org.apache.flink.table.procedure.DefaultProcedureContext;
54
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
55
import org.apache.flink.streaming.api.datastream.DataStream;
56
57
// Custom procedure implementation
58
public class DataProcessingProcedure {
59
60
public void processData(ProcedureContext context, String inputPath, String outputPath) {
61
// Get execution environment from context
62
StreamExecutionEnvironment env = context.getExecutionEnvironment();
63
64
// Procedure can safely modify the environment
65
env.setParallelism(4);
66
env.enableCheckpointing(60000); // 1-minute checkpoints
67
68
// Implement data processing logic
69
DataStream<String> inputStream = env.readTextFile(inputPath);
70
DataStream<String> processedStream = inputStream
71
.map(line -> line.toUpperCase())
72
.filter(line -> line.length() > 10);
73
74
// Write results
75
processedStream.writeAsText(outputPath);
76
77
// Environment execution will be handled by the procedure framework
78
}
79
}
80
81
// Using default context
82
ProcedureContext context = new DefaultProcedureContext();
83
DataProcessingProcedure procedure = new DataProcessingProcedure();
84
procedure.processData(context, "/input/data.txt", "/output/processed.txt");
85
```
86
87
## Advanced Procedure Patterns
88
89
### Stateful Procedure Implementation
90
91
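Implement procedures whose jobs use keyed state. The state lives inside the job submitted by a single call. Each call receives a fresh execution environment, so state is not carried over between separate invocations.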
92
93
```java
94
public class StatefulAnalyticsProcedure {
95
private final ValueStateDescriptor<Long> counterDescriptor;
96
97
public StatefulAnalyticsProcedure() {
98
this.counterDescriptor = new ValueStateDescriptor<>("procedure-counter", Long.class);
99
}
100
101
public TableResult analyzeWithState(
102
ProcedureContext context,
103
String tableName,
104
String outputTable) throws Exception {
105
106
StreamExecutionEnvironment env = context.getExecutionEnvironment();
107
108
// Configure environment for stateful processing
109
env.enableCheckpointing(30000);
110
env.setStateBackend(new HashMapStateBackend());
111
112
// Create stateful processing stream
113
DataStream<Row> analysisStream = env
114
.fromSource(createTableSource(tableName), WatermarkStrategy.noWatermarks(), "analysis-source")
115
.keyBy(row -> row.getField(0))
116
.process(new KeyedProcessFunction<Object, Row, Row>() {
117
private ValueState<Long> counter;
118
119
@Override
120
public void open(Configuration parameters) throws Exception {
121
counter = getRuntimeContext().getState(counterDescriptor);
122
}
123
124
@Override
125
public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
126
Long currentCount = counter.value();
127
if (currentCount == null) {
128
currentCount = 0L;
129
}
130
counter.update(currentCount + 1);
131
132
// Emit enriched row with count
133
Row enrichedRow = Row.of(row.getField(0), row.getField(1), currentCount + 1);
134
out.collect(enrichedRow);
135
}
136
});
137
138
// Sink to output table
139
analysisStream.addSink(createTableSink(outputTable));
140
141
return null; // Return appropriate TableResult
142
}
143
144
private SourceFunction<Row> createTableSource(String tableName) {
145
// Implement table source creation
146
return null;
147
}
148
149
private SinkFunction<Row> createTableSink(String tableName) {
150
// Implement table sink creation
151
return null;
152
}
153
}
154
```
155
156
### Procedure with Custom Configuration
157
158
Create procedures that accept configuration parameters and customize execution.
159
160
```java
161
public class ConfigurableProcedure {
162
163
public void processWithConfig(
164
ProcedureContext context,
165
Map<String, String> config,
166
String inputTable,
167
String outputTable) {
168
169
StreamExecutionEnvironment env = context.getExecutionEnvironment();
170
171
// Apply configuration from parameters
172
int parallelism = Integer.parseInt(config.getOrDefault("parallelism", "4"));
173
long checkpointInterval = Long.parseLong(config.getOrDefault("checkpoint.interval", "60000"));
174
String processingMode = config.getOrDefault("processing.mode", "event-time");
175
176
env.setParallelism(parallelism);
177
env.enableCheckpointing(checkpointInterval);
178
179
// Configure based on processing mode
180
if ("event-time".equals(processingMode)) {
181
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
182
} else {
183
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
184
}
185
186
// Implement processing logic based on configuration
187
DataStream<Row> inputStream = createInputStream(env, inputTable);
188
DataStream<Row> processedStream;
189
190
if ("aggregation".equals(config.get("operation.type"))) {
191
processedStream = inputStream
192
.keyBy(row -> row.getField(0))
193
.timeWindow(Time.minutes(5))
194
.aggregate(new CustomAggregateFunction());
195
} else {
196
processedStream = inputStream
197
.map(new CustomMapFunction(config));
198
}
199
200
processedStream.addSink(createOutputStream(outputTable));
201
}
202
203
private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String tableName) {
204
// Implementation for creating input stream from table
205
return null;
206
}
207
208
private SinkFunction<Row> createOutputStream(String tableName) {
209
// Implementation for creating output sink to table
210
return null;
211
}
212
}
213
```
214
215
### Async Procedure Execution
216
217
Implement procedures that handle asynchronous operations.
218
219
```java
220
public class AsyncProcedure {
221
222
public CompletableFuture<TableResult> processAsync(
223
ProcedureContext context,
224
String inputPath,
225
String outputPath) {
226
227
return CompletableFuture.supplyAsync(() -> {
228
try {
229
StreamExecutionEnvironment env = context.getExecutionEnvironment();
230
231
// Configure for async execution
232
env.setParallelism(8);
233
env.setBufferTimeout(100);
234
235
// Create async processing pipeline
236
DataStream<String> asyncStream = env
237
.readTextFile(inputPath)
238
.map(new AsyncMapFunction<String, String>() {
239
@Override
240
public CompletableFuture<String> asyncMap(String input) throws Exception {
241
return CompletableFuture.supplyAsync(() -> {
242
// Simulate async operation (e.g., external API call)
243
try {
244
Thread.sleep(100);
245
return "processed_" + input;
246
} catch (InterruptedException e) {
247
throw new RuntimeException(e);
248
}
249
});
250
}
251
});
252
253
asyncStream.writeAsText(outputPath);
254
255
// Execute and return result
256
JobExecutionResult jobResult = env.execute("Async Procedure");
257
return TableResult.OK(); // Convert to appropriate TableResult
258
259
} catch (Exception e) {
260
throw new RuntimeException("Async procedure execution failed", e);
261
}
262
});
263
}
264
}
265
```
266
267
## Integration with Table API
268
269
### Procedure Registration and Execution
270
271
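Invoke procedures from SQL with `CALL`, or call them directly from Java. SQL can only resolve procedures that a catalog exposes (via `Catalog#getProcedure`); procedures are not registered the way functions are.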
272
273
```java
274
// Register procedure for SQL usage
275
tableEnv.createTemporarySystemFunction("process_data", DataProcessingProcedure.class);
276
277
// Execute procedure via SQL
278
tableEnv.executeSql("CALL process_data('/input/data.txt', '/output/result.txt')");
279
280
// Programmatic procedure execution
281
ProcedureContext context = new DefaultProcedureContext();
282
DataProcessingProcedure procedure = new DataProcessingProcedure();
283
procedure.processData(context, "/input/data.txt", "/output/result.txt");
284
```
285
286
### Procedure with Table Operations
287
288
Combine procedures with table operations for complex workflows.
289
290
```java
291
public class TableProcedure {
292
293
public TableResult processTable(
294
ProcedureContext context,
295
String sourceTable,
296
String targetTable) throws Exception {
297
298
StreamExecutionEnvironment env = context.getExecutionEnvironment();
299
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
300
301
// Configure table environment
302
tableEnv.getConfig().getConfiguration().setString("parallelism.default", "4");
303
304
// Create complex table processing pipeline
305
Table sourceData = tableEnv.from(sourceTable);
306
307
// Apply transformations
308
Table processedData = sourceData
309
.select($("user_id"), $("event_time"), $("event_data"))
310
.where($("event_time").isGreater(lit("2023-01-01 00:00:00")))
311
.groupBy($("user_id"))
312
.select($("user_id"), $("event_data").count().as("event_count"));
313
314
// Create statement set for efficient execution
315
StreamStatementSet statementSet = tableEnv.createStatementSet();
316
statementSet.addInsert(targetTable, processedData);
317
318
return statementSet.execute();
319
}
320
}
321
```
322
323
### Error Handling in Procedures
324
325
Implement robust error handling for procedure execution.
326
327
```java
328
public class RobustProcedure {
329
330
public TableResult processWithErrorHandling(
331
ProcedureContext context,
332
String inputTable,
333
String outputTable,
334
String errorTable) {
335
336
try {
337
StreamExecutionEnvironment env = context.getExecutionEnvironment();
338
339
// Configure error handling
340
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
341
342
DataStream<Row> inputStream = createInputStream(env, inputTable);
343
344
// Split stream for success and error handling
345
SingleOutputStreamOperator<Row> processedStream = inputStream
346
.map(new ProcessingMapFunction())
347
.name("data-processing");
348
349
// Main processing output
350
processedStream.getSideOutput(ProcessingMapFunction.SUCCESS_TAG)
351
.addSink(createTableSink(outputTable));
352
353
// Error output
354
processedStream.getSideOutput(ProcessingMapFunction.ERROR_TAG)
355
.addSink(createTableSink(errorTable));
356
357
env.execute("Robust Procedure Execution");
358
return TableResult.OK();
359
360
} catch (Exception e) {
361
// Log error and return failure result
362
System.err.println("Procedure execution failed: " + e.getMessage());
363
return TableResult.OK(); // Return appropriate error result
364
}
365
}
366
367
private static class ProcessingMapFunction extends ProcessFunction<Row, Row> {
368
static final OutputTag<Row> SUCCESS_TAG = new OutputTag<Row>("success") {};
369
static final OutputTag<Row> ERROR_TAG = new OutputTag<Row>("error") {};
370
371
@Override
372
public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
373
try {
374
// Process row
375
Row processedRow = processRow(row);
376
ctx.output(SUCCESS_TAG, processedRow);
377
} catch (Exception e) {
378
// Send to error stream
379
Row errorRow = Row.of(row.toString(), e.getMessage(), System.currentTimeMillis());
380
ctx.output(ERROR_TAG, errorRow);
381
}
382
}
383
384
private Row processRow(Row row) throws Exception {
385
// Implementation of row processing logic
386
return row;
387
}
388
}
389
390
private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String table) {
391
return null; // Implementation
392
}
393
394
private SinkFunction<Row> createTableSink(String table) {
395
return null; // Implementation
396
}
397
}
398
```
399
400
## Testing Procedures
401
402
### Unit Testing Procedures
403
404
Test procedures in isolation by passing a simple test implementation of `ProcedureContext` that returns a local execution environment.
405
406
```java
407
public class ProcedureTest {
408
409
@Test
410
public void testDataProcessingProcedure() throws Exception {
411
// Create test execution environment
412
StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.createLocalEnvironment();
413
testEnv.setParallelism(1);
414
415
// Create mock context
416
ProcedureContext testContext = new TestProcedureContext(testEnv);
417
418
// Create procedure instance
419
DataProcessingProcedure procedure = new DataProcessingProcedure();
420
421
// Execute procedure with test data
422
String testInput = "test-input.txt";
423
String testOutput = "test-output.txt";
424
425
procedure.processData(testContext, testInput, testOutput);
426
427
// Verify results
428
// Add assertions based on expected output
429
}
430
431
private static class TestProcedureContext implements ProcedureContext {
432
private final StreamExecutionEnvironment env;
433
434
public TestProcedureContext(StreamExecutionEnvironment env) {
435
this.env = env;
436
}
437
438
@Override
439
public StreamExecutionEnvironment getExecutionEnvironment() {
440
return env;
441
}
442
}
443
}
444
```
445
446
## Types
447
448
### Core Procedure Types
449
450
```java { .api }
451
import org.apache.flink.table.procedure.ProcedureContext;
452
import org.apache.flink.table.procedure.DefaultProcedureContext;
453
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
454
```
455
456
### Table Integration Types
457
458
```java { .api }
459
import org.apache.flink.table.api.TableResult;
460
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
461
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
462
import org.apache.flink.table.api.Table;
463
```
464
465
### Execution and Configuration Types
466
467
```java { .api }
468
import org.apache.flink.api.common.JobExecutionResult;
469
import org.apache.flink.configuration.Configuration;
470
import org.apache.flink.streaming.api.CheckpointingMode;
471
import org.apache.flink.streaming.api.TimeCharacteristic;
472
```
473
474
### Async Processing Types
475
476
```java { .api }
477
import java.util.concurrent.CompletableFuture;
478
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
479
import org.apache.flink.streaming.api.functions.async.ResultFuture;
480
```