# Batch Data Processing

Comprehensive batch input and output formats for reading from and writing to Cassandra in Flink batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.

## Capabilities

### Input Formats

#### Base Input Format

Common base class for all Cassandra input formats providing connection management and split handling.

```java { .api }
public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> {
    public CassandraInputFormatBase(String query, ClusterBuilder builder);
    public void configure(Configuration parameters);
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics);
    public InputSplit[] createInputSplits(int minNumSplits);
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
    public void close();
}
```

#### Tuple Input Format

Reads data from Cassandra and generates Flink Tuples.

```java { .api }
public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> {
    public CassandraInputFormat(String query, ClusterBuilder builder);
    public void open(InputSplit ignored);
    public boolean reachedEnd();
    public OUT nextRecord(OUT reuse);
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import com.datastax.driver.core.Cluster;

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Create input format
CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat =
    new CassandraInputFormat<>(
        "SELECT word, count, description FROM example.words WHERE token(word) > ? AND token(word) <= ?",
        builder
    );

// Use in batch job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Integer, String>> dataSet = env.createInput(inputFormat);
```

#### POJO Input Format

Reads data from Cassandra and generates POJOs using DataStax object mapping.

```java { .api }
public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
    public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass);
    public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass, MapperOptions mapperOptions);
    public void open(InputSplit split);
    public boolean reachedEnd();
    public OUT nextRecord(OUT reuse);
}
```

**Usage Example:**

```java
// Define POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
    @PartitionKey
    private String id;

    @Column(name = "name")
    private String name;

    @Column(name = "age")
    private Integer age;

    // constructors, getters, setters...
}

// Create POJO input format
CassandraPojoInputFormat<User> pojoInputFormat =
    new CassandraPojoInputFormat<>(
        "SELECT * FROM example.users WHERE age > ?",
        builder,
        User.class
    );

// Use with mapper options
MapperOptions options = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
        };
    }
};

CassandraPojoInputFormat<User> pojoWithOptions =
    new CassandraPojoInputFormat<>(
        "SELECT * FROM example.users",
        builder,
        User.class,
        options
    );

DataSet<User> users = env.createInput(pojoWithOptions);
```

### Output Formats

#### Base Output Format

Common base class for all Cassandra output formats providing connection management and batch writing.

```java { .api }
public abstract class CassandraOutputFormatBase<OUT> extends RichOutputFormat<OUT> {
    public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder);
    public void configure(Configuration parameters);
    public void open(int taskNumber, int numTasks);
    public void writeRecord(OUT record);
    public void close();
    protected abstract Object[] extractFields(OUT record);
    protected void onWriteSuccess(ResultSet ignored);
    protected void onWriteFailure(Throwable t);
}
```

#### Tuple Output Format

Writes Flink Tuples to Cassandra using prepared statements.

```java { .api }
public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT> {
    public CassandraTupleOutputFormat(String insertQuery, ClusterBuilder builder);
    protected Object[] extractFields(OUT record);
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;

// Create output format
CassandraTupleOutputFormat<Tuple3<String, Integer, String>> outputFormat =
    new CassandraTupleOutputFormat<>(
        "INSERT INTO example.words (word, count, description) VALUES (?, ?, ?)",
        builder
    );

// Use in batch job
DataSet<Tuple3<String, Integer, String>> results = // ... your data processing
results.output(outputFormat);
```

#### Row Output Format

Writes Flink Rows to Cassandra with schema-based field extraction.

```java { .api }
public class CassandraRowOutputFormat extends CassandraOutputFormatBase<Row> {
    public CassandraRowOutputFormat(String insertQuery, ClusterBuilder builder);
    protected Object[] extractFields(Row record);
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
import org.apache.flink.types.Row;

// Create row output format
CassandraRowOutputFormat rowOutputFormat =
    new CassandraRowOutputFormat(
        "INSERT INTO example.metrics (id, timestamp, value) VALUES (?, ?, ?)",
        builder
    );

DataSet<Row> metrics = // ... your row data
metrics.output(rowOutputFormat);
```

#### POJO Output Format

Writes POJOs to Cassandra using DataStax object mapping.

```java { .api }
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT> {
    public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass);
    public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass, MapperOptions mapperOptions);
    public void configure(Configuration parameters);
    public void open(int taskNumber, int numTasks);
    public void writeRecord(OUT record);
    public void close();
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;

// Create POJO output format
CassandraPojoOutputFormat<User> pojoOutputFormat =
    new CassandraPojoOutputFormat<>(builder, User.class);

// With mapper options
MapperOptions writeOptions = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.ttl(3600), // 1 hour TTL
            Mapper.Option.timestamp(System.currentTimeMillis())
        };
    }
};

CassandraPojoOutputFormat<User> pojoWithOptions =
    new CassandraPojoOutputFormat<>(builder, User.class, writeOptions);

DataSet<User> processedUsers = // ... your user processing
processedUsers.output(pojoWithOptions);
```

## Advanced Usage Patterns

### Parallel Processing with Input Splits

The input formats automatically handle parallelism by creating input splits:

```java
// Configure parallelism
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Use 4 parallel tasks

// Input format will create splits automatically
CassandraInputFormat<Tuple2<String, Integer>> inputFormat =
    new CassandraInputFormat<>(
        "SELECT id, value FROM example.data WHERE token(id) > ? AND token(id) <= ?",
        builder
    );

DataSet<Tuple2<String, Integer>> parallelData = env.createInput(inputFormat);
```

### Custom Connection Configuration

Use advanced cluster configuration for production deployments:

```java
ClusterBuilder productionBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoints("cassandra-1", "cassandra-2", "cassandra-3")
            .withPort(9042)
            .withCredentials("username", "password")
            .withSocketOptions(new SocketOptions()
                .setConnectTimeoutMillis(10000)
                .setReadTimeoutMillis(10000))
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ConstantReconnectionPolicy(1000))
            .build();
    }
};
```

### Error Handling in Batch Jobs

Override callback methods for custom error handling:

```java
CassandraTupleOutputFormat<Tuple2<String, Integer>> customOutputFormat =
    new CassandraTupleOutputFormat<Tuple2<String, Integer>>(
        "INSERT INTO example.data (id, value) VALUES (?, ?)",
        builder
    ) {
        @Override
        protected void onWriteFailure(Throwable t) {
            // Log error and continue, or re-throw to fail the job
            logger.error("Failed to write record", t);
            // super.onWriteFailure(t); // Uncomment to fail on error
        }

        @Override
        protected void onWriteSuccess(ResultSet result) {
            // Custom success handling
            logger.debug("Successfully wrote record");
        }
    };
```

### Memory Management

For large datasets, consider memory-efficient processing:

```java
// Configure batch size and resource management
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); // Reuse objects to reduce GC pressure

// Process data in smaller batches
DataSet<User> largeDataset = env.createInput(inputFormat);
largeDataset
    .rebalance() // Distribute data evenly
    .output(outputFormat);
```

## Deprecated Components

### CassandraOutputFormat (Deprecated)

```java { .api }
@Deprecated
public class CassandraOutputFormat<OUT extends Tuple> extends CassandraTupleOutputFormat<OUT> {
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}
```

**Note:** Use `CassandraTupleOutputFormat` instead of the deprecated `CassandraOutputFormat`.