or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batch-connectors.md · configuration.md · index.md · streaming-sinks.md · table-api.md · write-ahead-logging.md

docs/batch-connectors.md

# Batch Data Processing

Comprehensive batch input and output formats for reading from and writing to Cassandra in Flink batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.

## Capabilities

### Input Formats

#### Base Input Format

Common base class for all Cassandra input formats providing connection management and split handling.

```java { .api }
public abstract class CassandraInputFormatBase<OUT> extends RichInputFormat<OUT, InputSplit> {
    public CassandraInputFormatBase(String query, ClusterBuilder builder);
    public void configure(Configuration parameters);
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics);
    public InputSplit[] createInputSplits(int minNumSplits);
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
    public void close();
}
```

#### Tuple Input Format

Reads data from Cassandra and generates Flink Tuples.

```java { .api }
public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT> {
    public CassandraInputFormat(String query, ClusterBuilder builder);
    public void open(InputSplit ignored);
    public boolean reachedEnd();
    public OUT nextRecord(OUT reuse);
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import com.datastax.driver.core.Cluster;

// Create cluster builder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Create input format
CassandraInputFormat<Tuple3<String, Integer, String>> inputFormat =
    new CassandraInputFormat<>(
        "SELECT word, count, description FROM example.words WHERE token(word) > ? AND token(word) <= ?",
        builder
    );

// Use in batch job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Integer, String>> dataSet = env.createInput(inputFormat);
```

#### POJO Input Format

Reads data from Cassandra and generates POJOs using DataStax object mapping.

```java { .api }
public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT> {
    public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass);
    public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class<OUT> inputClass, MapperOptions mapperOptions);
    public void open(InputSplit split);
    public boolean reachedEnd();
    public OUT nextRecord(OUT reuse);
}
```

**Usage Example:**

```java
// Define POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
    @PartitionKey
    private String id;

    @Column(name = "name")
    private String name;

    @Column(name = "age")
    private Integer age;

    // constructors, getters, setters...
}

// Create POJO input format
CassandraPojoInputFormat<User> pojoInputFormat =
    new CassandraPojoInputFormat<>(
        "SELECT * FROM example.users WHERE age > ?",
        builder,
        User.class
    );

// Use with mapper options
MapperOptions options = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.consistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
        };
    }
};

CassandraPojoInputFormat<User> pojoWithOptions =
    new CassandraPojoInputFormat<>(
        "SELECT * FROM example.users",
        builder,
        User.class,
        options
    );

DataSet<User> users = env.createInput(pojoWithOptions);
```

### Output Formats

#### Base Output Format

Common base class for all Cassandra output formats providing connection management and batch writing.

```java { .api }
public abstract class CassandraOutputFormatBase<OUT> extends RichOutputFormat<OUT> {
    public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder);
    public void configure(Configuration parameters);
    public void open(int taskNumber, int numTasks);
    public void writeRecord(OUT record);
    public void close();
    protected abstract Object[] extractFields(OUT record);
    protected void onWriteSuccess(ResultSet ignored);
    protected void onWriteFailure(Throwable t);
}
```

#### Tuple Output Format

Writes Flink Tuples to Cassandra using prepared statements.

```java { .api }
public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT> {
    public CassandraTupleOutputFormat(String insertQuery, ClusterBuilder builder);
    protected Object[] extractFields(OUT record);
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;

// Create output format
CassandraTupleOutputFormat<Tuple3<String, Integer, String>> outputFormat =
    new CassandraTupleOutputFormat<>(
        "INSERT INTO example.words (word, count, description) VALUES (?, ?, ?)",
        builder
    );

// Use in batch job
DataSet<Tuple3<String, Integer, String>> results = // ... your data processing
results.output(outputFormat);
```

#### Row Output Format

Writes Flink Rows to Cassandra with schema-based field extraction.

```java { .api }
public class CassandraRowOutputFormat extends CassandraOutputFormatBase<Row> {
    public CassandraRowOutputFormat(String insertQuery, ClusterBuilder builder);
    protected Object[] extractFields(Row record);
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat;
import org.apache.flink.types.Row;

// Create row output format
CassandraRowOutputFormat rowOutputFormat =
    new CassandraRowOutputFormat(
        "INSERT INTO example.metrics (id, timestamp, value) VALUES (?, ?, ?)",
        builder
    );

DataSet<Row> metrics = // ... your row data
metrics.output(rowOutputFormat);
```

#### POJO Output Format

Writes POJOs to Cassandra using DataStax object mapping.

```java { .api }
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT> {
    public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass);
    public CassandraPojoOutputFormat(ClusterBuilder builder, Class<OUT> outputClass, MapperOptions mapperOptions);
    public void configure(Configuration parameters);
    public void open(int taskNumber, int numTasks);
    public void writeRecord(OUT record);
    public void close();
}
```

**Usage Example:**

```java
import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;

// Create POJO output format
CassandraPojoOutputFormat<User> pojoOutputFormat =
    new CassandraPojoOutputFormat<>(builder, User.class);

// With mapper options
MapperOptions writeOptions = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.ttl(3600), // 1 hour TTL
            Mapper.Option.timestamp(System.currentTimeMillis())
        };
    }
};

CassandraPojoOutputFormat<User> pojoWithOptions =
    new CassandraPojoOutputFormat<>(builder, User.class, writeOptions);

DataSet<User> processedUsers = // ... your user processing
processedUsers.output(pojoWithOptions);
```

## Advanced Usage Patterns

### Parallel Processing with Input Splits

The input formats automatically handle parallelism by creating input splits:

```java
// Configure parallelism
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Use 4 parallel tasks

// Input format will create splits automatically
CassandraInputFormat<Tuple2<String, Integer>> inputFormat =
    new CassandraInputFormat<>(
        "SELECT id, value FROM example.data WHERE token(id) > ? AND token(id) <= ?",
        builder
    );

DataSet<Tuple2<String, Integer>> parallelData = env.createInput(inputFormat);
```

### Custom Connection Configuration

Use advanced cluster configuration for production deployments:

```java
ClusterBuilder productionBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoints("cassandra-1", "cassandra-2", "cassandra-3")
            .withPort(9042)
            .withCredentials("username", "password")
            .withSocketOptions(new SocketOptions()
                .setConnectTimeoutMillis(10000)
                .setReadTimeoutMillis(10000))
            .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ConstantReconnectionPolicy(1000))
            .build();
    }
};
```

### Error Handling in Batch Jobs

Override callback methods for custom error handling:

```java
CassandraTupleOutputFormat<Tuple2<String, Integer>> customOutputFormat =
    new CassandraTupleOutputFormat<Tuple2<String, Integer>>(
        "INSERT INTO example.data (id, value) VALUES (?, ?)",
        builder
    ) {
        @Override
        protected void onWriteFailure(Throwable t) {
            // Log error and continue, or re-throw to fail the job
            logger.error("Failed to write record", t);
            // super.onWriteFailure(t); // Uncomment to fail on error
        }

        @Override
        protected void onWriteSuccess(ResultSet result) {
            // Custom success handling
            logger.debug("Successfully wrote record");
        }
    };
```

### Memory Management

For large datasets, consider memory-efficient processing:

```java
// Configure batch size and resource management
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse(); // Reuse objects to reduce GC pressure

// Process data in smaller batches
DataSet<User> largeDataset = env.createInput(inputFormat);
largeDataset
    .rebalance() // Distribute data evenly
    .output(outputFormat);
```

## Deprecated Components

### CassandraOutputFormat (Deprecated)

```java { .api }
@Deprecated
public class CassandraOutputFormat<OUT extends Tuple> extends CassandraTupleOutputFormat<OUT> {
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}
```

**Note:** Use `CassandraTupleOutputFormat` instead of the deprecated `CassandraOutputFormat`.