# Streaming Data Sinks

Comprehensive sink implementations for streaming data integration with Apache Cassandra. The connector provides multiple sink types optimized for different data formats (tuples vs. POJOs) and processing guarantees (at-least-once vs. exactly-once).

## Capabilities

### CassandraSink Factory

Main entry point for creating Cassandra sinks with automatic type detection and builder selection.

```java { .api }
/**
 * Creates a sink builder appropriate for the input stream type
 * @param input DataStream to sink to Cassandra
 * @return CassandraSinkBuilder for further configuration
 */
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
```

**Usage Examples:**

```java
// Tuple-based sink (uses CassandraTupleSinkBuilder)
DataStream<Tuple3<String, Integer, String>> tupleStream = ...; // your stream
CassandraSink.addSink(tupleStream)
    .setQuery("INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
    .setHost("localhost")
    .build();

// POJO-based sink (uses CassandraPojoSinkBuilder)
DataStream<User> pojoStream = ...; // your stream of @Table-annotated POJOs
CassandraSink.addSink(pojoStream)
    .setHost("localhost")
    .build();
```

### CassandraSinkBuilder Base Configuration

Abstract base class providing common configuration options for all sink types.

```java { .api }
/**
 * Base builder class for Cassandra sink configuration
 */
public abstract static class CassandraSinkBuilder<IN> {
    /**
     * Sets the CQL query for tuple-based sinks (not applicable to POJO sinks)
     * @param query CQL INSERT statement with parameter placeholders
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setQuery(String query);

    /**
     * Sets the Cassandra host, using the default port 9042
     * @param host hostname or IP address
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setHost(String host);

    /**
     * Sets the Cassandra host and port
     * @param host hostname or IP address
     * @param port port number
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setHost(String host, int port);

    /**
     * Sets a custom cluster configuration builder
     * @param builder ClusterBuilder for advanced connection configuration
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);

    /**
     * Enables write-ahead logging for exactly-once processing guarantees
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> enableWriteAheadLog();

    /**
     * Enables write-ahead logging with a custom checkpoint committer
     * @param committer custom CheckpointCommitter implementation
     * @return this builder for method chaining
     */
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);

    /**
     * Finalizes the sink configuration and creates the sink
     * @return configured CassandraSink
     * @throws Exception if the configuration is invalid
     */
    public abstract CassandraSink<IN> build() throws Exception;
}
```

### CassandraTupleSinkBuilder

Specialized builder for tuple-based data streams requiring explicit CQL queries.

```java { .api }
/**
 * Builder for tuple-based Cassandra sinks
 * Requires an explicit CQL query with parameter placeholders matching the tuple arity
 */
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);

    /**
     * Builds a tuple sink with optional write-ahead logging
     * @return CassandraSink configured for tuple data
     * @throws Exception if the query is null/empty or the cluster configuration is missing
     */
    @Override
    public CassandraSink<IN> build() throws Exception;
}
```

**Usage Examples:**

```java
DataStream<Tuple4<String, Integer, Double, Boolean>> orders = ...; // your stream

CassandraSink<Tuple4<String, Integer, Double, Boolean>> sink = CassandraSink
    .addSink(orders)
    .setQuery("INSERT INTO orders (id, quantity, price, processed) VALUES (?, ?, ?, ?)")
    .setHost("cassandra-cluster", 9042)
    .enableWriteAheadLog() // for exactly-once guarantees
    .build();

sink.name("Orders Cassandra Sink")
    .setParallelism(4);
```

### CassandraPojoSinkBuilder

Specialized builder for POJO-based data streams using DataStax mapping annotations.

```java { .api }
/**
 * Builder for POJO-based Cassandra sinks
 * Uses DataStax mapping annotations on POJO classes for table mapping
 * CQL queries are not allowed - mapping is handled automatically
 */
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer);

    /**
     * Builds a POJO sink (write-ahead logging is not supported for POJOs)
     * @return CassandraSink configured for POJO data
     * @throws Exception if a query is specified (not allowed) or the cluster configuration is missing
     */
    @Override
    public CassandraSink<IN> build() throws Exception;
}
```

**Usage Examples:**

```java
// POJO class with DataStax mapping annotations
@Table(keyspace = "analytics", name = "user_events")
public class UserEvent {
    @PartitionKey
    @Column(name = "user_id")
    private String userId;

    @Column(name = "event_type")
    private String eventType;

    @Column(name = "timestamp")
    private Long timestamp;

    // constructors, getters, setters...
}

DataStream<UserEvent> events = ...; // your stream

CassandraSink<UserEvent> sink = CassandraSink
    .addSink(events)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder
                .addContactPoint("cassandra1.example.com")
                .addContactPoint("cassandra2.example.com")
                .withPort(9042)
                .withCredentials("username", "password")
                .build();
        }
    })
    .build();
```

### CassandraSink Configuration

The main sink wrapper providing Flink operator configuration methods.

```java { .api }
/**
 * Main Cassandra sink wrapper class providing Flink operator configuration
 */
public class CassandraSink<IN> {
    /**
     * Sets the name of this sink for visualization and logging
     * @param name operator name
     * @return this sink for method chaining
     */
    public CassandraSink<IN> name(String name);

    /**
     * Sets a unique operator ID for savepoint compatibility
     * @param uid unique operator identifier
     * @return this sink for method chaining
     */
    public CassandraSink<IN> uid(String uid);

    /**
     * Sets a user-provided hash for the JobVertexID
     * @param uidHash user-provided hash string
     * @return this sink for method chaining
     */
    public CassandraSink<IN> setUidHash(String uidHash);

    /**
     * Sets the parallelism for this sink
     * @param parallelism degree of parallelism (must be > 0)
     * @return this sink for method chaining
     */
    public CassandraSink<IN> setParallelism(int parallelism);

    /**
     * Disables operator chaining for this sink
     * @return this sink for method chaining
     */
    public CassandraSink<IN> disableChaining();

    /**
     * Sets the slot sharing group for co-location control
     * @param slotSharingGroup slot sharing group name
     * @return this sink for method chaining
     */
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}
```
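
These operator-level methods can be chained after `build()`. A minimal sketch, assuming a hypothetical `counts` stream of `Tuple2<String, Long>` (the stream, query, and names are placeholders, not part of the connector API):

```java
// Hypothetical pipeline tail: configure the built sink as a Flink operator
CassandraSink<Tuple2<String, Long>> sink = CassandraSink
    .addSink(counts) // counts: DataStream<Tuple2<String, Long>>
    .setQuery("INSERT INTO metrics (name, value) VALUES (?, ?)")
    .setHost("localhost")
    .build();

sink.name("Metrics Cassandra Sink")  // shown in the web UI and logs
    .uid("metrics-cassandra-sink")   // stable ID for savepoint compatibility
    .setParallelism(2)
    .disableChaining()               // run the sink in its own task
    .slotSharingGroup("sinks");      // co-locate only with the "sinks" group
```

Setting an explicit `uid` is worthwhile whenever savepoints are in use, since auto-generated operator IDs change when the job graph changes.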

### CassandraSinkBase Abstract Class

Base abstract class providing common functionality for all Cassandra sink implementations.

```java { .api }
/**
 * Common abstract class for CassandraPojoSink and CassandraTupleSink
 * Provides connection management, error handling, and asynchronous callback processing
 */
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    /**
     * Creates the sink base with a cluster configuration
     * @param builder ClusterBuilder for connection configuration
     */
    protected CassandraSinkBase(ClusterBuilder builder);

    /**
     * Opens the connection and initializes callback handling
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);

    /**
     * Invokes sink processing for an input value with error handling
     * @param value input value to process
     * @throws Exception if processing fails or a previous error occurred
     */
    @Override
    public void invoke(IN value) throws Exception;

    /**
     * Abstract method for sending a value to Cassandra (implemented by subclasses)
     * @param value input value to send
     * @return ListenableFuture for asynchronous processing
     */
    public abstract ListenableFuture<V> send(IN value);

    /**
     * Closes connections and waits for pending operations to complete
     * @throws Exception if cleanup fails or pending operations had errors
     */
    @Override
    public void close() throws Exception;
}
```
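
To illustrate how a concrete implementation plugs into this base class, here is a sketch of a hypothetical subclass for plain strings (the class name is illustrative and not part of the connector; it assumes the session opened by `open()` is accessible to subclasses, mirroring the pattern `CassandraTupleSink` follows):

```java
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.configuration.Configuration;

// Illustrative subclass: writes each incoming String via a prepared statement.
public class SimpleStatementSink extends CassandraSinkBase<String, ResultSet> {
    private final String insertQuery; // e.g. "INSERT INTO logs (line) VALUES (?)"
    private transient PreparedStatement ps;

    public SimpleStatementSink(String insertQuery, ClusterBuilder builder) {
        super(builder);
        this.insertQuery = insertQuery;
    }

    @Override
    public void open(Configuration configuration) {
        super.open(configuration); // establishes cluster and session
        this.ps = session.prepare(insertQuery);
    }

    @Override
    public ListenableFuture<ResultSet> send(String value) {
        // executeAsync returns a ResultSetFuture, a ListenableFuture<ResultSet>;
        // the base class attaches its error-handling callback to this future
        return session.executeAsync(ps.bind(value));
    }
}
```

The base class handles everything else: `invoke()` calls `send()` and registers the callback, and `close()` drains pending futures before shutting the connection down.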

### Sink Implementation Classes

#### CassandraTupleSink

Direct sink implementation for tuple data with parameterized CQL queries.

```java { .api }
/**
 * Sink implementation for tuple-based data using prepared CQL statements
 */
public class CassandraTupleSink<IN extends Tuple> extends CassandraSinkBase<IN, ResultSet> {
    /**
     * Creates a tuple sink with a CQL query and cluster configuration
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param builder ClusterBuilder for connection configuration
     */
    public CassandraTupleSink(String insertQuery, ClusterBuilder builder);

    /**
     * Opens the connection and prepares the CQL statement
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);

    /**
     * Sends a tuple value to Cassandra using the prepared statement
     * @param value tuple value to send
     * @return ListenableFuture for asynchronous execution
     */
    @Override
    public ListenableFuture<ResultSet> send(IN value);

    /**
     * Extracts field values from a tuple into an object array for prepared statement binding
     * @param record tuple record to extract fields from
     * @return Object array with field values
     */
    private Object[] extract(IN record);
}
```

#### CassandraPojoSink

Direct sink implementation for POJO data using the DataStax mapping framework.

```java { .api }
/**
 * Sink implementation for POJO-based data using DataStax mapping annotations
 */
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    /**
     * Creates a POJO sink with mapping configuration
     * @param clazz POJO class with DataStax mapping annotations
     * @param builder ClusterBuilder for connection configuration
     */
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);

    /**
     * Opens the connection and initializes the DataStax mapper
     * @param configuration Flink configuration parameters
     */
    @Override
    public void open(Configuration configuration);

    /**
     * Sends a POJO value to Cassandra using the DataStax mapper
     * @param value POJO value to send
     * @return ListenableFuture for asynchronous execution
     */
    @Override
    public ListenableFuture<ResultSet> send(IN value);
}
```

#### CassandraTupleWriteAheadSink

Write-ahead logging sink for exactly-once processing guarantees.

```java { .api }
/**
 * Write-ahead logging sink for tuple data providing exactly-once semantics
 * Stores incoming records in the state backend and commits only on checkpoint completion
 */
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
    /**
     * Creates a WAL-enabled sink with checkpoint coordination
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param serializer type serializer for state backend storage
     * @param builder ClusterBuilder for connection configuration
     * @param committer CheckpointCommitter for checkpoint state management
     * @throws Exception if initialization fails
     */
    protected CassandraTupleWriteAheadSink(
        String insertQuery,
        TypeSerializer<IN> serializer,
        ClusterBuilder builder,
        CheckpointCommitter committer
    ) throws Exception;

    /**
     * Opens connections and validates that checkpointing is enabled
     * @throws Exception if checkpointing is disabled or the connection fails
     */
    public void open() throws Exception;

    /**
     * Closes connections and cleans up resources
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception;

    /**
     * Sends a batch of values to Cassandra with checkpoint coordination
     * @param values batch of tuples to write
     * @param checkpointId checkpoint ID for coordination
     * @param timestamp checkpoint timestamp
     * @return true if all writes succeeded, false if any failed
     * @throws Exception if batch processing fails
     */
    @Override
    protected boolean sendValues(Iterable<IN> values, long checkpointId, long timestamp) throws Exception;
}
```
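
Because `open()` validates that checkpointing is enabled, a job using the write-ahead sink must turn it on at the environment level first. A minimal sketch (the interval and stream are illustrative placeholders):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// WAL sinks require checkpointing; open() fails if it is disabled
env.enableCheckpointing(10_000); // checkpoint every 10 s (illustrative)

DataStream<Tuple2<String, String>> events = ...; // your stream

CassandraSink.addSink(events)
    .setQuery("INSERT INTO events (id, payload) VALUES (?, ?)")
    .setHost("localhost")
    .enableWriteAheadLog()
    .build();

env.execute("Exactly-once Cassandra job");
```

Records are then buffered in the state backend and written to Cassandra only once the enclosing checkpoint completes.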

## Error Handling

All sink implementations handle errors asynchronously through Guava `ListenableFuture` callbacks. Errors raised during Cassandra operations are logged and cause the sink to fail the record currently being processed.

For write-ahead logging sinks, failed operations trigger checkpoint rollback and retry according to Flink's fault tolerance configuration.

Common error scenarios:

- Connection failures: cluster unreachable or authentication issues
- Schema mismatches: CQL query parameters that don't match the tuple arity, or POJO mapping issues
- Constraint violations: primary key conflicts or data type conversion errors