# Batch Data Processing

Input and output formats for batch processing jobs using Apache Flink's DataSet API. These implementations provide efficient reading from and writing to Cassandra databases in batch processing scenarios.

## Capabilities

### CassandraInputFormat

Input format for reading tuple data from Cassandra in batch processing jobs.

```java { .api }
/**
 * InputFormat for reading tuple data from Cassandra using CQL queries.
 * Implements NonParallelInput - runs as a single parallel instance.
 */
public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
    /**
     * Creates an input format with a CQL SELECT query and cluster configuration.
     * @param query CQL SELECT statement for data retrieval
     * @param builder ClusterBuilder for connection configuration
     * @throws IllegalArgumentException if query is null/empty or builder is null
     */
    public CassandraInputFormat(String query, ClusterBuilder builder);

    /**
     * Configures the cluster connection (called by the Flink framework).
     * @param parameters configuration parameters
     */
    @Override
    public void configure(Configuration parameters);

    /**
     * Returns cached statistics for query optimization.
     * @param cachedStatistics previously cached statistics
     * @return cached statistics (no new statistics computed)
     */
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

    /**
     * Opens a session and executes the query to initialize the result set.
     * @param ignored input split (ignored for non-parallel input)
     * @throws IOException if connection or query execution fails
     */
    @Override
    public void open(InputSplit ignored) throws IOException;

    /**
     * Checks whether all results have been consumed.
     * @return true if the result set is exhausted
     */
    @Override
    public boolean reachedEnd() throws IOException;

    /**
     * Returns the next record from the result set, populating the reusable tuple.
     * @param reuse tuple instance to populate with data
     * @return populated tuple with the next record's data
     * @throws IOException if record retrieval fails
     */
    @Override
    public OUT nextRecord(OUT reuse) throws IOException;

    /**
     * Creates a single input split (non-parallel processing).
     * @param minNumSplits minimum splits requested (ignored)
     * @return array with a single GenericInputSplit
     */
    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException;

    /**
     * Returns the default input split assigner.
     * @param inputSplits array of input splits
     * @return DefaultInputSplitAssigner for sequential processing
     */
    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);

    /**
     * Closes session and cluster connections.
     * @throws IOException if connection cleanup fails
     */
    @Override
    public void close() throws IOException;
}
```
**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra.example.com")
            .withPort(9042)
            .build();
    }
};

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read user data from Cassandra
CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat =
    new CassandraInputFormat<>(
        "SELECT name, age, email FROM users WHERE age > 18",
        builder
    );

DataSet<Tuple3<String, Integer, String>> users = env.createInput(inputFormat);

// Process the data
DataSet<Tuple3<String, Integer, String>> processedUsers = users
    .filter(user -> user.f1 < 65) // age < 65
    .map(user -> new Tuple3<>(user.f0.toUpperCase(), user.f1, user.f2));

// print() triggers execution of the batch job, so no explicit env.execute() is needed here
processedUsers.print();
```
### CassandraOutputFormat

Output format for writing tuple data to Cassandra in batch processing jobs.

```java { .api }
/**
 * OutputFormat for writing tuple data to Cassandra using CQL INSERT statements.
 */
public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
    /**
     * Creates an output format with a CQL INSERT query and cluster configuration.
     * @param insertQuery CQL INSERT statement with parameter placeholders
     * @param builder ClusterBuilder for connection configuration
     * @throws IllegalArgumentException if insertQuery is null/empty or builder is null
     */
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);

    /**
     * Configures the cluster connection (called by the Flink framework).
     * @param parameters configuration parameters
     */
    @Override
    public void configure(Configuration parameters);

    /**
     * Opens a session and prepares the INSERT statement.
     * @param taskNumber parallel instance number
     * @param numTasks total number of parallel instances
     * @throws IOException if connection setup or statement preparation fails
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException;

    /**
     * Writes a single record to Cassandra using the prepared statement.
     * @param record tuple record to write
     * @throws IOException if the write operation fails
     */
    @Override
    public void writeRecord(OUT record) throws IOException;

    /**
     * Closes session and cluster connections.
     * @throws IOException if connection cleanup fails
     */
    @Override
    public void close() throws IOException;
}
```
**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.api.java.tuple.Tuple4;

// Process and transform data
DataSet<Tuple4<String, Integer, Double, String>> processedOrders = ...; // your processed data

// Write results to Cassandra
CassandraOutputFormat<Tuple4<String, Integer, Double, String>> outputFormat =
    new CassandraOutputFormat<>(
        "INSERT INTO analytics.order_summary (order_id, item_count, total_value, status) VALUES (?, ?, ?, ?)",
        builder
    );

processedOrders.output(outputFormat);

env.execute("Batch Processing Job");
```
### Batch Processing Patterns

#### ETL Pipeline Example

```java
// Complete ETL pipeline using batch processing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Extract: Read raw data from Cassandra
CassandraInputFormat<Tuple3<String, String, Long>> rawDataFormat =
    new CassandraInputFormat<>(
        "SELECT user_id, event_type, timestamp FROM events WHERE date = '2023-01-01'",
        sourceBuilder
    );
DataSet<Tuple3<String, String, Long>> rawEvents = env.createInput(rawDataFormat);

// Transform: Aggregate and clean data
DataSet<Tuple3<String, Integer, Double>> aggregatedData = rawEvents
    .groupBy(0) // group by user_id
    .reduceGroup(new RichGroupReduceFunction<Tuple3<String, String, Long>, Tuple3<String, Integer, Double>>() {
        @Override
        public void reduce(Iterable<Tuple3<String, String, Long>> events,
                           Collector<Tuple3<String, Integer, Double>> out) throws Exception {
            String userId = null;
            int eventCount = 0;
            double avgTimestamp = 0.0;
            long totalTimestamp = 0;

            for (Tuple3<String, String, Long> event : events) {
                userId = event.f0;
                eventCount++;
                totalTimestamp += event.f2;
            }

            if (eventCount > 0) {
                avgTimestamp = (double) totalTimestamp / eventCount;
                out.collect(new Tuple3<>(userId, eventCount, avgTimestamp));
            }
        }
    });

// Load: Write aggregated results to Cassandra
CassandraOutputFormat<Tuple3<String, Integer, Double>> targetFormat =
    new CassandraOutputFormat<>(
        "INSERT INTO analytics.user_daily_stats (user_id, event_count, avg_timestamp) VALUES (?, ?, ?)",
        targetBuilder
    );

aggregatedData.output(targetFormat);
env.execute("Daily Analytics ETL");
```
#### Data Migration Example

```java
// Migrate data between different Cassandra clusters or keyspaces
CassandraInputFormat<Tuple5<String, String, Integer, Boolean, Long>> sourceFormat =
    new CassandraInputFormat<>(
        "SELECT id, name, age, active, created_at FROM legacy.users",
        sourceClusterBuilder
    );

CassandraOutputFormat<Tuple5<String, String, Integer, Boolean, Long>> targetFormat =
    new CassandraOutputFormat<>(
        "INSERT INTO new_schema.user_profiles (user_id, full_name, age, is_active, registration_date) VALUES (?, ?, ?, ?, ?)",
        targetClusterBuilder
    );

DataSet<Tuple5<String, String, Integer, Boolean, Long>> userData = env.createInput(sourceFormat);

// Apply any transformations needed during migration
DataSet<Tuple5<String, String, Integer, Boolean, Long>> migratedData = userData
    .filter(user -> user.f3) // only active users
    .map(user -> {
        // Transform data format if needed
        return new Tuple5<>(
            "user_" + user.f0, // prefix user ID
            user.f1.trim(),    // clean name
            user.f2,           // age unchanged
            user.f3,           // active status
            user.f4            // timestamp unchanged
        );
    });

migratedData.output(targetFormat);
env.execute("Data Migration Job");
```
## Configuration Notes

### Performance Considerations

- **CassandraInputFormat**: Runs as a non-parallel input by design, to avoid overwhelming Cassandra with concurrent queries. For large datasets, consider partitioning queries by date ranges or other criteria.
- **CassandraOutputFormat**: Supports parallelism and can write concurrently to Cassandra. Monitor cluster performance and adjust parallelism accordingly.
- **Query Optimization**: Use appropriate WHERE clauses and LIMIT clauses in input queries to control data volume.

### Error Handling

Both input and output formats provide synchronous error handling:

- **Connection Errors**: Throw `IOException` during `open()` if the cluster connection fails
- **Query Errors**: The input format throws `IOException` during `open()` if the SELECT query is invalid
- **Write Errors**: The output format throws `IOException` during `writeRecord()` if an INSERT fails
- **Schema Errors**: Runtime errors occur if the tuple arity doesn't match the query's parameter count

### Resource Management

Both formats properly manage Cassandra connections:
- Open connections during the `open()` method
- Close sessions and clusters during the `close()` method
- Handle exceptions during cleanup to prevent resource leaks