To get started, run:

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-connectors.md · configuration.md · index.md · streaming-sinks.md · table-api.md · write-ahead-logging.md

docs/table-api.md

0

# Table API Integration

1

2

Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference, SQL compatibility, and seamless integration with Flink's unified batch and streaming table processing.

3

4

## Capabilities

5

6

### Append Table Sink

7

8

Implementation of Flink's `AppendStreamTableSink` for writing table data to Cassandra.

9

10

```java { .api }

11

public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {

12

public CassandraAppendTableSink(ClusterBuilder builder, String cql);

13

public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties);

14

15

public TypeInformation<Row> getOutputType();

16

public String[] getFieldNames();

17

public TypeInformation<?>[] getFieldTypes();

18

public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);

19

public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);

20

}

21

```

22

23

### Basic Usage

24

25

**Simple Table Sink:**

26

27

```java

28

import org.apache.flink.streaming.connectors.cassandra.CassandraAppendTableSink;

29

import org.apache.flink.table.api.EnvironmentSettings;

30

import org.apache.flink.table.api.TableEnvironment;

31

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

32

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

33

34

// Set up table environment

35

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

36

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

37

38

// Create cluster builder

39

ClusterBuilder builder = new ClusterBuilder() {

40

@Override

41

protected Cluster buildCluster(Cluster.Builder builder) {

42

return builder.addContactPoint("127.0.0.1").build();

43

}

44

};

45

46

// Create and register Cassandra table sink

47

CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(

48

builder,

49

"INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"

50

);

51

52

tableEnv.registerTableSink("cassandra_users", cassandraSink);

53

```

54

55

**With Configuration Properties:**

56

57

```java

58

import java.util.Properties;

59

60

// Additional configuration properties

61

Properties properties = new Properties();

62

properties.setProperty("cassandra.connection.timeout", "10000");

63

properties.setProperty("cassandra.read.timeout", "5000");

64

65

CassandraAppendTableSink configuredSink = new CassandraAppendTableSink(

66

builder,

67

"INSERT INTO example.orders (order_id, customer_id, total, created_at) VALUES (?, ?, ?, ?)",

68

properties

69

);

70

71

tableEnv.registerTableSink("cassandra_orders", configuredSink);

72

```

73

74

### SQL Integration

75

76

Use SQL to write data to Cassandra:

77

78

```java

79

// Create source table (from Kafka, file, etc.)

80

tableEnv.executeSql(

81

"CREATE TABLE source_users (" +

82

" id STRING," +

83

" name STRING," +

84

" age INT," +

85

" email STRING" +

86

") WITH (" +

87

" 'connector' = 'kafka'," +

88

" 'topic' = 'user_events'," +

89

" 'properties.bootstrap.servers' = 'localhost:9092'," +

90

" 'format' = 'json'" +

91

")"

92

);

93

94

// Register Cassandra sink

95

CassandraAppendTableSink cassandraSink = new CassandraAppendTableSink(

96

builder,

97

"INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"

98

);

99

tableEnv.registerTableSink("cassandra_users", cassandraSink);

100

101

// SQL query to transform and sink data

102

tableEnv.executeSql(

103

"INSERT INTO cassandra_users " +

104

"SELECT id, UPPER(name) as name, age, email " +

105

"FROM source_users " +

106

"WHERE age >= 18"

107

);

108

```

109

110

### Schema Configuration

111

112

Configure field names and types explicitly:

113

114

```java

115

// Define schema

116

String[] fieldNames = {"user_id", "full_name", "user_age", "email_address"};

117

TypeInformation<?>[] fieldTypes = {

118

Types.STRING,

119

Types.STRING,

120

Types.INT,

121

Types.STRING

122

};

123

124

// Configure sink with schema

125

CassandraAppendTableSink schemaConfiguredSink = new CassandraAppendTableSink(

126

builder,

127

"INSERT INTO example.users (id, name, age, email) VALUES (?, ?, ?, ?)"

128

).configure(fieldNames, fieldTypes);

129

130

tableEnv.registerTableSink("typed_cassandra_users", schemaConfiguredSink);

131

```

132

133

### Complex Data Processing

134

135

Leverage Table API for complex transformations before writing to Cassandra:

136

137

```java

138

import org.apache.flink.table.api.Table;

139

140

// Create source table with complex schema

141

tableEnv.executeSql(

142

"CREATE TABLE event_stream (" +

143

" event_id STRING," +

144

" user_id STRING," +

145

" event_type STRING," +

146

" event_time TIMESTAMP(3)," +

147

" properties MAP<STRING, STRING>," +

148

" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +

149

") WITH (" +

150

" 'connector' = 'kafka'," +

151

" 'topic' = 'events'," +

152

" 'properties.bootstrap.servers' = 'localhost:9092'," +

153

" 'format' = 'json'" +

154

")"

155

);

156

157

// Process data with windowing and aggregation

158

Table processedData = tableEnv.sqlQuery(

159

"SELECT " +

160

" user_id," +

161

" event_type," +

162

" COUNT(*) as event_count," +

163

" MAX(event_time) as last_event_time," +

164

" TUMBLE_START(event_time, INTERVAL '1' HOUR) as window_start " +

165

"FROM event_stream " +

166

"GROUP BY " +

167

" user_id, " +

168

" event_type, " +

169

" TUMBLE(event_time, INTERVAL '1' HOUR)"

170

);

171

172

// Register Cassandra sink for aggregated data

173

CassandraAppendTableSink aggregationSink = new CassandraAppendTableSink(

174

builder,

175

"INSERT INTO example.user_hourly_stats (user_id, event_type, event_count, last_event_time, window_start) VALUES (?, ?, ?, ?, ?)"

176

);

177

178

String[] aggFieldNames = {"user_id", "event_type", "event_count", "last_event_time", "window_start"};

179

TypeInformation<?>[] aggFieldTypes = {

180

Types.STRING,

181

Types.STRING,

182

Types.LONG,

183

Types.SQL_TIMESTAMP,

184

Types.SQL_TIMESTAMP

185

};

186

187

tableEnv.registerTableSink("user_stats", aggregationSink.configure(aggFieldNames, aggFieldTypes));

188

189

// Insert processed data

190

processedData.executeInsert("user_stats");

191

```

192

193

### Catalog Integration

194

195

Register Cassandra tables in Flink's catalog system:

196

197

```java

198

import org.apache.flink.table.catalog.GenericInMemoryCatalog;

199

import org.apache.flink.table.catalog.ObjectPath;

200

import org.apache.flink.table.catalog.CatalogTable;

201

import org.apache.flink.table.catalog.CatalogTableImpl;

202

import org.apache.flink.table.api.Schema;

203

204

// Create catalog

205

GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("cassandra_catalog");

206

tableEnv.registerCatalog("cassandra_catalog", catalog);

207

tableEnv.useCatalog("cassandra_catalog");

208

209

// Define table schema

210

Schema schema = Schema.newBuilder()

211

.column("id", DataTypes.STRING())

212

.column("name", DataTypes.STRING())

213

.column("age", DataTypes.INT())

214

.column("created_at", DataTypes.TIMESTAMP(3))

215

.build();

216

217

// Create table descriptor

218

Map<String, String> properties = new HashMap<>();

219

properties.put("connector", "cassandra");

220

properties.put("cassandra.hosts", "127.0.0.1");

221

properties.put("cassandra.keyspace", "example");

222

properties.put("cassandra.table", "users");

223

224

CatalogTable catalogTable = CatalogTableImpl.of(

225

schema,

226

"Cassandra users table",

227

new ArrayList<>(),

228

properties

229

);

230

231

// Register table in catalog

232

catalog.createTable(

233

new ObjectPath("default", "users"),

234

catalogTable,

235

false

236

);

237

```

238

239

## Advanced Table Patterns

240

241

### Dynamic Tables

242

243

Handle changing schemas with dynamic table concepts:

244

245

```java

246

// Source with evolving schema

247

tableEnv.executeSql(

248

"CREATE TABLE dynamic_events (" +

249

" event_id STRING," +

250

" event_data ROW<" +

251

" user_id STRING," +

252

" action STRING," +

253

" metadata MAP<STRING, STRING>" +

254

" >," +

255

" event_time TIMESTAMP(3)" +

256

") WITH (" +

257

" 'connector' = 'kafka'," +

258

" 'topic' = 'dynamic_events'," +

259

" 'properties.bootstrap.servers' = 'localhost:9092'," +

260

" 'format' = 'json'" +

261

")"

262

);

263

264

// Flatten and transform

265

Table flattenedEvents = tableEnv.sqlQuery(

266

"SELECT " +

267

" event_id," +

268

" event_data.user_id," +

269

" event_data.action," +

270

" event_data.metadata['source'] as source," +

271

" event_time " +

272

"FROM dynamic_events"

273

);

274

275

// Register sink for flattened data

276

CassandraAppendTableSink dynamicSink = new CassandraAppendTableSink(

277

builder,

278

"INSERT INTO example.flattened_events (event_id, user_id, action, source, event_time) VALUES (?, ?, ?, ?, ?)"

279

);

280

281

tableEnv.registerTableSink("flattened_events", dynamicSink);

282

flattenedEvents.executeInsert("flattened_events");

283

```

284

285

### Temporal Tables

286

287

Work with time-based data processing:

288

289

```java

290

// Create temporal table with event time

291

tableEnv.executeSql(

292

"CREATE TABLE temporal_data (" +

293

" key_id STRING," +

294

" value_data STRING," +

295

" process_time AS PROCTIME()," +

296

" event_time TIMESTAMP(3)," +

297

" WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" +

298

") WITH (" +

299

" 'connector' = 'kafka'," +

300

" 'topic' = 'temporal_stream'," +

301

" 'properties.bootstrap.servers' = 'localhost:9092'," +

302

" 'format' = 'json'" +

303

")"

304

);

305

306

// Temporal join example (requires reference table)

307

Table temporalResult = tableEnv.sqlQuery(

308

"SELECT " +

309

" t1.key_id," +

310

" t1.value_data," +

311

" t1.event_time " +

312

"FROM temporal_data t1 " +

313

"WHERE t1.event_time > CURRENT_TIMESTAMP - INTERVAL '1' HOUR"

314

);

315

316

// Sink recent data to Cassandra

317

CassandraAppendTableSink temporalSink = new CassandraAppendTableSink(

318

builder,

319

"INSERT INTO example.recent_data (key_id, value_data, event_time) VALUES (?, ?, ?)"

320

);

321

322

tableEnv.registerTableSink("recent_data", temporalSink);

323

temporalResult.executeInsert("recent_data");

324

```

325

326

## Error Handling and Monitoring

327

328

### Sink Error Handling

329

330

Handle errors at the table level:

331

332

```java

333

// Custom sink with error handling

334

public class ErrorHandlingCassandraAppendTableSink extends CassandraAppendTableSink {

335

private final CassandraFailureHandler failureHandler;

336

337

public ErrorHandlingCassandraAppendTableSink(

338

ClusterBuilder builder,

339

String cql,

340

CassandraFailureHandler failureHandler) {

341

super(builder, cql);

342

this.failureHandler = failureHandler;

343

}

344

345

@Override

346

public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {

347

// Create sink with custom failure handler

348

CassandraRowSink sink = new CassandraRowSink(

349

dataStream.getType().getArity(),

350

getCql(),

351

getBuilder(),

352

CassandraSinkBaseConfig.newBuilder().build(),

353

failureHandler

354

);

355

356

return dataStream.addSink(sink)

357

.setParallelism(dataStream.getParallelism())

358

.name("Cassandra Table Sink");

359

}

360

}

361

```

362

363

### Performance Monitoring

364

365

Monitor table sink performance:

366

367

```java

368

// Enable metrics collection

369

env.getConfig().setLatencyTrackingInterval(1000);

370

371

// Create sink with monitoring

372

CassandraAppendTableSink monitoredSink = new CassandraAppendTableSink(

373

builder,

374

"INSERT INTO example.monitored_data (id, value, timestamp) VALUES (?, ?, ?)"

375

) {

376

@Override

377

public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {

378

return super.consumeDataStream(dataStream)

379

.name("Monitored Cassandra Sink")

380

.setParallelism(4); // Explicit parallelism for monitoring

381

}

382

};

383

384

tableEnv.registerTableSink("monitored_sink", monitoredSink);

385

```

386

387

## Limitations and Considerations

388

389

### Append-Only Semantics

390

391

The Cassandra table sink only supports append operations:

392

393

```java

394

// Supported: INSERT operations

395

table.executeInsert("cassandra_sink"); // ✓

396

397

// NOT supported: UPDATE/DELETE operations

398

// Table API UPDATE/DELETE queries will fail with CassandraAppendTableSink

399

```

400

401

### Schema Evolution

402

403

Handle schema changes carefully:

404

405

```java

406

// Define flexible schema for evolution

407

String[] flexibleFieldNames = {"id", "data", "metadata", "timestamp"};

408

TypeInformation<?>[] flexibleFieldTypes = {

409

Types.STRING,

410

Types.STRING, // JSON string for flexible data

411

Types.MAP(Types.STRING, Types.STRING), // Key-value metadata

412

Types.SQL_TIMESTAMP

413

};

414

415

CassandraAppendTableSink flexibleSink = new CassandraAppendTableSink(

416

builder,

417

"INSERT INTO example.flexible_data (id, data, metadata, timestamp) VALUES (?, ?, ?, ?)"

418

).configure(flexibleFieldNames, flexibleFieldTypes);

419

```

420

421

### Performance Considerations

422

423

Table API adds processing overhead:

424

425

```java

426

// For high-throughput scenarios, consider direct DataStream API

427

// Table API is better for complex transformations and SQL compatibility

428

429

// Direct DataStream (higher performance)

430

stream.addSink(cassandraDirectSink);

431

432

// Table API (more functionality, slightly lower performance)

433

Table table = tableEnv.fromDataStream(stream);

434

table.executeInsert("cassandra_table_sink");

435

```