Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-cassandra@1.14.00
# Apache Flink Cassandra Connector

An Apache Flink connector for Apache Cassandra that lets streaming applications write data into Cassandra databases. It provides sink implementations for Tuples, Rows, POJOs, and Scala Products, input and output formats for batch jobs, configurable failure handling, and integration with Flink's checkpointing system for fault tolerance.

## Package Information

- **Package Name**: flink-connector-cassandra_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven project:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

For streaming applications:

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
```

For batch applications:

```java
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
```

Configuration imports:

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
```

## Basic Usage

### Simple Streaming Sink

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;

// Create a simple ClusterBuilder that connects to a local node
ClusterBuilder clusterBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Configure and add sink to DataStream
DataStream<Tuple3<String, Integer, String>> stream = // ... your data stream

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count, description) VALUES (?, ?, ?);")
    // Use the ClusterBuilder defined above; .setHost("127.0.0.1") is the shorthand alternative
    .setClusterBuilder(clusterBuilder)
    .build();
```

### POJO Sink with Mapper

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import com.datastax.driver.mapping.annotations.Table;
import com.datastax.driver.mapping.annotations.Column;

// Define a POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
    @Column(name = "id")
    private String id;

    @Column(name = "name")
    private String name;

    @Column(name = "age")
    private Integer age;

    // constructors, getters, setters...
}

// Use the POJO sink; no query is set because the mapper derives it from the annotations
DataStream<User> userStream = // ... your user stream

CassandraSink.addSink(userStream)
    .setDefaultKeyspace("example")
    .setHost("127.0.0.1")
    .build();
```

### Exactly-Once Processing with Write-Ahead Log

```java
// `stream` is a DataStream of two-field tuples (word, count).
// The write-ahead log relies on checkpoints, so checkpointing must be enabled on the job.
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .enableWriteAheadLog() // Enable exactly-once processing
    .build();
```

## Architecture

The Apache Flink Cassandra Connector is built around several key architectural components:

- **Sink Hierarchy**: Base `CassandraSinkBase` with type-specific implementations for different data formats
- **Builder Pattern**: Fluent `CassandraSinkBuilder` API for configuration with automatic type detection
- **DataStax Driver Integration**: Built on DataStax Java Driver 3.0.0 with shaded dependencies
- **Fault Tolerance**: Integration with Flink's checkpointing and write-ahead logging for exactly-once guarantees
- **Type System**: Support for Flink Tuples, Rows, POJOs, and Scala Products with automatic serialization
- **Configuration Management**: Centralized configuration through `CassandraSinkBaseConfig` and builder patterns

## Capabilities

### Streaming Data Sinks

Primary streaming sink functionality supporting multiple data types including Tuples, Rows, POJOs, and Scala Products. Provides builder-based configuration with automatic type detection and comprehensive failure handling.

```java { .api }
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
    public CassandraSinkBuilder<IN> enableIgnoreNullFields();
    public CassandraSink<IN> build();
}
```

[Streaming Sinks](./streaming-sinks.md)

### Batch Data Processing

Batch input and output formats for reading from and writing to Cassandra in batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.

```java { .api }
public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT>;
public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT>;
public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT>;
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT>;
```

[Batch Connectors](./batch-connectors.md)

### Configuration and Connection Management

Connection builders, failure handlers, and configuration objects for customizing Cassandra connectivity, error handling, and performance tuning. All three types are `Serializable` because they are shipped to the task managers along with the sink.

```java { .api }
public abstract class ClusterBuilder implements Serializable {
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}

public interface CassandraFailureHandler extends Serializable {
    void onFailure(Throwable failure) throws IOException;
}

public interface MapperOptions extends Serializable {
    Mapper.Option[] getMapperOptions();
}
```

[Configuration](./configuration.md)
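The `CassandraFailureHandler` contract can be illustrated with a minimal sketch. To keep the block compilable without Flink on the classpath, it declares a local stand-in interface with the same single method; in a real job you would import `org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler` instead. The class name `TolerantFailureHandler`, the decision to treat `java.util.concurrent.TimeoutException` as ignorable, and the helper `isFatal` are assumptions made for illustration and are not part of the connector.

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.TimeoutException;

// Stand-in mirroring the documented interface so the sketch compiles without Flink.
// In a real job, delete this and import the connector's CassandraFailureHandler.
interface CassandraFailureHandler extends Serializable {
    void onFailure(Throwable failure) throws IOException;
}

// Hypothetical handler: tolerates timeouts, escalates every other failure.
class TolerantFailureHandler implements CassandraFailureHandler {
    private static final long serialVersionUID = 1L;

    @Override
    public void onFailure(Throwable failure) throws IOException {
        if (failure instanceof TimeoutException) {
            // Assumption: dropping a timed-out write is acceptable for this pipeline.
            System.err.println("Ignoring write timeout: " + failure.getMessage());
            return;
        }
        // Rethrowing as IOException propagates the failure to the caller.
        throw new IOException("Unrecoverable Cassandra write failure", failure);
    }

    // Hypothetical helper: reports whether the handler escalates the given failure.
    static boolean isFatal(CassandraFailureHandler handler, Throwable failure) {
        try {
            handler.onFailure(failure);
            return false;
        } catch (IOException e) {
            return true;
        }
    }
}
```

With the real connector, a handler like this would be passed to `setFailureHandler(...)` on the sink builder. Returning normally from `onFailure` means the failure is tolerated, while throwing reports it as fatal.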

### Write-Ahead Logging

Exactly-once processing guarantees through write-ahead logging with checkpoint integration. Stores records in Flink's state backend and commits them to Cassandra only on successful checkpoint completion.

```java { .api }
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN>;
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row>;
public class CassandraCommitter extends CheckpointCommitter;
```

[Write-Ahead Logging](./write-ahead-logging.md)
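The buffer-then-commit behaviour described in this section can be modelled in a few lines of plain Java. This is a conceptual sketch only, not the connector's implementation: `WriteAheadBuffer` is a hypothetical class, its `snapshots` map stands in for Flink's checkpointed state, its `committed` list stands in for the Cassandra table, and `lastCommittedCheckpoint` plays the role that a checkpoint committer would play.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model of a write-ahead sink; every name here is hypothetical.
class WriteAheadBuffer<T> {
    // Records received since the last checkpoint barrier.
    private final List<T> pending = new ArrayList<>();
    // Stand-in for checkpointed state: records grouped by checkpoint id.
    private final TreeMap<Long, List<T>> snapshots = new TreeMap<>();
    // Stand-in for the external Cassandra table.
    private final List<T> committed = new ArrayList<>();
    // Stand-in for the committer's record of finished checkpoints.
    private long lastCommittedCheckpoint = -1L;

    void write(T record) {
        pending.add(record);
    }

    // Checkpoint barrier: move buffered records under the checkpoint id.
    void snapshot(long checkpointId) {
        snapshots.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Checkpoint completed: flush every snapshot up to and including this id, once.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<T>> ready = snapshots.headMap(checkpointId, true);
        for (Map.Entry<Long, List<T>> entry : ready.entrySet()) {
            if (entry.getKey() > lastCommittedCheckpoint) {
                committed.addAll(entry.getValue());
                lastCommittedCheckpoint = entry.getKey();
            }
        }
        ready.clear(); // headMap is a view, so this removes the flushed snapshots
    }

    // Failure before the next barrier: unsnapshotted records are dropped and would be replayed.
    void restore() {
        pending.clear();
    }

    List<T> committed() {
        return committed;
    }
}
```

The point of the model is that nothing reaches `committed` until `notifyCheckpointComplete` runs, and a repeated notification for the same checkpoint id does not write the records twice.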

### Table API Integration

Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference and SQL compatibility.

```java { .api }
public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
    public CassandraAppendTableSink(ClusterBuilder builder, String cql);
    public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);
}
```

[Table API](./table-api.md)

## Common Data Types

```java { .api }
// Connection configuration
public abstract class ClusterBuilder implements Serializable {
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}

// Failure handling
public interface CassandraFailureHandler extends Serializable {
    void onFailure(Throwable failure) throws IOException;
}

// Configuration management
public final class CassandraSinkBaseConfig {
    public int getMaxConcurrentRequests();
    public Duration getMaxConcurrentRequestsTimeout();
    public boolean getIgnoreNullFields();

    public static Builder newBuilder();
}

// Mapper configuration for POJOs
public interface MapperOptions extends Serializable {
    Mapper.Option[] getMapperOptions();
}

// Checkpoint management for exactly-once processing
public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public void commitCheckpoint(int subtaskIdx, long checkpointId);
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
```
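The `getMaxConcurrentRequests()` and `getMaxConcurrentRequestsTimeout()` settings describe a cap on in-flight writes and how long a new write may wait for a free slot. One common way to enforce such a cap is a counting semaphore; the sketch below shows that idea using JDK classes only. It illustrates the general technique under that assumption and is not a copy of the connector's code. `RequestLimiter`, `acquire`, `release`, and `tryAcquire` are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical limiter illustrating a max-concurrent-requests cap with a timeout.
class RequestLimiter {
    private final Semaphore permits;
    private final Duration timeout;

    RequestLimiter(int maxConcurrentRequests, Duration timeout) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
    }

    // Call before issuing an asynchronous write; waits up to the timeout for a free slot.
    void acquire() throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("No free request slot within " + timeout);
        }
    }

    // Call from the write's completion callback, on success or failure.
    void release() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    // Convenience wrapper: true if a slot was obtained, false on timeout or interruption.
    static boolean tryAcquire(RequestLimiter limiter) {
        try {
            limiter.acquire();
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a limit of 2, a third `acquire()` waits until an earlier write calls `release()` or the timeout elapses. A lower limit reduces pressure on the Cassandra cluster at the cost of throughput.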