or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-processing.md · connection-configuration.md · fault-tolerance.md · index.md · streaming-sinks.md

docs/batch-processing.md

0

# Batch Data Processing

1

2

Input and output formats for batch processing jobs using Apache Flink's DataSet API. These implementations provide efficient reading from and writing to Cassandra databases in batch processing scenarios.

3

4

## Capabilities

5

6

### CassandraInputFormat

7

8

Input format for reading tuple data from Cassandra in batch processing jobs.

9

10

```java { .api }

11

/**

12

* InputFormat for reading tuple data from Cassandra using CQL queries

13

* Implements NonParallelInput - runs as single parallel instance

14

*/

15

public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {

16

/**

17

* Creates input format with CQL SELECT query and cluster configuration

18

* @param query CQL SELECT statement for data retrieval

19

* @param builder ClusterBuilder for connection configuration

20

* @throws IllegalArgumentException if query is null/empty or builder is null

21

*/

22

public CassandraInputFormat(String query, ClusterBuilder builder);

23

24

/**

25

* Configures cluster connection (called by Flink framework)

26

* @param parameters configuration parameters

27

*/

28

@Override

29

public void configure(Configuration parameters);

30

31

/**

32

* Returns cached statistics for query optimization

33

* @param cachedStatistics previously cached statistics

34

* @return cached statistics (no new statistics computed)

35

*/

36

@Override

37

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

38

39

/**

40

* Opens session and executes query to initialize result set

41

* @param ignored input split (ignored for non-parallel input)

42

* @throws IOException if connection or query execution fails

43

*/

44

@Override

45

public void open(InputSplit ignored) throws IOException;

46

47

/**

48

* Checks if all results have been consumed

49

* @return true if result set is exhausted

50

*/

51

@Override

52

public boolean reachedEnd() throws IOException;

53

54

/**

55

* Returns next record from result set, populating reusable tuple

56

* @param reuse tuple instance to populate with data

57

* @return populated tuple with next record data

58

* @throws IOException if record retrieval fails

59

*/

60

@Override

61

public OUT nextRecord(OUT reuse) throws IOException;

62

63

/**

64

* Creates single input split (non-parallel processing)

65

* @param minNumSplits minimum splits requested (ignored)

66

* @return array with single GenericInputSplit

67

*/

68

@Override

69

public InputSplit[] createInputSplits(int minNumSplits) throws IOException;

70

71

/**

72

* Returns default input split assigner

73

* @param inputSplits array of input splits

74

* @return DefaultInputSplitAssigner for sequential processing

75

*/

76

@Override

77

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);

78

79

/**

80

* Closes session and cluster connections

81

* @throws IOException if connection cleanup fails

82

*/

83

@Override

84

public void close() throws IOException;

85

}

86

```

87

88

**Usage Examples:**

89

90

```java

91

import org.apache.flink.api.java.DataSet;

92

import org.apache.flink.api.java.ExecutionEnvironment;

93

import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;

94

import org.apache.flink.api.java.tuple.Tuple3;

95

import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

96

import com.datastax.driver.core.Cluster;

97

98

// Create cluster builder

99

ClusterBuilder builder = new ClusterBuilder() {

100

@Override

101

protected Cluster buildCluster(Cluster.Builder builder) {

102

return builder

103

.addContactPoint("cassandra.example.com")

104

.withPort(9042)

105

.build();

106

}

107

};

108

109

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

110

111

// Read user data from Cassandra

112

CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat =

113

new CassandraInputFormat<>(

114

"SELECT name, age, email FROM users WHERE age > 18",

115

builder

116

);

117

118

DataSet<Tuple3<String, Integer, String>> users = env.createInput(inputFormat);

119

120

// Process the data

121

DataSet<Tuple3<String, Integer, String>> processedUsers = users

122

.filter(user -> user.f1 < 65) // age < 65

123

.map(user -> new Tuple3<>(user.f0.toUpperCase(), user.f1, user.f2));

124

125

processedUsers.print();

126

```

127

128

### CassandraOutputFormat

129

130

Output format for writing tuple data to Cassandra in batch processing jobs.

131

132

```java { .api }

133

/**

134

* OutputFormat for writing tuple data to Cassandra using CQL INSERT statements

135

*/

136

public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {

137

/**

138

* Creates output format with CQL INSERT query and cluster configuration

139

* @param insertQuery CQL INSERT statement with parameter placeholders

140

* @param builder ClusterBuilder for connection configuration

141

* @throws IllegalArgumentException if insertQuery is null/empty or builder is null

142

*/

143

public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);

144

145

/**

146

* Configures cluster connection (called by Flink framework)

147

* @param parameters configuration parameters

148

*/

149

@Override

150

public void configure(Configuration parameters);

151

152

/**

153

* Opens session and prepares INSERT statement

154

* @param taskNumber parallel instance number

155

* @param numTasks total number of parallel instances

156

* @throws IOException if connection setup or statement preparation fails

157

*/

158

@Override

159

public void open(int taskNumber, int numTasks) throws IOException;

160

161

/**

162

* Writes single record to Cassandra using prepared statement

163

* @param record tuple record to write

164

* @throws IOException if write operation fails

165

*/

166

@Override

167

public void writeRecord(OUT record) throws IOException;

168

169

/**

170

* Closes session and cluster connections

171

* @throws IOException if connection cleanup fails

172

*/

173

@Override

174

public void close() throws IOException;

175

}

176

```

177

178

**Usage Examples:**

179

180

```java

181

import org.apache.flink.api.java.DataSet;

182

import org.apache.flink.api.java.ExecutionEnvironment;

183

import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;

184

import org.apache.flink.api.java.tuple.Tuple4;

185

186

// Process and transform data

187

DataSet<Tuple4<String, Integer, Double, String>> processedOrders = // your processed data

188

189

// Write results to Cassandra

190

CassandraOutputFormat<Tuple4<String, Integer, Double, String>> outputFormat =

191

new CassandraOutputFormat<>(

192

"INSERT INTO analytics.order_summary (order_id, item_count, total_value, status) VALUES (?, ?, ?, ?)",

193

builder

194

);

195

196

processedOrders.output(outputFormat);

197

198

env.execute("Batch Processing Job");

199

```

200

201

### Batch Processing Patterns

202

203

#### ETL Pipeline Example

204

205

```java

206

// Complete ETL pipeline using batch processing

207

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

208

209

// Extract: Read raw data from Cassandra

210

CassandraInputFormat<Tuple3<String, String, Long>> rawDataFormat =

211

new CassandraInputFormat<>(

212

"SELECT user_id, event_type, timestamp FROM events WHERE date = '2023-01-01'",

213

sourceBuilder

214

);

215

DataSet<Tuple3<String, String, Long>> rawEvents = env.createInput(rawDataFormat);

216

217

// Transform: Aggregate and clean data

218

DataSet<Tuple3<String, Integer, Double>> aggregatedData = rawEvents

219

.groupBy(0) // group by user_id

220

.reduceGroup(new RichGroupReduceFunction<Tuple3<String, String, Long>, Tuple3<String, Integer, Double>>() {

221

@Override

222

public void reduce(Iterable<Tuple3<String, String, Long>> events,

223

Collector<Tuple3<String, Integer, Double>> out) throws Exception {

224

String userId = null;

225

int eventCount = 0;

226

double avgTimestamp = 0.0;

227

long totalTimestamp = 0;

228

229

for (Tuple3<String, String, Long> event : events) {

230

userId = event.f0;

231

eventCount++;

232

totalTimestamp += event.f2;

233

}

234

235

if (eventCount > 0) {

236

avgTimestamp = (double) totalTimestamp / eventCount;

237

out.collect(new Tuple3<>(userId, eventCount, avgTimestamp));

238

}

239

}

240

});

241

242

// Load: Write aggregated results to Cassandra

243

CassandraOutputFormat<Tuple3<String, Integer, Double>> targetFormat =

244

new CassandraOutputFormat<>(

245

"INSERT INTO analytics.user_daily_stats (user_id, event_count, avg_timestamp) VALUES (?, ?, ?)",

246

targetBuilder

247

);

248

249

aggregatedData.output(targetFormat);

250

env.execute("Daily Analytics ETL");

251

```

252

253

#### Data Migration Example

254

255

```java

256

// Migrate data between different Cassandra clusters or keyspaces

257

CassandraInputFormat<Tuple5<String, String, Integer, Boolean, Long>> sourceFormat =

258

new CassandraInputFormat<>(

259

"SELECT id, name, age, active, created_at FROM legacy.users",

260

sourceClusterBuilder

261

);

262

263

CassandraOutputFormat<Tuple5<String, String, Integer, Boolean, Long>> targetFormat =

264

new CassandraOutputFormat<>(

265

"INSERT INTO new_schema.user_profiles (user_id, full_name, age, is_active, registration_date) VALUES (?, ?, ?, ?, ?)",

266

targetClusterBuilder

267

);

268

269

DataSet<Tuple5<String, String, Integer, Boolean, Long>> userData = env.createInput(sourceFormat);

270

271

// Apply any transformations needed during migration

272

DataSet<Tuple5<String, String, Integer, Boolean, Long>> migratedData = userData

273

.filter(user -> user.f3) // only active users

274

.map(user -> {

275

// Transform data format if needed

276

return new Tuple5<>(

277

"user_" + user.f0, // prefix user ID

278

user.f1.trim(), // clean name

279

user.f2, // age unchanged

280

user.f3, // active status

281

user.f4 // timestamp unchanged

282

);

283

});

284

285

migratedData.output(targetFormat);

286

env.execute("Data Migration Job");

287

```

288

289

## Configuration Notes

290

291

### Performance Considerations

292

293

- **CassandraInputFormat**: Runs as non-parallel input by design to avoid overwhelming Cassandra with concurrent queries. For large datasets, consider partitioning queries by date ranges or other criteria.

294

- **CassandraOutputFormat**: Supports parallelism and can write concurrently to Cassandra. Monitor cluster performance and adjust parallelism accordingly.

295

- **Query Optimization**: Use appropriate WHERE clauses and LIMIT statements in input queries to control data volume.

296

297

### Error Handling

298

299

Both input and output formats provide synchronous error handling:

300

301

- **Connection Errors**: Throw `IOException` during `open()` if cluster connection fails

302

- **Query Errors**: Input format throws `IOException` during `open()` if SELECT query is invalid

303

- **Write Errors**: Output format throws `IOException` during `writeRecord()` if INSERT fails

304

- **Schema Errors**: Runtime errors if tuple arity doesn't match query parameter count

305

306

### Resource Management

307

308

Both formats properly manage Cassandra connections:

309

- Open connections during `open()` method

310

- Close sessions and clusters during `close()` method

311

- Handle exceptions during cleanup to prevent resource leaks