or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

built-in-connectors.md, connector-framework.md, datastream-integration.md, index.md, procedure-context.md, statement-set.md, table-environment.md

procedure-context.mddocs/

0

# Procedure Context

1

2

A framework for executing stored procedures with access to a StreamExecutionEnvironment. It provides the context and environment access needed to implement custom stored procedures in Flink's table ecosystem.

3

4

## Capabilities

5

6

### Procedure Context Interface

7

8

Core interface providing execution environment access for stored procedures.

9

10

```java { .api }

11

/**

12

* Context to provide necessary services for stored procedure execution

13

*/

14

public interface ProcedureContext {

15

16

/**

17

* Returns the StreamExecutionEnvironment where the procedure is called.

18

* Flink creates a new StreamExecutionEnvironment based on current configuration

19

* and passes it to the procedure for every procedure call. The procedure can

20

* modify the passed StreamExecutionEnvironment safely as it won't be leaked outside.

21

*

22

* @return StreamExecutionEnvironment for the procedure execution

23

*/

24

StreamExecutionEnvironment getExecutionEnvironment();

25

}

26

```

27

28

### Default Implementation

29

30

Default implementation of ProcedureContext for standard use cases.

31

32

```java { .api }

33

/**

34

* Default implementation of ProcedureContext

35

*/

36

public class DefaultProcedureContext implements ProcedureContext {

37

38

/**

39

* Creates a default procedure context with the specified execution environment

40

* @param executionEnvironment The StreamExecutionEnvironment to provide to procedures

41

*/

42

public DefaultProcedureContext(StreamExecutionEnvironment executionEnvironment);

43

44

/**

45

* Returns the configured StreamExecutionEnvironment

46

* @return StreamExecutionEnvironment for procedure execution

47

*/

48

@Override

49

public StreamExecutionEnvironment getExecutionEnvironment();

50

}

51

```

52

53

## Usage Examples

54

55

### Basic Procedure Implementation

56

57

```java

58

import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;
import org.apache.flink.types.RowKind;

68

69

/**

70

* Example stored procedure that processes data using DataStream API

71

*/

72

public class DataProcessingProcedure {

73

74

public void processData(ProcedureContext context, String inputPath, String outputPath) {

75

// Get execution environment from context

76

StreamExecutionEnvironment env = context.getExecutionEnvironment();

77

78

// Configure environment settings

79

env.setParallelism(4);

80

env.enableCheckpointing(60000);

81

82

// Create DataStream processing pipeline

83

DataStream<String> inputStream = env.readTextFile(inputPath);

84

85

DataStream<String> processedStream = inputStream

86

.filter(line -> !line.isEmpty())

87

.map(String::toUpperCase)

88

.filter(line -> line.contains("IMPORTANT"));

89

90

// Write results

91

processedStream.writeAsText(outputPath);

92

93

// Execute the job

94

try {

95

env.execute("Data Processing Procedure");

96

} catch (Exception e) {

97

throw new RuntimeException("Procedure execution failed", e);

98

}

99

}

100

}

101

```

102

103

### Advanced Procedure with Table Integration

104

105

```java

106

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

107

import org.apache.flink.table.api.Table;

108

import org.apache.flink.types.Row;

109

110

/**

111

* Advanced procedure combining DataStream and Table APIs

112

*/

113

public class AdvancedAnalyticsProcedure {

114

115

public void runAnalytics(ProcedureContext context, String configPath) {

116

StreamExecutionEnvironment env = context.getExecutionEnvironment();

117

118

// Create table environment using the procedure's execution environment

119

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

120

121

// Configure sources based on config

122

tableEnv.executeSql(

123

"CREATE TABLE source_data (" +

124

" id BIGINT," +

125

" name STRING," +

126

" amount DECIMAL(10,2)," +

127

" event_time TIMESTAMP_LTZ(3)" +

128

") WITH (" +

129

" 'connector' = 'kafka'," +

130

" 'topic' = 'input-topic'," +

131

" 'properties.bootstrap.servers' = 'localhost:9092'" +

132

")"

133

);

134

135

// Perform table operations

136

Table analyticsResult = tableEnv.sqlQuery(

137

"SELECT " +

138

" name," +

139

" COUNT(*) as event_count," +

140

" SUM(amount) as total_amount," +

141

" AVG(amount) as avg_amount " +

142

"FROM source_data " +

143

"WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +

144

"GROUP BY name"

145

);

146

147

// Convert to DataStream for custom processing

148

DataStream<Row> resultStream = tableEnv.toDataStream(analyticsResult);

149

150

// Apply custom DataStream operations

151

resultStream

152

.filter(row -> (Double) row.getField(3) > 100.0) // avg_amount > 100

153

.map(row -> String.format("High-value customer: %s (avg: %.2f)",

154

row.getField(0), row.getField(3)))

155

.print("alerts");

156

157

// Execute the complete pipeline

158

try {

159

env.execute("Advanced Analytics Procedure");

160

} catch (Exception e) {

161

throw new RuntimeException("Analytics procedure failed", e);

162

}

163

}

164

}

165

```

166

167

### Procedure Context Factory

168

169

```java

170

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

171

import org.apache.flink.table.procedure.DefaultProcedureContext;

172

173

/**

174

* Factory for creating procedure contexts with custom configurations

175

*/

176

public class ProcedureContextFactory {

177

178

public static ProcedureContext createContext() {

179

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

180

181

// Configure environment for procedure execution

182

env.setParallelism(2);

183

env.enableCheckpointing(30000);

184

env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(

185

Collections.singletonMap("procedure.mode", "batch")

186

));

187

188

return new DefaultProcedureContext(env);

189

}

190

191

public static ProcedureContext createStreamingContext(int parallelism) {

192

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

193

194

// Streaming-optimized configuration

195

env.setParallelism(parallelism);

196

env.enableCheckpointing(10000);

197

env.setBufferTimeout(100);

198

199

return new DefaultProcedureContext(env);

200

}

201

202

public static ProcedureContext createBatchContext() {

203

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

204

205

// Batch-optimized configuration

206

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

207

env.setParallelism(-1); // Max parallelism

208

209

return new DefaultProcedureContext(env);

210

}

211

}

212

```

213

214

### Procedure Registration and Execution

215

216

```java

217

// Register procedure in table environment

218

tableEnv.createTemporarySystemFunction("process_data", DataProcessingProcedure.class);

219

220

// Call procedure in SQL

221

tableEnv.executeSql("CALL process_data('/input/path', '/output/path')");

222

223

// Call procedure programmatically

224

ProcedureContext context = new DefaultProcedureContext(env);

225

DataProcessingProcedure procedure = new DataProcessingProcedure();

226

procedure.processData(context, "/input/path", "/output/path");

227

```

228

229

## Integration Patterns

230

231

### Procedure with Configuration

232

233

```java

234

import org.apache.flink.configuration.Configuration;

235

236

public class ConfigurableProcedure {

237

238

public void execute(ProcedureContext context, String configJson) {

239

StreamExecutionEnvironment env = context.getExecutionEnvironment();

240

241

// Parse configuration

242

ObjectMapper mapper = new ObjectMapper();

243

try {

244

ProcedureConfig config = mapper.readValue(configJson, ProcedureConfig.class);

245

246

// Apply configuration to environment

247

env.setParallelism(config.getParallelism());

248

env.enableCheckpointing(config.getCheckpointInterval());

249

250

if (config.getJobParameters() != null) {

251

env.getConfig().setGlobalJobParameters(

252

ParameterTool.fromMap(config.getJobParameters())

253

);

254

}

255

256

// Execute procedure logic with configuration

257

executeWithConfig(env, config);

258

259

} catch (Exception e) {

260

throw new RuntimeException("Configuration parsing failed", e);

261

}

262

}

263

264

private void executeWithConfig(StreamExecutionEnvironment env, ProcedureConfig config) {

265

// Implementation specific to configuration

266

}

267

268

public static class ProcedureConfig {

269

private int parallelism = 1;

270

private long checkpointInterval = 60000;

271

private Map<String, String> jobParameters;

272

273

// Getters and setters

274

}

275

}

276

```

277

278

### Error Handling and Monitoring

279

280

```java

281

public class MonitoredProcedure {

282

283

private static final Logger logger = LoggerFactory.getLogger(MonitoredProcedure.class);

284

285

public void executeWithMonitoring(ProcedureContext context, String jobName) {

286

StreamExecutionEnvironment env = context.getExecutionEnvironment();

287

288

try {

289

// Add monitoring and metrics

290

env.getConfig().setGlobalJobParameters(

291

ParameterTool.fromMap(Collections.singletonMap("job.name", jobName))

292

);

293

294

// Configure restart strategy

295

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

296

297

// Add execution monitoring

298

env.registerJobListener(new JobListener() {

299

@Override

300

public void onJobSubmitted(JobClient jobClient, Throwable throwable) {

301

logger.info("Procedure job submitted: {}", jobName);

302

}

303

304

@Override

305

public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {

306

if (throwable != null) {

307

logger.error("Procedure job failed: {}", jobName, throwable);

308

} else {

309

logger.info("Procedure job completed: {} in {}ms",

310

jobName, jobExecutionResult.getNetRuntime());

311

}

312

}

313

});

314

315

// Execute procedure logic

316

executeProcedureLogic(env);

317

318

JobExecutionResult result = env.execute(jobName);

319

logger.info("Procedure execution completed successfully");

320

321

} catch (Exception e) {

322

logger.error("Procedure execution failed: {}", jobName, e);

323

throw new RuntimeException("Procedure failed: " + jobName, e);

324

}

325

}

326

327

private void executeProcedureLogic(StreamExecutionEnvironment env) {

328

// Actual procedure implementation

329

}

330

}

331

```

332

333

## Type Definitions

334

335

### Context Lifecycle

336

337

```java { .api }

338

// Procedure context lifecycle

339

// 1. Context creation with StreamExecutionEnvironment

340

ProcedureContext context = new DefaultProcedureContext(env);

341

342

// 2. Context passed to procedure method

343

procedure.execute(context, parameters);

344

345

// 3. Procedure accesses environment via context

346

StreamExecutionEnvironment procEnv = context.getExecutionEnvironment();

347

348

// 4. Procedure modifies and uses environment safely

349

procEnv.setParallelism(4);

350

procEnv.execute("procedure-job");

351

```

352

353

### Environment Isolation

354

355

```java { .api }

356

/**

357

* Environment isolation guarantees:

358

* - Each procedure call gets a fresh StreamExecutionEnvironment

359

* - Modifications to the environment don't affect other procedures

360

* - Environment configuration is scoped to the procedure execution

361

* - Resource cleanup is handled automatically after procedure completion

362

*/

363

```

364

365

### Integration with Table Functions

366

367

```java { .api }

368

// Procedures can be combined with table functions for complex workflows
// This table function emits exactly one (result, status) row per input value:
// it runs SomeProcedure for the input and reports "SUCCESS", or "FAILED: <message>"
// when the procedure throws.
// NOTE(review): getCurrentContext() and SomeProcedure are not defined anywhere in
// this snippet — confirm where they come from before relying on this example.

369

@FunctionHint(output = @DataTypeHint("ROW<result STRING, status STRING>"))

370

public class ProcedureTableFunction extends TableFunction<Row> {

371

372

// Evaluates one input value; always collects a single row and never rethrows.
public void eval(String input) {

373

// NOTE(review): presumably supplied by the subclass or its environment — TODO confirm
ProcedureContext context = getCurrentContext();

374

375

// Execute procedure and collect results

376

try {

377

SomeProcedure procedure = new SomeProcedure();

378

procedure.execute(context, input);

379

collect(Row.of(input, "SUCCESS"));

380

} catch (Exception e) {

381

// Best-effort reporting: only the exception message reaches the output row;
// the exception itself (cause, stack trace) is not propagated or logged here.
collect(Row.of(input, "FAILED: " + e.getMessage()));

382

}

383

}

384

}

385

```

386

387

## Best Practices

388

389

### Resource Management

390

391

```java

392

// Good: Proper resource cleanup

393

public void execute(ProcedureContext context) {

394

StreamExecutionEnvironment env = context.getExecutionEnvironment();

395

396

try {

397

// Configure environment

398

env.setParallelism(4);

399

400

// Execute logic

401

DataStream<String> stream = createDataStream(env);

402

stream.addSink(createSink());

403

404

env.execute("procedure-job");

405

406

} finally {

407

// Cleanup resources if needed

408

// Note: Environment cleanup is handled by Flink

409

}

410

}

411

```

412

413

### Configuration Management

414

415

```java

416

// Good: Externalized configuration

417

public void execute(ProcedureContext context, String configPath) {

418

StreamExecutionEnvironment env = context.getExecutionEnvironment();

419

420

// Load configuration from external source

421

Configuration config = Configuration.fromFile(configPath);

422

423

// Apply configuration

424

env.configure(config);

425

426

// Execute with configuration

427

executeWithConfig(env, config);

428

}

429

430

// Avoid: Hardcoded configuration

431

public void execute(ProcedureContext context) {

432

StreamExecutionEnvironment env = context.getExecutionEnvironment();

433

env.setParallelism(4); // Hardcoded - not flexible

434

}

435

```

436

437

### Error Handling

438

439

```java

440

// Good: Comprehensive error handling

441

public void execute(ProcedureContext context, String input) {

442

try {

443

validateInput(input);

444

445

StreamExecutionEnvironment env = context.getExecutionEnvironment();

446

setupEnvironment(env);

447

448

JobExecutionResult result = env.execute("procedure-job");

449

logSuccess(result);

450

451

} catch (ValidationException e) {

452

throw new IllegalArgumentException("Invalid procedure input", e);

453

} catch (Exception e) {

454

throw new RuntimeException("Procedure execution failed", e);

455

}

456

}

457

```