
# Streaming Data Sinks

Sink implementations for writing streaming data to Apache Cassandra. The connector provides separate sink types for different data formats (tuples vs. POJOs) and processing guarantees (at-least-once vs. exactly-once). Exactly-once relies on the write-ahead log, which only tuple sinks support.

## Capabilities

### CassandraSink Factory

Main entry point for creating Cassandra sinks. `addSink` detects the element type of the input stream and returns the matching builder.

```java { .api }
/**
 * Creates a sink builder appropriate for the input stream type
 * @param input DataStream to sink to Cassandra
 * @return CassandraSinkBuilder for further configuration
 */
public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
```

**Usage Examples:**

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

// Tuple-based sink (uses CassandraTupleSinkBuilder)
// tupleStream: an existing DataStream<Tuple3<String, Integer, String>>
CassandraSink.addSink(tupleStream)
        .setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
        .setHost("localhost")
        .build(); // build() throws Exception

// POJO-based sink (uses CassandraPojoSinkBuilder); no query, the mapping comes from annotations
// pojoStream: an existing DataStream<User>, where User is a @Table-annotated POJO
CassandraSink.addSink(pojoStream)
        .setHost("localhost")
        .build();
```
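
The type detection can be pictured as a dispatch on the stream's element type. The following is a minimal stdlib-only sketch under stated assumptions, not the connector's implementation. A hypothetical `Tuple` marker interface stands in for Flink's `Tuple` class, and a plain `Class` object stands in for the stream's type information. The POJO test is simplified to "public, concrete, with a public no-arg constructor".

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-in for Flink's Tuple base class.
interface Tuple {}

// The two builder kinds the factory can return.
enum BuilderKind { TUPLE, POJO }

final class SinkFactorySketch {
    private SinkFactorySketch() {}

    // Picks a builder kind from the element type, mirroring the tuple vs. POJO split.
    static BuilderKind selectBuilder(Class<?> elementType) {
        if (Tuple.class.isAssignableFrom(elementType)) {
            return BuilderKind.TUPLE;
        }
        if (looksLikePojo(elementType)) {
            return BuilderKind.POJO;
        }
        throw new IllegalArgumentException("No Cassandra sink builder for type " + elementType.getName());
    }

    // Simplified POJO test (assumption): a public, concrete class with a public no-arg constructor.
    private static boolean looksLikePojo(Class<?> type) {
        int mods = type.getModifiers();
        if (type.isInterface() || !Modifier.isPublic(mods) || Modifier.isAbstract(mods)) {
            return false;
        }
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```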

### CassandraSinkBuilder Base Configuration

Abstract base class providing common configuration options for all sink types.

```java { .api }
/**
 * Base builder class for Cassandra sink configuration
 */
public abstract static class CassandraSinkBuilder<IN> {
    /**
     * Sets the CQL query for tuple-based sinks (not applicable for POJO sinks)
     * @param query CQL INSERT statement with parameter placeholders
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setQuery(String query);

    /**
     * Sets Cassandra host with default port 9042
     * @param host hostname or IP address
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setHost(String host);

    /**
     * Sets Cassandra host and port
     * @param host hostname or IP address
     * @param port port number
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setHost(String host, int port);

    /**
     * Sets custom cluster configuration builder
     * @param builder ClusterBuilder for advanced connection configuration
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);

    /**
     * Enables write-ahead logging for exactly-once processing guarantees
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> enableWriteAheadLog();

    /**
     * Enables write-ahead logging with custom checkpoint committer
     * @param committer custom CheckpointCommitter implementation
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);

    /**
     * Finalizes the sink configuration and creates the sink
     * @return configured CassandraSink
     * @throws Exception if configuration is invalid
     */
    public abstract CassandraSink<IN> build() throws Exception;
}
```

### CassandraTupleSinkBuilder

Specialized builder for tuple-based data streams requiring explicit CQL queries.

```java { .api }
/**
 * Builder for tuple-based Cassandra sinks
 * Requires explicit CQL query with parameter placeholders matching tuple arity
 */
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);

    /**
     * Builds tuple sink with optional write-ahead logging
     * @return CassandraSink configured for tuple data
     * @throws Exception if query is null/empty or cluster configuration missing
     */
    @Override
    public CassandraSink<IN> build() throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

// orders: an existing DataStream<Tuple4<String, Integer, Double, Boolean>>
// enableWriteAheadLog() requires checkpointing on the job, e.g. env.enableCheckpointing(10_000)
CassandraSink<Tuple4<String, Integer, Double, Boolean>> sink = CassandraSink
        .addSink(orders)
        .setQuery("INSERT INTO orders (id, quantity, price, processed) VALUES (?, ?, ?, ?)")
        .setHost("cassandra-cluster", 9042)
        .enableWriteAheadLog() // for exactly-once guarantees
        .build();

sink.name("Orders Cassandra Sink")
        .setParallelism(4);
```

### CassandraPojoSinkBuilder

Specialized builder for POJO-based data streams using DataStax mapping annotations.

```java { .api }
/**
 * Builder for POJO-based Cassandra sinks
 * Uses DataStax mapping annotations on POJO classes for table mapping
 * CQL queries are not allowed - mapping is handled automatically
 */
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);

    /**
     * Builds POJO sink (write-ahead logging not supported for POJOs)
     * @return CassandraSink configured for POJO data
     * @throws Exception if query is specified (not allowed) or cluster configuration missing
     */
    @Override
    public CassandraSink<IN> build() throws Exception;
}
```
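
The two `build()` contracts differ mainly in how they treat the query and the write-ahead log. The stdlib-only sketch below models those documented checks using a hypothetical `SinkConfigSketch` class. It is not the connector's builder: a plain host/port pair stands in for a `ClusterBuilder`, and the error messages are invented.

```java
import java.util.Objects;

// Hypothetical model of the documented build() checks; not Flink's builder classes.
final class SinkConfigSketch {
    enum Kind { TUPLE, POJO }

    static final int DEFAULT_PORT = 9042;

    private final Kind kind;
    private String query;
    private String host;
    private int port;
    private boolean writeAheadLog;

    SinkConfigSketch(Kind kind) {
        this.kind = Objects.requireNonNull(kind, "kind");
    }

    SinkConfigSketch setQuery(String query) {
        this.query = query;
        return this;
    }

    // The single-argument variant falls back to the default port.
    SinkConfigSketch setHost(String host) {
        return setHost(host, DEFAULT_PORT);
    }

    SinkConfigSketch setHost(String host, int port) {
        this.host = host;
        this.port = port;
        return this;
    }

    SinkConfigSketch enableWriteAheadLog() {
        this.writeAheadLog = true;
        return this;
    }

    // Applies the documented failure cases, then returns a short summary of the sink.
    String build() {
        if (host == null) {
            throw new IllegalArgumentException("Cluster configuration is missing");
        }
        if (kind == Kind.TUPLE && (query == null || query.isEmpty())) {
            throw new IllegalArgumentException("Tuple sinks need a non-empty query");
        }
        if (kind == Kind.POJO && query != null) {
            throw new IllegalArgumentException("POJO sinks do not accept a query");
        }
        if (kind == Kind.POJO && writeAheadLog) {
            throw new IllegalArgumentException("Write-ahead logging is not supported for POJO sinks");
        }
        return kind + "@" + host + ":" + port + (writeAheadLog ? " (WAL)" : "");
    }
}
```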

156

157

**Usage Examples:**

158

159

```java

160

// POJO class with DataStax mapping annotations

161

@Table(keyspace = "analytics", name = "user_events")

162

public class UserEvent {

163

@PartitionKey

164

@Column(name = "user_id")

165

private String userId;

166

167

@Column(name = "event_type")

168

private String eventType;

169

170

@Column(name = "timestamp")

171

private Long timestamp;

172

173

// constructors, getters, setters...

174

}

175

176

DataStream<UserEvent> events = // your stream

177

178

CassandraSink<UserEvent> sink = CassandraSink

179

.addSink(events)

180

.setClusterBuilder(new ClusterBuilder() {

181

@Override

182

protected Cluster buildCluster(Cluster.Builder builder) {

183

return builder

184

.addContactPoint("cassandra1.example.com")

185

.addContactPoint("cassandra2.example.com")

186

.withPort(9042)

187

.withCredentials("username", "password")

188

.build();

189

}

190

})

191

.build();

192

```

### CassandraSink Configuration

The main sink wrapper providing Flink operator configuration methods.

```java { .api }
/**
 * Main Cassandra sink wrapper class providing Flink operator configuration
 */
public class CassandraSink<IN> {
    /**
     * Sets the name of this sink for visualization and logging
     * @param name operator name
     * @return this sink for method chaining
     */
    public CassandraSink<IN> name(String name);

    /**
     * Sets unique operator ID for savepoint compatibility
     * @param uid unique operator identifier
     * @return this sink for method chaining
     */
    public CassandraSink<IN> uid(String uid);

    /**
     * Sets user-provided hash for JobVertexID
     * @param uidHash user-provided hash string
     * @return this sink for method chaining
     */
    public CassandraSink<IN> setUidHash(String uidHash);

    /**
     * Sets the parallelism for this sink
     * @param parallelism degree of parallelism (must be > 0)
     * @return this sink for method chaining
     */
    public CassandraSink<IN> setParallelism(int parallelism);

    /**
     * Disables operator chaining for this sink
     * @return this sink for method chaining
     */
    public CassandraSink<IN> disableChaining();

    /**
     * Sets the slot sharing group for co-location control
     * @param slotSharingGroup slot sharing group name
     * @return this sink for method chaining
     */
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}
```

### CassandraSinkBase Abstract Class

Abstract base class providing common functionality for all Cassandra sink implementations.

```java { .api }
/**
 * Common abstract class for CassandraPojoSink and CassandraTupleSink
 * Provides connection management, error handling, and asynchronous callback processing
 */
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    /**
     * Creates sink base with cluster configuration
     * @param builder ClusterBuilder for connection configuration
     */
    protected CassandraSinkBase(ClusterBuilder builder);

    /**
     * Opens connection and initializes callback handling
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);

    /**
     * Invokes sink processing for input value with error handling
     * @param value input value to process
     * @throws Exception if processing fails or previous error occurred
     */
    @Override
    public void invoke(IN value) throws Exception;

    /**
     * Abstract method for sending value to Cassandra (implemented by subclasses)
     * @param value input value to send
     * @return ListenableFuture for asynchronous processing
     */
    public abstract ListenableFuture<V> send(IN value);

    /**
     * Closes connections and waits for pending operations to complete
     * @throws Exception if cleanup fails or pending operations had errors
     */
    @Override
    public void close() throws Exception;
}
```
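
The base class follows an asynchronous error-handling pattern. `send` returns a future, and a callback records any failure. That failure then surfaces on a later `invoke()` or in `close()`. The pattern can be sketched with the JDK alone. `AsyncSinkSketch` is hypothetical, and `CompletableFuture` replaces Guava's `ListenableFuture`, so the connector's own bookkeeping may differ in detail.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical sketch of the CassandraSinkBase error-handling pattern.
// CompletableFuture stands in for Guava's ListenableFuture.
final class AsyncSinkSketch<IN, V> {
    private final Function<IN, CompletableFuture<V>> send;   // plays the role of send(IN)
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private final Object lock = new Object();
    private int pending;                                      // writes still in flight

    AsyncSinkSketch(Function<IN, CompletableFuture<V>> send) {
        this.send = send;
    }

    void invoke(IN value) throws Exception {
        checkError();                                         // surface an earlier async failure
        synchronized (lock) {
            pending++;
        }
        CompletableFuture<V> future;
        try {
            future = send.apply(value);
        } catch (RuntimeException e) {
            release();                                        // the write never started
            throw e;
        }
        future.whenComplete((result, failure) -> {
            if (failure != null) {
                error.compareAndSet(null, failure);           // remember only the first failure
            }
            release();
        });
    }

    void close() throws Exception {
        synchronized (lock) {
            while (pending > 0) {
                lock.wait();                                  // wait for in-flight writes
            }
        }
        checkError();
    }

    private void release() {
        synchronized (lock) {
            pending--;
            lock.notifyAll();
        }
    }

    private void checkError() throws Exception {
        Throwable t = error.get();
        if (t != null) {
            throw new Exception("Error while sending value.", t);
        }
    }
}
```

Recording the failure instead of throwing inside the callback matters because the callback runs on a driver thread. Rethrowing on the task thread is what actually fails the Flink task.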

### Sink Implementation Classes

#### CassandraTupleSink

Direct sink implementation for tuple data with parameterized CQL queries.

```java { .api }
/**
 * Sink implementation for tuple-based data using prepared CQL statements
 */
public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
    /**
     * Creates a tuple sink with CQL query and cluster configuration
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param builder ClusterBuilder for connection configuration
     */
    public CassandraTupleSink(String insertQuery, ClusterBuilder builder);

    /**
     * Opens connection and prepares CQL statement
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);

    /**
     * Sends tuple value to Cassandra using prepared statement
     * @param value tuple value to send
     * @return ListenableFuture for asynchronous execution
     */
    @Override
    public ListenableFuture<ResultSet> send(IN value);

    /**
     * Extracts field values from tuple into object array for prepared statement binding
     * @param record tuple record to extract fields from
     * @return Object array with field values
     */
    private Object[] extract(IN record);
}
```
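
The binding step that `extract` performs can be sketched with the standard library alone. `SimpleTuple` is a hypothetical stand-in that exposes `getArity()` and `getField(int)` the way Flink tuples do. The placeholder-count check is an addition of this sketch, not something the API above promises. It turns the "query parameters don't match tuple arity" mismatch into an early, explicit error. The `?` counter is deliberately naive and only skips single-quoted literals.

```java
// Hypothetical minimal tuple; Flink tuples expose getArity() and getField(int) in the same way.
final class SimpleTuple {
    private final Object[] fields;

    SimpleTuple(Object... fields) {
        this.fields = fields.clone();
    }

    int getArity() {
        return fields.length;
    }

    Object getField(int pos) {
        return fields[pos];
    }
}

final class TupleBindingSketch {
    private TupleBindingSketch() {}

    // Counts '?' bind markers outside single-quoted CQL string literals.
    // Naive on purpose: comments and named markers (:name) are not handled.
    static int countPlaceholders(String cql) {
        int count = 0;
        boolean inLiteral = false;
        for (int i = 0; i < cql.length(); i++) {
            char c = cql.charAt(i);
            if (c == '\'') {
                inLiteral = !inLiteral;   // an escaped '' toggles twice, so the state is unchanged
            } else if (c == '?' && !inLiteral) {
                count++;
            }
        }
        return count;
    }

    // Copies the tuple fields, in positional order, into the array bound to the prepared statement.
    static Object[] extract(SimpleTuple record, String cql) {
        int expected = countPlaceholders(cql);
        if (record.getArity() != expected) {
            throw new IllegalArgumentException("Query has " + expected
                    + " bind markers but the tuple arity is " + record.getArity());
        }
        Object[] values = new Object[record.getArity()];
        for (int i = 0; i < values.length; i++) {
            values[i] = record.getField(i);
        }
        return values;
    }
}
```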

#### CassandraPojoSink

Direct sink implementation for POJO data using DataStax mapping framework.

```java { .api }
/**
 * Sink implementation for POJO-based data using DataStax mapping annotations
 */
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    /**
     * Creates a POJO sink with mapping configuration
     * @param clazz POJO class with DataStax mapping annotations
     * @param builder ClusterBuilder for connection configuration
     */
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);

    /**
     * Opens connection and initializes DataStax mapper
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);

    /**
     * Sends POJO value to Cassandra using DataStax mapper
     * @param value POJO value to send
     * @return ListenableFuture for asynchronous execution
     */
    @Override
    public ListenableFuture<ResultSet> send(IN value);
}
```
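
The POJO sink needs no query because the statement is derived from the class annotations. The stdlib-only sketch below mimics that idea with reflection. It is not how the DataStax mapper is implemented. `SketchTable` and `SketchColumn` are hypothetical stand-ins for `@Table` and `@Column`, and sorting the column names is a simplification that gives a stable statement.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the DataStax @Table and @Column annotations.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchTable {
    String keyspace();
    String name();
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SketchColumn {
    String name();
}

final class MappingSketch {
    private MappingSketch() {}

    // Builds an INSERT with one bind marker per annotated field.
    static String insertFor(Class<?> type) {
        SketchTable table = type.getAnnotation(SketchTable.class);
        if (table == null) {
            throw new IllegalArgumentException(type.getName() + " is not annotated with @SketchTable");
        }
        List<String> columns = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            SketchColumn column = field.getAnnotation(SketchColumn.class);
            if (column != null) {
                columns.add(column.name());
            }
        }
        // getDeclaredFields() guarantees no order, so sort to get a stable statement.
        Collections.sort(columns);
        String markers = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table.keyspace() + "." + table.name()
                + " (" + String.join(", ", columns) + ") VALUES (" + markers + ")";
    }
}
```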

#### CassandraTupleWriteAheadSink

Write-ahead logging sink for exactly-once processing guarantees.

```java { .api }
/**
 * Write-ahead logging sink for tuple data providing exactly-once semantics
 * Stores incoming records in state backend and commits only on checkpoint completion
 */
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    /**
     * Creates WAL-enabled sink with checkpoint coordination
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param serializer type serializer for state backend storage
     * @param builder ClusterBuilder for connection configuration
     * @param committer CheckpointCommitter for checkpoint state management
     * @throws Exception if initialization fails
     */
    protected CassandraTupleWriteAheadSink(
            String insertQuery,
            TypeSerializer<IN> serializer,
            ClusterBuilder builder,
            CheckpointCommitter committer
    ) throws Exception;

    /**
     * Opens connections and validates checkpointing is enabled
     * @throws Exception if checkpointing is disabled or connection fails
     */
    public void open() throws Exception;

    /**
     * Closes connections and cleans up resources
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;

    /**
     * Sends batch of values to Cassandra with checkpoint coordination
     * @param values batch of tuples to write
     * @param checkpointId checkpoint ID for coordination
     * @param timestamp checkpoint timestamp
     * @return true if all writes successful, false if any failed
     * @throws Exception if batch processing fails
     */
    @Override
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
```
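
The write-ahead flow can be modelled in plain Java: buffer records, seal them per checkpoint, and write and commit them only once the checkpoint completes. `WriteAheadSketch` is hypothetical and differs from the real sink in three ways. It keeps batches in memory instead of Flink's state backend. A `Predicate` plays the role of `sendValues`. A list of committed checkpoint IDs stands in for the `CheckpointCommitter`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical in-memory model of a write-ahead sink; not GenericWriteAheadSink itself.
final class WriteAheadSketch<IN> {
    private final Predicate<List<IN>> sendValues;                   // true only if every write succeeded
    private final List<IN> buffer = new ArrayList<>();               // records since the last checkpoint
    private final TreeMap<Long, List<IN>> pending = new TreeMap<>(); // sealed but not yet committed
    private final List<Long> committed = new ArrayList<>();          // stands in for CheckpointCommitter

    WriteAheadSketch(Predicate<List<IN>> sendValues) {
        this.sendValues = sendValues;
    }

    // Records are only buffered here; nothing is written to Cassandra yet.
    void invoke(IN value) {
        buffer.add(value);
    }

    // At a checkpoint, seal the buffered records under that checkpoint ID.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(buffer));
        buffer.clear();
    }

    // When a checkpoint completes, write every sealed batch up to and including it.
    // A batch whose write fails stays pending and is retried on the next notification.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<IN>>> it =
                pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<IN>> batch = it.next();
            if (sendValues.test(batch.getValue())) {
                committed.add(batch.getKey());
                it.remove();
            }
        }
    }

    List<Long> committedCheckpoints() {
        return committed;
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```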

## Error Handling

All sink implementations handle Cassandra writes asynchronously through Guava `ListenableFuture` callbacks. For sinks based on `CassandraSinkBase`, a failed write is logged and recorded by the callback. The recorded error is rethrown on the next call to `invoke()` or in `close()`, which fails the sink task.

For write-ahead logging sinks, `sendValues` returns `false` if any write in the batch fails. The checkpoint is then not marked as committed, and its buffered records are written again when a later checkpoint completes.

Common error scenarios:

- Connection failures: cluster unreachable or authentication issues
- Schema mismatches: the number of CQL query parameters doesn't match the tuple arity, or the POJO mapping doesn't match the table
- Invalid values: null primary key components or data type conversion errors (a plain `INSERT` with an existing primary key overwrites the row rather than failing)