or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconsumer.mddynamodb-streams.mdindex.mdpartitioning.mdproducer.mdserialization.mdtable-api.md

table-api.mddocs/

0

# Table API Integration

1

2

The connector provides SQL and Table API support through dynamic table factories. You declare Kinesis sources and sinks as tables and process them declaratively with Flink's unified batch and stream processing APIs.

3

4

## Capabilities

5

6

### KinesisDynamicTableFactory

7

8

Factory class for creating Kinesis table sources and sinks that integrate with Flink's Table API ecosystem.

9

10

```java { .api }

11

@Internal

12

public class KinesisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

13

14

public static final String IDENTIFIER = "kinesis";

15

16

/**

17

* Create a dynamic table source for reading from Kinesis.

18

*

19

* @param context Factory context with table schema and options

20

* @return Configured Kinesis table source

21

*/

22

public DynamicTableSource createDynamicTableSource(Context context);

23

24

/**

25

* Create a dynamic table sink for writing to Kinesis.

26

*

27

* @param context Factory context with table schema and options

28

* @return Configured Kinesis table sink

29

*/

30

public DynamicTableSink createDynamicTableSink(Context context);

31

32

/**

33

* Get the factory identifier for table DDL.

34

*

35

* @return Factory identifier string

36

*/

37

public String factoryIdentifier();

38

39

/**

40

* Get required configuration options.

41

*

42

* @return Set of required configuration options

43

*/

44

public Set<ConfigOption<?>> requiredOptions();

45

46

/**

47

* Get optional configuration options.

48

*

49

* @return Set of optional configuration options

50

*/

51

public Set<ConfigOption<?>> optionalOptions();

52

53

/**

54

* Validate Kinesis partitioner configuration.

55

*

56

* @param tableOptions Table configuration options

57

* @param targetTable Catalog table definition

58

*/

59

public static void validateKinesisPartitioner(ReadableConfig tableOptions, CatalogTable targetTable);

60

}

61

```

62

63

### KinesisDynamicSource

64

65

Dynamic table source implementation for reading from Kinesis streams in Table API queries.

66

67

```java { .api }

68

@Internal

69

public class KinesisDynamicSource implements ScanTableSource, SupportsReadingMetadata {

70

71

/**

72

* Get the change log mode supported by this source.

73

*

74

* @return Change log mode (INSERT only for Kinesis)

75

*/

76

public ChangelogMode getChangelogMode();

77

78

/**

79

* Create the runtime provider that supplies the source implementation for reading data.

80

*

81

* @param context Scan context provided by the planner

82

* @return Runtime provider for the configured Kinesis source

83

*/

84

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context);

85

86

/**

87

* Create a copy of this source for use during planning.

88

*

89

* @return New source copy

91

*/

92

public DynamicTableSource copy();

93

94

/**

95

* Get summary string for debugging.

96

*

97

* @return Summary string

98

*/

99

public String asSummaryString();

100

}

101

```

102

103

### KinesisDynamicSink

104

105

Dynamic table sink implementation for writing to Kinesis streams from Table API queries.

106

107

```java { .api }

108

@Internal

109

public class KinesisDynamicSink implements DynamicTableSink {

110

111

/**

112

* Get the change log mode accepted by this sink.

113

*

114

* @param requestedMode Requested change log mode

115

* @return Accepted change log mode

116

*/

117

public ChangelogMode getChangelogMode(ChangelogMode requestedMode);

118

119

/**

120

* Create the runtime provider that supplies the sink implementation for writing data.

121

*

122

* @param context Sink context provided by the planner

123

* @return Runtime provider for the configured Kinesis sink

124

*/

125

public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

126

127

/**

128

* Create a copy of this sink for use during planning.

129

*

130

* @return New sink copy

131

*/

132

public DynamicTableSink copy();

133

134

/**

135

* Get summary string for debugging.

136

*

137

* @return Summary string

138

*/

139

public String asSummaryString();

140

}

141

```

142

143

## Usage Examples

144

145

### Creating Kinesis Tables with DDL

146

147

```sql

148

-- Create a Kinesis source table

149

CREATE TABLE kinesis_source (

150

event_id STRING,

151

user_id BIGINT,

152

event_type STRING,

153

timestamp_col TIMESTAMP(3),

154

payload ROW<

155

action STRING,

156

properties MAP<STRING, STRING>

157

>,

158

-- Kinesis metadata columns

159

kinesis_partition_key STRING METADATA FROM 'partition-key',

160

kinesis_sequence_number STRING METADATA FROM 'sequence-number',

161

kinesis_shard_id STRING METADATA FROM 'shard-id',

162

kinesis_stream_name STRING METADATA FROM 'stream-name',

163

kinesis_arrival_timestamp TIMESTAMP(3) METADATA FROM 'arrival-timestamp',

164

-- Watermark for event time processing

165

WATERMARK FOR timestamp_col AS timestamp_col - INTERVAL '30' SECOND

166

) WITH (

167

'connector' = 'kinesis',

168

'stream' = 'user-events',

169

'aws.region' = 'us-west-2',

170

'aws.credentials.provider' = 'AUTO',

171

'scan.stream.initpos' = 'LATEST',

172

'format' = 'json'

173

);

174

175

-- Create a Kinesis sink table

176

CREATE TABLE kinesis_sink (

177

processed_event_id STRING,

178

user_id BIGINT,

179

aggregated_count BIGINT,

180

window_start TIMESTAMP(3),

181

window_end TIMESTAMP(3)

182

) WITH (

183

'connector' = 'kinesis',

184

'stream' = 'processed-events',

185

'aws.region' = 'us-west-2',

186

'aws.credentials.provider' = 'AUTO',

187

'format' = 'json',

188

'sink.partitioner' = 'fixed'

189

);

190

```

191

192

### Real-Time Analytics Query

193

194

```sql

195

-- Real-time user event aggregation

196

INSERT INTO kinesis_sink

197

SELECT

198

CONCAT('agg_', CAST(user_id AS STRING)) as processed_event_id,

199

user_id,

200

COUNT(*) as aggregated_count,

201

TUMBLE_START(timestamp_col, INTERVAL '5' MINUTE) as window_start,

202

TUMBLE_END(timestamp_col, INTERVAL '5' MINUTE) as window_end

203

FROM kinesis_source

204

WHERE event_type = 'page_view'

205

GROUP BY

206

user_id,

208

TUMBLE(timestamp_col, INTERVAL '5' MINUTE);

209

```

210

211

### Multi-Stream Processing

212

213

```sql

214

-- Create multiple Kinesis source tables

215

CREATE TABLE orders_stream (

216

order_id STRING,

217

customer_id STRING,

218

product_id STRING,

219

quantity INT,

220

price DECIMAL(10,2),

221

order_time TIMESTAMP(3),

222

WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND

223

) WITH (

224

'connector' = 'kinesis',

225

'stream' = 'orders',

226

'aws.region' = 'us-west-2',

227

'format' = 'json'

228

);

229

230

CREATE TABLE inventory_stream (

231

product_id STRING,

232

available_quantity INT,

233

update_time TIMESTAMP(3),

234

WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND

235

) WITH (

236

'connector' = 'kinesis',

237

'stream' = 'inventory-updates',

238

'aws.region' = 'us-west-2',

239

'format' = 'json'

240

);

241

242

-- Join streams for real-time inventory management

243

CREATE TABLE inventory_alerts (

244

product_id STRING,

245

order_quantity INT,

246

available_quantity INT,

247

alert_message STRING,

248

alert_time TIMESTAMP(3)

249

) WITH (

250

'connector' = 'kinesis',

251

'stream' = 'inventory-alerts',

252

'aws.region' = 'us-west-2',

253

'format' = 'json'

254

);

255

256

INSERT INTO inventory_alerts

257

SELECT

258

o.product_id,

259

SUM(o.quantity) as order_quantity,

260

LAST_VALUE(i.available_quantity) as available_quantity,

261

CASE

262

WHEN LAST_VALUE(i.available_quantity) < SUM(o.quantity)

263

THEN 'LOW_STOCK_ALERT'

264

ELSE 'STOCK_OK'

265

END as alert_message,

266

CURRENT_TIMESTAMP as alert_time

267

FROM orders_stream o

268

LEFT JOIN inventory_stream i

269

ON o.product_id = i.product_id

270

AND i.update_time BETWEEN o.order_time - INTERVAL '1' HOUR AND o.order_time + INTERVAL '5' MINUTE

271

GROUP BY

272

o.product_id,

273

TUMBLE(o.order_time, INTERVAL '1' MINUTE);

274

```

275

276

### DynamoDB Streams Integration

277

278

```sql

279

-- Create table for DynamoDB Streams

280

CREATE TABLE dynamodb_changes (

281

event_name STRING,

282

table_name STRING,

283

partition_key STRING,

284

sort_key STRING,

285

old_image ROW<

286

user_id STRING,

287

username STRING,

288

email STRING

289

>,

290

new_image ROW<

291

user_id STRING,

292

username STRING,

293

email STRING

294

>,

295

approximate_creation_time TIMESTAMP(3),

296

WATERMARK FOR approximate_creation_time AS approximate_creation_time - INTERVAL '1' MINUTE

297

) WITH (

298

'connector' = 'kinesis',

299

'stream' = 'arn:aws:dynamodb:us-west-2:123456789012:table/Users/stream/2023-01-01T00:00:00.000',

300

'aws.region' = 'us-west-2',

301

'format' = 'json'

302

);

303

304

-- Create change log for audit purposes

305

CREATE TABLE user_audit_log (

306

change_id STRING,

307

user_id STRING,

308

change_type STRING,

309

old_values STRING,

310

new_values STRING,

311

change_timestamp TIMESTAMP(3)

312

) WITH (

313

'connector' = 'kinesis',

314

'stream' = 'user-audit-log',

315

'aws.region' = 'us-west-2',

316

'format' = 'json'

317

);

318

319

INSERT INTO user_audit_log

320

SELECT

321

CONCAT(table_name, '_', partition_key, '_', DATE_FORMAT(approximate_creation_time, 'yyyyMMddHHmmssSSS')) as change_id,

322

partition_key as user_id,

323

event_name as change_type,

324

CASE WHEN old_image IS NOT NULL THEN CAST(old_image AS STRING) ELSE NULL END as old_values,

325

CASE WHEN new_image IS NOT NULL THEN CAST(new_image AS STRING) ELSE NULL END as new_values,

326

approximate_creation_time as change_timestamp

327

FROM dynamodb_changes

328

WHERE event_name IN ('INSERT', 'MODIFY', 'REMOVE');

329

```

330

331

### Complex Event Processing

332

333

```sql

334

-- Create pattern detection table

335

CREATE TABLE user_behavior_events (

336

user_id STRING,

337

event_type STRING,

338

page_url STRING,

339

session_id STRING,

340

event_timestamp TIMESTAMP(3),

341

WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND

342

) WITH (

343

'connector' = 'kinesis',

344

'stream' = 'user-behavior',

345

'aws.region' = 'us-west-2',

346

'format' = 'json'

347

);

348

349

-- Fraud detection patterns

350

CREATE TABLE fraud_alerts (

351

user_id STRING,

352

alert_type STRING,

353

event_count BIGINT,

354

time_window_start TIMESTAMP(3),

355

time_window_end TIMESTAMP(3),

356

alert_timestamp TIMESTAMP(3)

357

) WITH (

358

'connector' = 'kinesis',

359

'stream' = 'fraud-alerts',

360

'aws.region' = 'us-west-2',

361

'format' = 'json'

362

);

363

364

-- Detect suspicious patterns (too many events in short time)

365

INSERT INTO fraud_alerts

366

SELECT

367

user_id,

368

'HIGH_FREQUENCY_ACTIVITY' as alert_type,

369

COUNT(*) as event_count,

370

TUMBLE_START(event_timestamp, INTERVAL '1' MINUTE) as time_window_start,

371

TUMBLE_END(event_timestamp, INTERVAL '1' MINUTE) as time_window_end,

372

CURRENT_TIMESTAMP as alert_timestamp

373

FROM user_behavior_events

374

GROUP BY

375

user_id,

376

TUMBLE(event_timestamp, INTERVAL '1' MINUTE)

377

HAVING COUNT(*) > 100; -- More than 100 events per minute

378

```

379

380

### Temporal Table Joins

381

382

```sql

383

-- Create product catalog table over the stream of product updates (append-only; shown for comparison, not used in the join below)

384

CREATE TABLE product_catalog (

385

product_id STRING,

386

product_name STRING,

387

category STRING,

388

price DECIMAL(10,2),

389

update_time TIMESTAMP(3),

390

WATERMARK FOR update_time AS update_time - INTERVAL '10' SECOND,

391

PRIMARY KEY (product_id) NOT ENFORCED

392

) WITH (

393

'connector' = 'kinesis',

394

'stream' = 'product-catalog-changes',

395

'aws.region' = 'us-west-2',

396

'format' = 'json'

397

);

398

399

-- Create versioned table for temporal joins

400

CREATE TABLE product_catalog_versioned (

401

product_id STRING,

402

product_name STRING,

403

category STRING,

404

price DECIMAL(10,2),

405

update_time TIMESTAMP(3),

406

WATERMARK FOR update_time AS update_time - INTERVAL '10' SECOND,

407

PRIMARY KEY (product_id) NOT ENFORCED

408

) WITH (

409

'connector' = 'kinesis',

410

'stream' = 'product-catalog-changes',

411

'aws.region' = 'us-west-2',

412

'format' = 'json'

413

);

414

415

-- Join orders with product information as of order time

416

CREATE TABLE enriched_orders (

417

order_id STRING,

418

customer_id STRING,

419

product_id STRING,

420

product_name STRING,

421

category STRING,

422

quantity INT,

423

unit_price DECIMAL(10,2),

424

total_amount DECIMAL(10,2),

425

order_time TIMESTAMP(3)

426

) WITH (

427

'connector' = 'kinesis',

428

'stream' = 'enriched-orders',

429

'aws.region' = 'us-west-2',

430

'format' = 'json'

431

);

432

433

INSERT INTO enriched_orders

434

SELECT

435

o.order_id,

436

o.customer_id,

437

o.product_id,

438

p.product_name,

439

p.category,

440

o.quantity,

441

p.price as unit_price,

442

o.quantity * p.price as total_amount,

443

o.order_time

444

FROM orders_stream o

445

LEFT JOIN product_catalog_versioned FOR SYSTEM_TIME AS OF o.order_time AS p

446

ON o.product_id = p.product_id;

447

```

448

449

## Java Table API Examples

450

451

### Programmatic Table Creation

452

453

```java

454

import org.apache.flink.table.api.EnvironmentSettings;

455

import org.apache.flink.table.api.TableEnvironment;

456

import org.apache.flink.table.api.Schema;

457

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;

458

459

// Create Table Environment

460

EnvironmentSettings settings = EnvironmentSettings

461

.newInstance()

462

.inStreamingMode()

463

.build();

464

TableEnvironment tableEnv = TableEnvironment.create(settings);

465

466

// Create Kinesis source table programmatically

467

tableEnv.createTemporaryTable("kinesis_events",

468

TableDescriptor.forConnector("kinesis")

469

.schema(Schema.newBuilder()

470

.column("event_id", DataTypes.STRING())

471

.column("user_id", DataTypes.BIGINT())

472

.column("event_type", DataTypes.STRING())

473

.column("timestamp_col", DataTypes.TIMESTAMP(3))

474

.columnByMetadata("kinesis_partition_key", DataTypes.STRING(), "partition-key")

475

.columnByMetadata("kinesis_sequence_number", DataTypes.STRING(), "sequence-number")

478

.watermark("timestamp_col", "timestamp_col - INTERVAL '30' SECOND")

479

.build())

480

.option("stream", "user-events")

481

.option("aws.region", "us-west-2")

482

.option("aws.credentials.provider", "AUTO")

483

.option("scan.stream.initpos", "LATEST")

484

.option("format", "json")

485

.build());

486

487

// Execute query

488

Table result = tableEnv.sqlQuery(

489

"SELECT user_id, COUNT(*) as event_count " +

490

"FROM kinesis_events " +

491

"WHERE event_type = 'click' " +

492

"GROUP BY user_id"

493

);

494

495

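// Write to another Kinesis stream.
// Assumes a table named "kinesis_sink" with a matching (user_id, event_count) schema
// has already been registered. Note: a non-windowed GROUP BY produces an updating
// result, which a sink using an insert-only format such as JSON cannot consume;
// use a windowed aggregation (as in the SQL examples above) when writing to Kinesis.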

496

result.executeInsert("kinesis_sink");

497

```

498

499

### Custom Formats and Serialization

500

501

```java

502

// Create a table that reads Avro-encoded records and exposes the Kinesis partition key as a metadata column

503

tableEnv.executeSql(

504

"CREATE TABLE custom_format_table (" +

505

" data STRING," +

506

" metadata_field STRING METADATA FROM 'partition-key'" +

507

") WITH (" +

508

" 'connector' = 'kinesis'," +

509

" 'stream' = 'custom-format-stream'," +

510

" 'aws.region' = 'us-west-2'," +

511

" 'format' = 'avro'," +

512

" 'avro.schema' = '{" +

513

" \"type\": \"record\"," +

514

" \"name\": \"CustomEvent\"," +

515

" \"fields\": [" +

516

" {\"name\": \"data\", \"type\": \"string\"}" +

517

" ]" +

518

" }'" +

519

")"

520

);

521

```

522

523

## Configuration Options

524

525

### Common Table Options

526

527

```properties

528

# Required options

529

connector = kinesis

530

stream = my-stream-name

531

aws.region = us-west-2

532

533

# Authentication options

534

aws.credentials.provider = AUTO | BASIC | PROFILE | ASSUME_ROLE | ENV_VAR | SYS_PROP

535

aws.access-key-id = your-access-key

536

aws.secret-access-key = your-secret-key

537

538

# Source-specific options

539

scan.stream.initpos = LATEST | TRIM_HORIZON | AT_TIMESTAMP

540

scan.stream.initpos.timestamp = 2023-01-01T00:00:00.000Z

541

scan.shard.getrecords.maxrecordcount = 10000

542

scan.shard.getrecords.intervalmillis = 200

543

544

# Sink-specific options

545

sink.partitioner = fixed | random | <fully qualified partitioner class name>

546

sink.partitioner-field-delimiter = |

547

sink.flush-buffer.size = 1000

548

sink.flush-buffer.timeout = 2s

549

550

# Format options

551

format = json | avro | csv | raw

552

```

553

554

### Advanced Configuration

555

556

```sql

557

-- Enhanced Fan-Out configuration

558

CREATE TABLE efo_source (

559

data STRING

560

) WITH (

561

'connector' = 'kinesis',

562

'stream' = 'my-stream',

563

'aws.region' = 'us-west-2',

564

'scan.stream.recordpublisher' = 'EFO',

565

'scan.stream.efo.consumername' = 'my-flink-app',

566

'scan.stream.efo.registration' = 'LAZY'

567

);

568

569

-- Custom partitioning for sink

570

CREATE TABLE partitioned_sink (

571

user_id STRING,

572

data STRING

573

) WITH (

574

'connector' = 'kinesis',

575

'stream' = 'partitioned-output',

576

'aws.region' = 'us-west-2',

577

'format' = 'json',

578

'sink.partitioner' = 'com.example.MyCustomPartitioner'

580

);

581

```

582

583

## Best Practices

584

585

1. **Schema Evolution**: Use flexible formats like JSON or Avro for schema evolution

586

2. **Watermarks**: Configure appropriate watermark strategies for event-time processing

587

3. **Metadata**: Leverage Kinesis metadata columns for debugging and monitoring

588

4. **Partitioning**: Choose appropriate partitioning strategies for optimal performance

589

5. **Error Handling**: Implement proper error handling and dead letter queues

590

6. **Resource Management**: Configure appropriate parallelism and resource allocation

591

7. **Monitoring**: Use Flink metrics and Kinesis CloudWatch metrics for monitoring