# Apache Flink Cassandra Connector

The Apache Flink Cassandra Connector integrates Apache Flink with Apache Cassandra. It supports both streaming (DataStream API) and batch (DataSet API) processing, with separate sink implementations for tuple and POJO data and for different processing guarantees, including exactly-once semantics through write-ahead logging.

## Package Information

- **Package Name**: flink-connector-cassandra_2.10
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add dependency to your Maven project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
```

## Basic Usage

### Streaming Sink Example

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;

// Define a ClusterBuilder for connection configuration
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").withPort(9042).build();
    }
};

// Sample input for illustration only; replace with your own data stream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple3<String, Integer, String>> stream =
    env.fromElements(new Tuple3<String, Integer, String>("alice", 30, "alice@example.com"));

// Create a tuple-based sink; tuple fields bind to the "?" placeholders in order
CassandraSink<Tuple3<String, Integer, String>> sink = CassandraSink
    .addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    // Connect through the ClusterBuilder defined above. Use setHost("localhost", 9042)
    // instead when host and port are all you need; set one or the other, not both.
    .setClusterBuilder(builder)
    .build();

sink.name("Cassandra Sink");

env.execute("Cassandra streaming example");
```

63
### Batch Processing Example
64
65
```java
66
import org.apache.flink.api.java.DataSet;
67
import org.apache.flink.api.java.ExecutionEnvironment;
68
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
69
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
70
import org.apache.flink.api.java.tuple.Tuple2;
71
72
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
73
74
// Read from Cassandra
75
CassandraInputFormat<Tuple2<String, Integer>> inputFormat =
76
new CassandraInputFormat<>("SELECT name, age FROM example.users", builder);
77
DataSet<Tuple2<String, Integer>> input = env.createInput(inputFormat);
78
79
// Write to Cassandra
80
CassandraOutputFormat<Tuple2<String, String>> outputFormat =
81
new CassandraOutputFormat<>("INSERT INTO example.processed (name, status) VALUES (?, ?)", builder);
82
result.output(outputFormat);
83
```
84
## Architecture

The Flink Cassandra Connector is organized into two main packages, supported by a shared connection-configuration class and write-ahead logging for fault tolerance:

- **Streaming Connectors** (`org.apache.flink.streaming.connectors.cassandra`): DataStream API integration with multiple sink implementations
- **Batch Connectors** (`org.apache.flink.batch.connectors.cassandra`): DataSet API integration for batch processing
- **Configuration Management**: `ClusterBuilder` abstract class for Cassandra cluster connection setup
- **Fault Tolerance**: Write-ahead logging support for exactly-once processing guarantees

## Capabilities

### Streaming Data Sinks

Sink implementations for streaming data, covering tuple and POJO data types and different processing guarantees.

```java { .api }
public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
    public abstract CassandraSink<IN> build();
}
```

[Streaming Sinks](./streaming-sinks.md)

### Batch Data Processing

Input and output formats for batch processing jobs using the DataSet API.

```java { .api }
public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> {
    public CassandraInputFormat(String query, ClusterBuilder builder);
}

public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}
```

[Batch Processing](./batch-processing.md)

### Connection Configuration

Abstract configuration system for customizing Cassandra cluster connections with support for authentication, SSL, and advanced connection parameters.

```java { .api }
public abstract class ClusterBuilder implements Serializable {
    public Cluster getCluster();
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}
```

[Connection Configuration](./connection-configuration.md)

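`ClusterBuilder` is `Serializable` because Flink serializes sinks and formats when it distributes a job, whereas a live Cassandra `Cluster` is not serializable; the connection is therefore built only when `getCluster()` is called on the worker. The sketch below models this deferred-construction pattern in plain Java. `Connection`, `ConnectionBuilder`, `LocalConnectionBuilder`, and `DeferredBuildDemo` are hypothetical stand-ins for illustration, not connector or driver classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a live, non-serializable connection. */
class Connection {
    final String host;
    final int port;

    Connection(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

/** Mirrors the shape of ClusterBuilder: serializable settings, connection built on demand. */
abstract class ConnectionBuilder implements Serializable {
    private static final long serialVersionUID = 1L;

    Connection getConnection() {
        return buildConnection();
    }

    protected abstract Connection buildConnection();
}

/** Concrete builder that holds only serializable configuration values. */
class LocalConnectionBuilder extends ConnectionBuilder {
    private static final long serialVersionUID = 1L;
    private final String host;
    private final int port;

    LocalConnectionBuilder(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    protected Connection buildConnection() {
        return new Connection(host, port);
    }
}

class DeferredBuildDemo {
    /** Round-trips a builder through Java serialization, similar to shipping it with a job. */
    static ConnectionBuilder roundTrip(ConnectionBuilder builder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(builder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConnectionBuilder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Builder could not be serialized", e);
        }
    }

    public static void main(String[] args) {
        ConnectionBuilder shipped = roundTrip(new LocalConnectionBuilder("127.0.0.1", 9042));
        Connection connection = shipped.getConnection(); // built only after deserialization
        System.out.println(connection.host + ":" + connection.port); // 127.0.0.1:9042
    }
}
```

The same constraint applies to real `ClusterBuilder` subclasses: any field they hold, such as credentials or host lists, has to be serializable as well.
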
### Fault Tolerance & Write-Ahead Logging

Exactly-once processing guarantees through checkpoint coordination and write-ahead logging for streaming applications.

```java { .api }
public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public CassandraCommitter(ClusterBuilder builder, String keySpace);
}
```

[Fault Tolerance](./fault-tolerance.md)

158
## Types
159
160
### Core Types
161
162
```java { .api }
163
// Main sink wrapper class
164
public class CassandraSink<IN> {
165
// Sink configuration and lifecycle methods
166
public CassandraSink<IN> name(String name);
167
public CassandraSink<IN> uid(String uid);
168
public CassandraSink<IN> setUidHash(String uidHash);
169
public CassandraSink<IN> setParallelism(int parallelism);
170
public CassandraSink<IN> disableChaining();
171
public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
172
}
173
174
// Tuple-specific sink builder
175
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
176
// Specialized for tuple-based data with CQL queries
177
}
178
179
// POJO-specific sink builder
180
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
181
// Specialized for POJO-based data with DataStax mapping annotations
182
}
183
```