# Streaming Data Sinks

This connector provides comprehensive streaming sink functionality for writing data from Flink DataStreams to Cassandra. It supports multiple data types with automatic type detection, builder-based configuration, and robust failure-handling mechanisms.

## Capabilities

### Main Sink Entry Point

The primary entry point for creating Cassandra sinks, with automatic type detection and builder creation.

```java { .api }
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input);
```

The `addSink` method automatically detects the input data type and returns the appropriate builder:

- `TupleTypeInfo` → `CassandraTupleSinkBuilder`
- `RowTypeInfo` → `CassandraRowSinkBuilder`
- `PojoTypeInfo` → `CassandraPojoSinkBuilder`
- `CaseClassTypeInfo` → `CassandraScalaProductSinkBuilder`
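This selection can be pictured as a type-based dispatch. The sketch below models it with hypothetical stand-in classes; the real connector inspects the stream's Flink `TypeInformation`:

```java
// Hypothetical stand-ins for Flink's TypeInformation subclasses.
class TypeInfo {}
class TupleTypeInfo extends TypeInfo {}
class RowTypeInfo extends TypeInfo {}
class PojoTypeInfo extends TypeInfo {}
class CaseClassTypeInfo extends TypeInfo {}

class BuilderDispatch {
    // Mirrors the builder selection addSink performs on the stream's type information.
    static String builderFor(TypeInfo typeInfo) {
        if (typeInfo instanceof CaseClassTypeInfo) return "CassandraScalaProductSinkBuilder";
        if (typeInfo instanceof TupleTypeInfo) return "CassandraTupleSinkBuilder";
        if (typeInfo instanceof RowTypeInfo) return "CassandraRowSinkBuilder";
        if (typeInfo instanceof PojoTypeInfo) return "CassandraPojoSinkBuilder";
        throw new IllegalArgumentException("No Cassandra sink builder for " + typeInfo);
    }
}
```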
### Sink Builder Configuration

Base builder providing common configuration options for all sink types.

```java { .api }
public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
    public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options);
    public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout);
    public CassandraSinkBuilder<IN> enableIgnoreNullFields();
    public CassandraSink<IN> build();
}
```

**Usage Examples:**

```java
// Basic configuration with host
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setHost("127.0.0.1", 9042)
    .build();

// Advanced configuration with a custom cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("127.0.0.1")
            .addContactPoint("127.0.0.2")
            .withPort(9042)
            .withCredentials("username", "password")
            .build();
    }
};

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setClusterBuilder(builder)
    .setMaxConcurrentRequests(100)
    .enableIgnoreNullFields()
    .build();
```
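The builder's `enableWriteAheadLog()` option changes delivery semantics: records are staged in Flink state per checkpoint and only committed to Cassandra once that checkpoint completes. A simplified, self-contained model of that staging protocol (not the connector's actual implementation):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the write-ahead-log path: records are staged per
// checkpoint and only reach Cassandra when that checkpoint completes.
class WalModel {
    private final List<String> staged = new ArrayList<>();
    private final Map<Long, List<String>> byCheckpoint = new HashMap<>();
    private final List<String> committed = new ArrayList<>();

    void invoke(String record) {
        staged.add(record);
    }

    // On checkpoint, move staged records into that checkpoint's pending batch.
    void snapshotState(long checkpointId) {
        byCheckpoint.put(checkpointId, new ArrayList<>(staged));
        staged.clear();
    }

    // Only once the checkpoint is confirmed are the writes actually issued.
    void notifyCheckpointComplete(long checkpointId) {
        List<String> batch = byCheckpoint.remove(checkpointId);
        if (batch != null) {
            committed.addAll(batch);
        }
    }

    List<String> committed() {
        return committed;
    }
}
```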
### Tuple Sink Builder

Specialized builder for Flink Tuple types; an explicit CQL query is required.

```java { .api }
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}
```

**Usage Example:**

```java
DataStream<Tuple3<String, Integer, Boolean>> tupleStream = // ... your stream

CassandraSink.addSink(tupleStream)
    .setQuery("INSERT INTO example.metrics (name, value, active) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build();
```
### Row Sink Builder

Specialized builder for Flink Row types with schema-based field mapping.

```java { .api }
public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}
```

**Usage Example:**

```java
DataStream<Row> rowStream = // ... your row stream

CassandraSink.addSink(rowStream)
    .setQuery("INSERT INTO example.users (id, name, age) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build();
```
### POJO Sink Builder

Specialized builder for Plain Old Java Objects using DataStax object mapping.

```java { .api }
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Does not support setQuery() - uses DataStax annotations
    // Supports setDefaultKeyspace() and setMapperOptions()
}
```

**Usage Example:**

```java
// Define POJO with Cassandra annotations
@Table(keyspace = "example", name = "products")
public class Product {
    @PartitionKey
    private String id;

    @Column(name = "name")
    private String name;

    @Column(name = "price")
    private BigDecimal price;

    // constructors, getters, setters...
}

// Configure sink with mapper options
MapperOptions options = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.saveNullFields(false),
            Mapper.Option.timestamp(System.currentTimeMillis())
        };
    }
};

DataStream<Product> productStream = // ... your product stream

CassandraSink.addSink(productStream)
    .setDefaultKeyspace("example")
    .setMapperOptions(options)
    .setHost("127.0.0.1")
    .build();
```
### Scala Product Sink Builder

Specialized builder for Scala case classes and tuples. Usage mirrors the tuple builder: call `setQuery()` with positional bind markers matching the product's fields in declaration order.

```java { .api }
public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}
```
### Sink Operations and Control

The resulting `CassandraSink` provides Flink operator configuration methods.

```java { .api }
public class CassandraSink<IN> {
    public CassandraSink<IN> name(String name);
    public CassandraSink<IN> uid(String uid);
    public CassandraSink<IN> setUidHash(String uidHash);
    public CassandraSink<IN> setParallelism(int parallelism);
    public CassandraSink<IN> disableChaining();
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}
```

**Usage Example:**

```java
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.events (id, timestamp, data) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build()
    .name("Cassandra Event Sink")
    .uid("cassandra-sink-1")
    .setParallelism(4)
    .slotSharingGroup("cassandra-sinks");
```
## Core Sink Implementations

### Base Sink Functionality

All streaming sinks extend a common base class that provides the core functionality.

```java { .api }
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    public void open(Configuration configuration);
    public void close();
    public void initializeState(FunctionInitializationContext context);
    public void snapshotState(FunctionSnapshotContext ctx);
    public void invoke(IN value);
    public abstract ListenableFuture<V> send(IN value);
}
```
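Here `invoke` issues an asynchronous `send`, and the base class tracks in-flight writes so that `snapshotState` can wait until all of them are acknowledged before the checkpoint completes. A simplified, self-contained model of that bookkeeping (not the connector's actual code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of CassandraSinkBase's pending-write tracking: each
// invoke() starts an async write; snapshotState() blocks until every
// in-flight write has completed.
class AsyncSinkModel {
    private final AtomicInteger pending = new AtomicInteger();

    CompletableFuture<Void> invoke(Runnable write, Executor executor) {
        pending.incrementAndGet();
        return CompletableFuture.runAsync(write, executor)
                .whenComplete((result, error) -> pending.decrementAndGet());
    }

    // Wait for all outstanding writes before the checkpoint may complete.
    void snapshotState() throws InterruptedException {
        while (pending.get() > 0) {
            Thread.sleep(1);
        }
    }

    int pendingWrites() {
        return pending.get();
    }
}
```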
### Tuple-Based Sinks

Base class for sinks that work with tuple-like data structures.

```java { .api }
public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler);
    public void open(Configuration configuration);
    public ListenableFuture<ResultSet> send(IN value);
    protected abstract Object[] extract(IN record);
}
```
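Subclasses implement `extract` to turn a record into the positional bind values for the prepared `INSERT` statement. For a Flink Tuple this amounts to a positional copy; the sketch below uses a hypothetical stand-in class rather than Flink's `Tuple`:

```java
// Hypothetical stand-in for a Flink Tuple: fields addressed by position.
class MiniTuple {
    private final Object[] fields;

    MiniTuple(Object... fields) {
        this.fields = fields;
    }

    int getArity() { return fields.length; }
    Object getField(int pos) { return fields[pos]; }
}

class TupleExtract {
    // Mirrors what a tuple sink's extract() does: copy each field, by
    // position, into the array bound to the prepared statement.
    static Object[] extract(MiniTuple record) {
        Object[] values = new Object[record.getArity()];
        for (int i = 0; i < record.getArity(); i++) {
            values[i] = record.getField(i);
        }
        return values;
    }
}
```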
### Specific Sink Types

Individual sink implementations for different data types.

```java { .api }
// Flink Tuple sink
public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
    public CassandraTupleSink(String insertQuery, ClusterBuilder builder);
    protected Object[] extract(IN record);
}

// Flink Row sink
public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
    public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder);
    protected Object[] extract(Row record);
}

// POJO sink with DataStax mapping
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, MapperOptions options);
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace);
    public void open(Configuration configuration);
    public ListenableFuture<ResultSet> send(IN value);
}

// Scala Product sink
public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
    public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder);
    protected Object[] extract(IN record);
}
```
## Error Handling

All sinks support custom failure handlers for robust error handling:

```java
// Custom failure handler example (assumes an SLF4J Logger named "logger")
CassandraFailureHandler customHandler = new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable failure) throws IOException {
        if (failure instanceof WriteTimeoutException) {
            // Log the timeout but continue processing
            logger.warn("Write timeout, continuing", failure);
            return;
        }
        // Re-throw other exceptions to fail the sink
        throw new IOException("Cassandra write failed", failure);
    }
};

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .setFailureHandler(customHandler)
    .build();
```
## Performance Tuning

Configure concurrency and resource management. Note that repeated `setMaxConcurrentRequests` calls replace one another, so set it once; the two-argument overload additionally bounds how long a write waits for a free slot:

```java
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .setMaxConcurrentRequests(100, Duration.ofSeconds(30)) // cap in-flight requests, waiting at most 30s for a slot
    .enableIgnoreNullFields() // avoid writing nulls, which create tombstones
    .build();
```
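Conceptually, a cap on concurrent requests behaves like a semaphore: a permit is taken before each write and released when the write completes, and permit acquisition can time out. A simplified, self-contained model (not the connector's actual code):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified model of setMaxConcurrentRequests: one permit per in-flight
// write, released on completion; acquisition fails after the timeout.
class BoundedRequests {
    private final Semaphore permits;
    private final long timeoutMillis;

    BoundedRequests(int maxConcurrentRequests, long timeoutMillis) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeoutMillis = timeoutMillis;
    }

    <T> CompletableFuture<T> submit(Callable<T> write, Executor executor) throws Exception {
        if (!permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("no free request slot after " + timeoutMillis + " ms");
        }
        CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
            try {
                return write.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor);
        future.whenComplete((result, error) -> permits.release());
        return future;
    }

    int availableSlots() {
        return permits.availablePermits();
    }
}
```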