or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

builtin-connectors.mdchangelog-processing.mddatastream-connectors.mdindex.mdprocedures.mdstatement-sets.mdstream-table-environment.mdwatermark-strategies.md

procedures.mddocs/

0

# Procedures

1

2

`ProcedureContext` is the context passed to a stored procedure when it is called; it gives the procedure access to a `StreamExecutionEnvironment`. Procedures encapsulate complex data-processing logic. They are invoked from SQL with a `CALL` statement and build and run their jobs in the supplied streaming execution environment.

3

4

## Capabilities

5

6

### Procedure Context Interface

7

8

Core interface providing context for stored procedure execution.

9

10

```java { .api }

11

/**

12

* A context to provide necessary context used by stored procedure

13

* Provides access to execution environment for procedure implementation

14

*/

15

public interface ProcedureContext {

16

17

/**

18

* Return the StreamExecutionEnvironment where the procedure is called

19

* Flink creates a new StreamExecutionEnvironment based on current configuration

20

* and passes it to the procedure for every procedure call

21

* The procedure can modify the passed StreamExecutionEnvironment safely

22

* as it won't be leaked outside

23

* @return StreamExecutionEnvironment for procedure execution

24

*/

25

StreamExecutionEnvironment getExecutionEnvironment();

26

}

27

```

28

29

### Default Procedure Context

30

31

Default implementation of `ProcedureContext`, which simply returns the execution environment it wraps.

32

33

```java { .api }

34

/**

35

* Default implementation of ProcedureContext

36

* Provides standard procedure execution context

37

*/

38

public class DefaultProcedureContext implements ProcedureContext {

39

40

/**

41

* Get the StreamExecutionEnvironment for this procedure context

42

* @return StreamExecutionEnvironment instance configured for procedure execution

43

*/

44

@Override

45

public StreamExecutionEnvironment getExecutionEnvironment();

46

}

47

```

48

49

**Usage Examples:**

50

51

```java

52

import org.apache.flink.table.procedure.ProcedureContext;

53

import org.apache.flink.table.procedure.DefaultProcedureContext;

54

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

55

import org.apache.flink.streaming.api.datastream.DataStream;

56

57

// Custom procedure implementation

58

public class DataProcessingProcedure {

59

60

public void processData(ProcedureContext context, String inputPath, String outputPath) {

61

// Get execution environment from context

62

StreamExecutionEnvironment env = context.getExecutionEnvironment();

63

64

// Procedure can safely modify the environment

65

env.setParallelism(4);

66

env.enableCheckpointing(60000); // 1-minute checkpoints

67

68

// Implement data processing logic

69

DataStream<String> inputStream = env.readTextFile(inputPath);

70

DataStream<String> processedStream = inputStream

71

.map(line -> line.toUpperCase())

72

.filter(line -> line.length() > 10);

73

74

// Write results

75

processedStream.writeAsText(outputPath);

76

77

// Environment execution will be handled by the procedure framework

78

}

79

}

80

81

// Using default context

82

ProcedureContext context = new DefaultProcedureContext();

83

DataProcessingProcedure procedure = new DataProcessingProcedure();

84

procedure.processData(context, "/input/data.txt", "/output/processed.txt");

85

```

86

87

## Advanced Procedure Patterns

88

89

### Stateful Procedure Implementation

90

91

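Implement procedures whose streaming job keeps keyed state (here, a per-key counter) across the records it processes. Each procedure call receives a fresh execution environment and runs its own job, so this state does not carry over between separate procedure invocations.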

92

93

```java

94

public class StatefulAnalyticsProcedure {

95

private final ValueStateDescriptor<Long> counterDescriptor;

96

97

public StatefulAnalyticsProcedure() {

98

this.counterDescriptor = new ValueStateDescriptor<>("procedure-counter", Long.class);

99

}

100

101

public TableResult analyzeWithState(

102

ProcedureContext context,

103

String tableName,

104

String outputTable) throws Exception {

105

106

StreamExecutionEnvironment env = context.getExecutionEnvironment();

107

108

// Configure environment for stateful processing

109

env.enableCheckpointing(30000);

110

env.setStateBackend(new HashMapStateBackend());

111

112

// Create stateful processing stream

113

DataStream<Row> analysisStream = env

114

.fromSource(createTableSource(tableName), WatermarkStrategy.noWatermarks(), "analysis-source")

115

.keyBy(row -> row.getField(0))

116

.process(new KeyedProcessFunction<Object, Row, Row>() {

117

private ValueState<Long> counter;

118

119

@Override

120

public void open(Configuration parameters) throws Exception {

121

counter = getRuntimeContext().getState(counterDescriptor);

122

}

123

124

@Override

125

public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {

126

Long currentCount = counter.value();

127

if (currentCount == null) {

128

currentCount = 0L;

129

}

130

counter.update(currentCount + 1);

131

132

// Emit enriched row with count

133

Row enrichedRow = Row.of(row.getField(0), row.getField(1), currentCount + 1);

134

out.collect(enrichedRow);

135

}

136

});

137

138

// Sink to output table

139

analysisStream.addSink(createTableSink(outputTable));

140

141

return null; // Return appropriate TableResult

142

}

143

144

private SourceFunction<Row> createTableSource(String tableName) {

145

// Implement table source creation

146

return null;

147

}

148

149

private SinkFunction<Row> createTableSink(String tableName) {

150

// Implement table sink creation

151

return null;

152

}

153

}

154

```

155

156

### Procedure with Custom Configuration

157

158

Create procedures that accept configuration parameters and customize execution.

159

160

```java

161

public class ConfigurableProcedure {

162

163

public void processWithConfig(

164

ProcedureContext context,

165

Map<String, String> config,

166

String inputTable,

167

String outputTable) {

168

169

StreamExecutionEnvironment env = context.getExecutionEnvironment();

170

171

// Apply configuration from parameters

172

int parallelism = Integer.parseInt(config.getOrDefault("parallelism", "4"));

173

long checkpointInterval = Long.parseLong(config.getOrDefault("checkpoint.interval", "60000"));

174

String processingMode = config.getOrDefault("processing.mode", "event-time");

175

176

env.setParallelism(parallelism);

177

env.enableCheckpointing(checkpointInterval);

178

179

// Configure based on processing mode

180

if ("event-time".equals(processingMode)) {

181

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

182

} else {

183

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

184

}

185

186

// Implement processing logic based on configuration

187

DataStream<Row> inputStream = createInputStream(env, inputTable);

188

DataStream<Row> processedStream;

189

190

if ("aggregation".equals(config.get("operation.type"))) {

191

processedStream = inputStream

192

.keyBy(row -> row.getField(0))

193

.timeWindow(Time.minutes(5))

194

.aggregate(new CustomAggregateFunction());

195

} else {

196

processedStream = inputStream

197

.map(new CustomMapFunction(config));

198

}

199

200

processedStream.addSink(createOutputStream(outputTable));

201

}

202

203

private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String tableName) {

204

// Implementation for creating input stream from table

205

return null;

206

}

207

208

private SinkFunction<Row> createOutputStream(String tableName) {

209

// Implementation for creating output sink to table

210

return null;

211

}

212

}

213

```

214

215

### Async Procedure Execution

216

217

Implement procedures that run their job without blocking the caller and that perform per-record calls to external services asynchronously.

218

219

```java

220

public class AsyncProcedure {

221

222

public CompletableFuture<TableResult> processAsync(

223

ProcedureContext context,

224

String inputPath,

225

String outputPath) {

226

227

return CompletableFuture.supplyAsync(() -> {

228

try {

229

StreamExecutionEnvironment env = context.getExecutionEnvironment();

230

231

// Configure for async execution

232

env.setParallelism(8);

233

env.setBufferTimeout(100);

234

235

// Create async processing pipeline

236

DataStream<String> asyncStream = env

237

.readTextFile(inputPath)

238

.map(new AsyncMapFunction<String, String>() {

239

@Override

240

public CompletableFuture<String> asyncMap(String input) throws Exception {

241

return CompletableFuture.supplyAsync(() -> {

242

// Simulate async operation (e.g., external API call)

243

try {

244

Thread.sleep(100);

245

return "processed_" + input;

246

} catch (InterruptedException e) {

247

throw new RuntimeException(e);

248

}

249

});

250

}

251

});

252

253

asyncStream.writeAsText(outputPath);

254

255

// Execute and return result

256

JobExecutionResult jobResult = env.execute("Async Procedure");

257

return TableResult.OK(); // Convert to appropriate TableResult

258

259

} catch (Exception e) {

260

throw new RuntimeException("Async procedure execution failed", e);

261

}

262

});

263

}

264

}

265

```

266

267

## Integration with Table API

268

269

### Procedure Registration and Execution

270

271

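Procedures are not registered on the table environment the way functions are. A catalog exposes them, and SQL invokes them with `CALL`. They can also be invoked programmatically by passing a `ProcedureContext` directly.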

272

273

```java

274

// Register procedure for SQL usage

275

tableEnv.createTemporarySystemFunction("process_data", DataProcessingProcedure.class);

276

277

// Execute procedure via SQL

278

tableEnv.executeSql("CALL process_data('/input/data.txt', '/output/result.txt')");

279

280

// Programmatic procedure execution

281

ProcedureContext context = new DefaultProcedureContext();

282

DataProcessingProcedure procedure = new DataProcessingProcedure();

283

procedure.processData(context, "/input/data.txt", "/output/result.txt");

284

```

285

286

### Procedure with Table Operations

287

288

Combine procedures with table operations for complex workflows.

289

290

```java

291

public class TableProcedure {

292

293

public TableResult processTable(

294

ProcedureContext context,

295

String sourceTable,

296

String targetTable) throws Exception {

297

298

StreamExecutionEnvironment env = context.getExecutionEnvironment();

299

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

300

301

// Configure table environment

302

tableEnv.getConfig().getConfiguration().setString("parallelism.default", "4");

303

304

// Create complex table processing pipeline

305

Table sourceData = tableEnv.from(sourceTable);

306

307

// Apply transformations

308

Table processedData = sourceData

309

.select($("user_id"), $("event_time"), $("event_data"))

310

.where($("event_time").isGreater(lit("2023-01-01 00:00:00")))

311

.groupBy($("user_id"))

312

.select($("user_id"), $("event_data").count().as("event_count"));

313

314

// Create statement set for efficient execution

315

StreamStatementSet statementSet = tableEnv.createStatementSet();

316

statementSet.addInsert(targetTable, processedData);

317

318

return statementSet.execute();

319

}

320

}

321

```

322

323

### Error Handling in Procedures

324

325

Implement robust error handling for procedure execution.

326

327

```java

328

public class RobustProcedure {

329

330

public TableResult processWithErrorHandling(

331

ProcedureContext context,

332

String inputTable,

333

String outputTable,

334

String errorTable) {

335

336

try {

337

StreamExecutionEnvironment env = context.getExecutionEnvironment();

338

339

// Configure error handling

340

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

341

342

DataStream<Row> inputStream = createInputStream(env, inputTable);

343

344

// Split stream for success and error handling

345

SingleOutputStreamOperator<Row> processedStream = inputStream

346

.map(new ProcessingMapFunction())

347

.name("data-processing");

348

349

// Main processing output

350

processedStream.getSideOutput(ProcessingMapFunction.SUCCESS_TAG)

351

.addSink(createTableSink(outputTable));

352

353

// Error output

354

processedStream.getSideOutput(ProcessingMapFunction.ERROR_TAG)

355

.addSink(createTableSink(errorTable));

356

357

env.execute("Robust Procedure Execution");

358

return TableResult.OK();

359

360

} catch (Exception e) {

361

// Log error and return failure result

362

System.err.println("Procedure execution failed: " + e.getMessage());

363

return TableResult.OK(); // Return appropriate error result

364

}

365

}

366

367

private static class ProcessingMapFunction extends ProcessFunction<Row, Row> {

368

static final OutputTag<Row> SUCCESS_TAG = new OutputTag<Row>("success") {};

369

static final OutputTag<Row> ERROR_TAG = new OutputTag<Row>("error") {};

370

371

@Override

372

public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {

373

try {

374

// Process row

375

Row processedRow = processRow(row);

376

ctx.output(SUCCESS_TAG, processedRow);

377

} catch (Exception e) {

378

// Send to error stream

379

Row errorRow = Row.of(row.toString(), e.getMessage(), System.currentTimeMillis());

380

ctx.output(ERROR_TAG, errorRow);

381

}

382

}

383

384

private Row processRow(Row row) throws Exception {

385

// Implementation of row processing logic

386

return row;

387

}

388

}

389

390

private DataStream<Row> createInputStream(StreamExecutionEnvironment env, String table) {

391

return null; // Implementation

392

}

393

394

private SinkFunction<Row> createTableSink(String table) {

395

return null; // Implementation

396

}

397

}

398

```

399

400

## Testing Procedures

401

402

### Unit Testing Procedures

403

404

Test procedures in isolation by passing a stub `ProcedureContext` that returns an execution environment controlled by the test.

405

406

```java

407

public class ProcedureTest {

408

409

@Test

410

public void testDataProcessingProcedure() throws Exception {

411

// Create test execution environment

412

StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.createLocalEnvironment();

413

testEnv.setParallelism(1);

414

415

// Create mock context

416

ProcedureContext testContext = new TestProcedureContext(testEnv);

417

418

// Create procedure instance

419

DataProcessingProcedure procedure = new DataProcessingProcedure();

420

421

// Execute procedure with test data

422

String testInput = "test-input.txt";

423

String testOutput = "test-output.txt";

424

425

procedure.processData(testContext, testInput, testOutput);

426

427

// Verify results

428

// Add assertions based on expected output

429

}

430

431

private static class TestProcedureContext implements ProcedureContext {

432

private final StreamExecutionEnvironment env;

433

434

public TestProcedureContext(StreamExecutionEnvironment env) {

435

this.env = env;

436

}

437

438

@Override

439

public StreamExecutionEnvironment getExecutionEnvironment() {

440

return env;

441

}

442

}

443

}

444

```

445

446

## Types

447

448

### Core Procedure Types

449

450

```java { .api }

451

import org.apache.flink.table.procedure.ProcedureContext;

452

import org.apache.flink.table.procedure.DefaultProcedureContext;

453

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

454

```

455

456

### Table Integration Types

457

458

```java { .api }

459

import org.apache.flink.table.api.TableResult;

460

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

461

import org.apache.flink.table.api.bridge.java.StreamStatementSet;

462

import org.apache.flink.table.api.Table;

463

```

464

465

### Execution and Configuration Types

466

467

```java { .api }

468

import org.apache.flink.api.common.JobExecutionResult;

469

import org.apache.flink.configuration.Configuration;

470

import org.apache.flink.streaming.api.CheckpointingMode;

471

import org.apache.flink.streaming.api.TimeCharacteristic;

472

```

473

474

### Async Processing Types

475

476

```java { .api }

477

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

480

```