# Procedure Context

Framework for stored procedure execution with access to a StreamExecutionEnvironment. It provides the context and environment access needed to implement custom stored procedures in Flink's table ecosystem.
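
A stored procedure receives the `ProcedureContext` as the first argument of its `call` method. The sketch below outlines a minimal procedure class; it assumes Flink's `org.apache.flink.table.procedures.Procedure` interface and the convention that `call` methods take the context first and return an array of results. The class and procedure names are illustrative.

```java
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;

/**
 * Minimal sketch of a stored procedure. Assumes the Procedure interface and the
 * convention that "call" methods take a ProcedureContext first and return an array.
 */
public class GreetingProcedure implements Procedure {

    // Invoked as: CALL <catalog>.<database>.greeting('Flink')
    public String[] call(ProcedureContext context, String name) {
        // The context gives access to a fresh StreamExecutionEnvironment per call.
        return new String[] {"Hello, " + name};
    }
}
```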

## Capabilities

### Procedure Context Interface

Core interface providing execution environment access for stored procedures.

```java { .api }
/**
 * Context to provide necessary services for stored procedure execution.
 */
public interface ProcedureContext {

    /**
     * Returns the StreamExecutionEnvironment where the procedure is called.
     * Flink creates a new StreamExecutionEnvironment based on the current configuration
     * and passes it to the procedure for every procedure call. The procedure can
     * modify the passed StreamExecutionEnvironment safely, as it won't be leaked outside.
     *
     * @return StreamExecutionEnvironment for the procedure execution
     */
    StreamExecutionEnvironment getExecutionEnvironment();
}
```
27
28
### Default Implementation
29
30
Default implementation of ProcedureContext for standard use cases.
31
32
```java { .api }
33
/**
34
* Default implementation of ProcedureContext
35
*/
36
public class DefaultProcedureContext implements ProcedureContext {
37
38
/**
39
* Creates a default procedure context with the specified execution environment
40
* @param executionEnvironment The StreamExecutionEnvironment to provide to procedures
41
*/
42
public DefaultProcedureContext(StreamExecutionEnvironment executionEnvironment);
43
44
/**
45
* Returns the configured StreamExecutionEnvironment
46
* @return StreamExecutionEnvironment for procedure execution
47
*/
48
@Override
49
public StreamExecutionEnvironment getExecutionEnvironment();
50
}
51
```
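
For local testing, a context can be built directly around an execution environment and handed to a procedure. This is a small usage sketch; the procedure class (`GreetingProcedure` from the sketch above) is illustrative.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.DefaultProcedureContext;
import org.apache.flink.table.procedure.ProcedureContext;

// Build a context around a local environment and invoke a procedure directly.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ProcedureContext context = new DefaultProcedureContext(env);

// Hypothetical procedure instance under test.
GreetingProcedure procedure = new GreetingProcedure();
String[] result = procedure.call(context, "Flink");
```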

## Usage Examples

### Basic Procedure Implementation

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;

/**
 * Example stored procedure that processes data using the DataStream API.
 */
public class DataProcessingProcedure {

    public void processData(ProcedureContext context, String inputPath, String outputPath) {
        // Get the execution environment from the context
        StreamExecutionEnvironment env = context.getExecutionEnvironment();

        // Configure environment settings
        env.setParallelism(4);
        env.enableCheckpointing(60000);

        // Create the DataStream processing pipeline
        DataStream<String> inputStream = env.readTextFile(inputPath);

        DataStream<String> processedStream = inputStream
            .filter(line -> !line.isEmpty())
            .map(String::toUpperCase)
            .filter(line -> line.contains("IMPORTANT"));

        // Write results
        processedStream.writeAsText(outputPath);

        // Execute the job
        try {
            env.execute("Data Processing Procedure");
        } catch (Exception e) {
            throw new RuntimeException("Procedure execution failed", e);
        }
    }
}
```

### Advanced Procedure with Table Integration

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;

/**
 * Advanced procedure combining the DataStream and Table APIs.
 */
public class AdvancedAnalyticsProcedure {

    public void runAnalytics(ProcedureContext context, String configPath) {
        StreamExecutionEnvironment env = context.getExecutionEnvironment();

        // Create a table environment on top of the procedure's execution environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Configure sources based on the config
        tableEnv.executeSql(
            "CREATE TABLE source_data (" +
            "  id BIGINT," +
            "  name STRING," +
            "  amount DECIMAL(10,2)," +
            "  event_time TIMESTAMP_LTZ(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'input-topic'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")"
        );

        // Perform table operations
        Table analyticsResult = tableEnv.sqlQuery(
            "SELECT " +
            "  name," +
            "  COUNT(*) AS event_count," +
            "  SUM(amount) AS total_amount," +
            "  AVG(amount) AS avg_amount " +
            "FROM source_data " +
            "WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
            "GROUP BY name"
        );

        // Convert to a DataStream for custom processing
        DataStream<Row> resultStream = tableEnv.toDataStream(analyticsResult);

        // Apply custom DataStream operations (avg_amount is a DECIMAL, so read it as a Number)
        resultStream
            .filter(row -> ((Number) row.getField(3)).doubleValue() > 100.0) // avg_amount > 100
            .map(row -> String.format("High-value customer: %s (avg: %.2f)",
                row.getField(0), ((Number) row.getField(3)).doubleValue()))
            .print("alerts");

        // Execute the complete pipeline
        try {
            env.execute("Advanced Analytics Procedure");
        } catch (Exception e) {
            throw new RuntimeException("Analytics procedure failed", e);
        }
    }
}
```

### Procedure Context Factory

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.DefaultProcedureContext;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Collections;

/**
 * Factory for creating procedure contexts with custom configurations.
 */
public class ProcedureContextFactory {

    public static ProcedureContext createContext() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the environment for procedure execution
        env.setParallelism(2);
        env.enableCheckpointing(30000);
        env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(
            Collections.singletonMap("procedure.mode", "batch")
        ));

        return new DefaultProcedureContext(env);
    }

    public static ProcedureContext createStreamingContext(int parallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Streaming-optimized configuration
        env.setParallelism(parallelism);
        env.enableCheckpointing(10000);
        env.setBufferTimeout(100);

        return new DefaultProcedureContext(env);
    }

    public static ProcedureContext createBatchContext() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Batch-optimized configuration
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT); // use the cluster's default parallelism

        return new DefaultProcedureContext(env);
    }
}
```
213
214
### Procedure Registration and Execution
215
216
```java
217
// Register procedure in table environment
218
tableEnv.createTemporarySystemFunction("process_data", DataProcessingProcedure.class);
219
220
// Call procedure in SQL
221
tableEnv.executeSql("CALL process_data('/input/path', '/output/path')");
222
223
// Call procedure programmatically
224
ProcedureContext context = new DefaultProcedureContext(env);
225
DataProcessingProcedure procedure = new DataProcessingProcedure();
226
procedure.processData(context, "/input/path", "/output/path");
227
```
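
The catalog side of this wiring is sketched below. It assumes the `Catalog.getProcedure(ObjectPath)` extension point for catalog-provided procedures; the catalog class and procedure name are illustrative, and the returned class would need to implement the `Procedure` interface.

```java
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.procedures.Procedure;

/**
 * Illustrative catalog that exposes a single stored procedure, assuming the
 * Catalog.getProcedure(ObjectPath) hook for procedure discovery.
 */
public class ProcedureCatalog extends GenericInMemoryCatalog {

    public ProcedureCatalog(String name, String defaultDatabase) {
        super(name, defaultDatabase);
    }

    @Override
    public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException {
        if ("process_data".equals(procedurePath.getObjectName())) {
            return new DataProcessingProcedure(); // must implement Procedure to be callable
        }
        throw new ProcedureNotExistException(getName(), procedurePath);
    }
}
```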

## Integration Patterns

### Procedure with Configuration

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Map;

public class ConfigurableProcedure {

    public void execute(ProcedureContext context, String configJson) {
        StreamExecutionEnvironment env = context.getExecutionEnvironment();

        // Parse the configuration
        ObjectMapper mapper = new ObjectMapper();
        try {
            ProcedureConfig config = mapper.readValue(configJson, ProcedureConfig.class);

            // Apply the configuration to the environment
            env.setParallelism(config.getParallelism());
            env.enableCheckpointing(config.getCheckpointInterval());

            if (config.getJobParameters() != null) {
                env.getConfig().setGlobalJobParameters(
                    ParameterTool.fromMap(config.getJobParameters())
                );
            }

            // Execute the procedure logic with the configuration
            executeWithConfig(env, config);

        } catch (Exception e) {
            throw new RuntimeException("Configuration parsing failed", e);
        }
    }

    private void executeWithConfig(StreamExecutionEnvironment env, ProcedureConfig config) {
        // Implementation specific to the configuration
    }

    public static class ProcedureConfig {
        private int parallelism = 1;
        private long checkpointInterval = 60000;
        private Map<String, String> jobParameters;

        public int getParallelism() { return parallelism; }
        public long getCheckpointInterval() { return checkpointInterval; }
        public Map<String, String> getJobParameters() { return jobParameters; }
        // Setters omitted for brevity
    }
}
```
277
278
### Error Handling and Monitoring
279
280
```java
281
public class MonitoredProcedure {
282
283
private static final Logger logger = LoggerFactory.getLogger(MonitoredProcedure.class);
284
285
public void executeWithMonitoring(ProcedureContext context, String jobName) {
286
StreamExecutionEnvironment env = context.getExecutionEnvironment();
287
288
try {
289
// Add monitoring and metrics
290
env.getConfig().setGlobalJobParameters(
291
ParameterTool.fromMap(Collections.singletonMap("job.name", jobName))
292
);
293
294
// Configure restart strategy
295
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
296
297
// Add execution monitoring
298
env.registerJobListener(new JobListener() {
299
@Override
300
public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
301
logger.info("Procedure job submitted: {}", jobName);
302
}
303
304
@Override
305
public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
306
if (throwable != null) {
307
logger.error("Procedure job failed: {}", jobName, throwable);
308
} else {
309
logger.info("Procedure job completed: {} in {}ms",
310
jobName, jobExecutionResult.getNetRuntime());
311
}
312
}
313
});
314
315
// Execute procedure logic
316
executeProcedureLogic(env);
317
318
JobExecutionResult result = env.execute(jobName);
319
logger.info("Procedure execution completed successfully");
320
321
} catch (Exception e) {
322
logger.error("Procedure execution failed: {}", jobName, e);
323
throw new RuntimeException("Procedure failed: " + jobName, e);
324
}
325
}
326
327
private void executeProcedureLogic(StreamExecutionEnvironment env) {
328
// Actual procedure implementation
329
}
330
}
331
```

## Type Definitions

### Context Lifecycle

```java { .api }
// Procedure context lifecycle
// 1. Context creation with a StreamExecutionEnvironment
ProcedureContext context = new DefaultProcedureContext(env);

// 2. Context passed to the procedure method
procedure.execute(context, parameters);

// 3. Procedure accesses the environment via the context
StreamExecutionEnvironment procEnv = context.getExecutionEnvironment();

// 4. Procedure modifies and uses the environment safely
procEnv.setParallelism(4);
procEnv.execute("procedure-job");
```
352
353
### Environment Isolation
354
355
```java { .api }
356
/**
357
* Environment isolation guarantees:
358
* - Each procedure call gets a fresh StreamExecutionEnvironment
359
* - Modifications to the environment don't affect other procedures
360
* - Environment configuration is scoped to the procedure execution
361
* - Resource cleanup is handled automatically after procedure completion
362
*/
363
```
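
As a rough illustration of that isolation, each call can be handed its own freshly constructed environment, so per-call settings don't leak between invocations. This is a sketch of the behavior, not the framework's internal wiring:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.DefaultProcedureContext;
import org.apache.flink.table.procedure.ProcedureContext;

// Each invocation gets its own environment, so configuration applied inside
// one procedure call is not visible to the next call.
ProcedureContext firstCall = new DefaultProcedureContext(
    StreamExecutionEnvironment.getExecutionEnvironment());
firstCall.getExecutionEnvironment().setParallelism(8); // scoped to this call only

ProcedureContext secondCall = new DefaultProcedureContext(
    StreamExecutionEnvironment.getExecutionEnvironment());
// secondCall's environment still has its default parallelism.
```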

### Integration with Table Functions

```java { .api }
// Procedures can be combined with table functions for complex workflows.
// There is no built-in way for a TableFunction to obtain a ProcedureContext,
// so this sketch passes one in through the constructor.
@FunctionHint(output = @DataTypeHint("ROW<result STRING, status STRING>"))
public class ProcedureTableFunction extends TableFunction<Row> {

    private final ProcedureContext context;

    public ProcedureTableFunction(ProcedureContext context) {
        this.context = context;
    }

    public void eval(String input) {
        // Execute the procedure and collect results
        try {
            SomeProcedure procedure = new SomeProcedure();
            procedure.execute(context, input);
            collect(Row.of(input, "SUCCESS"));
        } catch (Exception e) {
            collect(Row.of(input, "FAILED: " + e.getMessage()));
        }
    }
}
```
386
387
## Best Practices
388
389
### Resource Management
390
391
```java
392
// Good: Proper resource cleanup
393
public void execute(ProcedureContext context) {
394
StreamExecutionEnvironment env = context.getExecutionEnvironment();
395
396
try {
397
// Configure environment
398
env.setParallelism(4);
399
400
// Execute logic
401
DataStream<String> stream = createDataStream(env);
402
stream.addSink(createSink());
403
404
env.execute("procedure-job");
405
406
} finally {
407
// Cleanup resources if needed
408
// Note: Environment cleanup is handled by Flink
409
}
410
}
411
```

### Configuration Management

```java
// Good: externalized configuration
public void execute(ProcedureContext context, String configDir) {
    StreamExecutionEnvironment env = context.getExecutionEnvironment();

    // Load configuration from an external source
    // (here, the Flink configuration file in the given directory)
    Configuration config = GlobalConfiguration.loadConfiguration(configDir);

    // Apply the configuration
    env.configure(config);

    // Execute with the configuration
    executeWithConfig(env, config);
}

// Avoid: hardcoded configuration
public void execute(ProcedureContext context) {
    StreamExecutionEnvironment env = context.getExecutionEnvironment();
    env.setParallelism(4); // hardcoded, not flexible
}
```

### Error Handling

```java
// Good: comprehensive error handling
public void execute(ProcedureContext context, String input) {
    try {
        validateInput(input);

        StreamExecutionEnvironment env = context.getExecutionEnvironment();
        setupEnvironment(env);

        JobExecutionResult result = env.execute("procedure-job");
        logSuccess(result);

    } catch (ValidationException e) {
        throw new IllegalArgumentException("Invalid procedure input", e);
    } catch (Exception e) {
        throw new RuntimeException("Procedure execution failed", e);
    }
}
```