
# Streaming Data Sinks

Streaming sinks that write records from Flink `DataStream`s to Apache Cassandra. The connector supports several record types (Flink `Tuple` and `Row`, POJOs, and Scala case classes and tuples), selects the matching builder automatically from the stream's type information, and offers builder-based configuration with pluggable failure handling.

## Capabilities


### Main Sink Entry Point

`CassandraSink.addSink` is the entry point for creating a Cassandra sink. It inspects the type of the input stream and returns a builder for that type.

```java { .api }
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);
public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input);
```

The `addSink` method inspects the input stream's type information and returns the matching builder:

- `TupleTypeInfo` → `CassandraTupleSinkBuilder`
- `RowTypeInfo` → `CassandraRowSinkBuilder`
- `PojoTypeInfo` → `CassandraPojoSinkBuilder`
- `CaseClassTypeInfo` → `CassandraScalaProductSinkBuilder`
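The selection amounts to a chain of `instanceof` checks on the stream's type information. The sketch below models it with the JDK only: `TypeInfo` and its subclasses are hypothetical stand-ins for the Flink `TypeInformation` classes of the same names, and `builderFor` returns a builder name rather than a real builder. Rejecting every other type is an assumption of the sketch.

```java
import java.util.List;

// Hypothetical stand-ins for Flink's TypeInformation hierarchy (not the real classes).
abstract class TypeInfo {}
class TupleTypeInfo extends TypeInfo {}
class RowTypeInfo extends TypeInfo {}
class PojoTypeInfo extends TypeInfo {}
class CaseClassTypeInfo extends TypeInfo {}
class BasicTypeInfo extends TypeInfo {}

final class SinkDispatch {
    private SinkDispatch() {}

    /** Returns the name of the builder that would handle the given type information. */
    static String builderFor(TypeInfo typeInfo) {
        if (typeInfo instanceof TupleTypeInfo) {
            return "CassandraTupleSinkBuilder";
        }
        if (typeInfo instanceof RowTypeInfo) {
            return "CassandraRowSinkBuilder";
        }
        if (typeInfo instanceof PojoTypeInfo) {
            return "CassandraPojoSinkBuilder";
        }
        if (typeInfo instanceof CaseClassTypeInfo) {
            return "CassandraScalaProductSinkBuilder";
        }
        // Assumption: any other type is rejected instead of being silently ignored.
        throw new IllegalArgumentException(
                "No Cassandra sink for type: " + typeInfo.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        for (TypeInfo t : List.of(new TupleTypeInfo(), new PojoTypeInfo())) {
            System.out.println(t.getClass().getSimpleName() + " -> " + builderFor(t));
        }
    }
}
```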

### Sink Builder Configuration

`CassandraSinkBuilder` is the abstract base builder; it holds the configuration options shared by all sink types.

```java { .api }
public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
    public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options);
    public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout);
    public CassandraSinkBuilder<IN> enableIgnoreNullFields();
    public CassandraSink<IN> build();
}
```

**Usage Examples:**

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// `stream` is assumed to be a DataStream<Tuple2<String, Long>> of (word, count) pairs.

// Basic configuration with host and port
CassandraSink.addSink(stream)
        .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
        .setHost("127.0.0.1", 9042)
        .build();

// Advanced configuration with a custom cluster builder.
// ClusterBuilder must be serializable; an anonymous class declared in an
// instance method also captures (and must serialize) the enclosing object.
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
                .addContactPoint("127.0.0.1")
                .addContactPoint("127.0.0.2")
                .withPort(9042)
                .withCredentials("username", "password")
                .build();
    }
};

CassandraSink.addSink(stream)
        .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
        .setClusterBuilder(builder)
        .setMaxConcurrentRequests(100)
        .enableIgnoreNullFields()
        .build();
```

### Tuple Sink Builder

Builder for streams of Flink `Tuple` types. The fields of each tuple are bound, in order, to the `?` placeholders of the CQL query passed to `setQuery()`.

```java { .api }
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}
```

**Usage Example:**

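```java
// Assumes an existing StreamExecutionEnvironment `env`
DataStream<Tuple3<String, Integer, Boolean>> tupleStream = env.fromElements(
        Tuple3.of("cpu", 42, true),
        Tuple3.of("disk", 7, false));

// Fields f0, f1, f2 bind to the three ? placeholders in order
CassandraSink.addSink(tupleStream)
        .setQuery("INSERT INTO example.metrics (name, value, active) VALUES (?, ?, ?);")
        .setHost("127.0.0.1")
        .build();
```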
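Because tuple fields bind by position, the query should contain exactly as many `?` placeholders as the tuple has fields (three for a `Tuple3`). A check like the one below could run before the job is submitted. `CqlPlaceholders`, `placeholderCount`, and `checkArity` are hypothetical helpers, not part of the connector, and the count is deliberately simple: it skips `?` characters inside single-quoted string literals but does not understand comments or named bind markers.

```java
/** Hypothetical helper: counts positional bind markers in a CQL statement. */
final class CqlPlaceholders {
    private CqlPlaceholders() {}

    static int placeholderCount(String cql) {
        int count = 0;
        boolean inLiteral = false;
        for (int i = 0; i < cql.length(); i++) {
            char c = cql.charAt(i);
            if (c == '\'') {
                // CQL escapes a quote inside a literal by doubling it (''),
                // which toggles the flag twice and leaves it unchanged.
                inLiteral = !inLiteral;
            } else if (c == '?' && !inLiteral) {
                count++;
            }
        }
        return count;
    }

    /** Throws if the number of placeholders differs from the record arity. */
    static void checkArity(String cql, int arity) {
        int placeholders = placeholderCount(cql);
        if (placeholders != arity) {
            throw new IllegalArgumentException(
                    "Query has " + placeholders + " placeholders but records have arity " + arity);
        }
    }
}
```

For the example above, `CqlPlaceholders.checkArity(query, 3)` passes, while the same query paired with a `Tuple2` stream would be rejected.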

### Row Sink Builder

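Builder for streams of Flink `Row` values. As with tuples, the fields of each row are bound in order to the `?` placeholders of the query; the number of fields is taken from the stream's `RowTypeInfo`.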

```java { .api }
public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}
```

**Usage Example:**

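```java
// Assumes an existing StreamExecutionEnvironment `env`.
// The stream must carry RowTypeInfo for addSink() to select the Row builder.
TypeInformation<Row> rowType = Types.ROW(Types.INT, Types.STRING, Types.INT);
DataStream<Row> rowStream = env.fromCollection(
        Arrays.asList(Row.of(1, "Alice", 30), Row.of(2, "Bob", 25)), rowType);

CassandraSink.addSink(rowStream)
        .setQuery("INSERT INTO example.users (id, name, age) VALUES (?, ?, ?);")
        .setHost("127.0.0.1")
        .build();
```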

### POJO Sink Builder

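Builder for streams of Plain Old Java Objects (POJOs). Instead of a CQL query, it uses the DataStax object mapper, which derives the target table and columns from the class's mapping annotations.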

```java { .api }
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Does not support setQuery() - uses DataStax annotations
    // Supports setDefaultKeyspace() and setMapperOptions()
}
```

**Usage Example:**

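```java
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import java.math.BigDecimal;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;

// Define a POJO with DataStax mapping annotations. Flink handles it as a POJO only if the
// class is public and has a public no-arg constructor plus getters/setters for its fields.
@Table(keyspace = "example", name = "products")
public class Product {
    @PartitionKey
    private String id;

    @Column(name = "name")
    private String name;

    @Column(name = "price")
    private BigDecimal price;

    // public no-arg constructor, getters, setters...
}

// Configure the sink with mapper options
MapperOptions options = new MapperOptions() {
    @Override
    public Mapper.Option[] getMapperOptions() {
        return new Mapper.Option[] {
            Mapper.Option.saveNullFields(false),
            // Cassandra write timestamps are conventionally in microseconds; note that this
            // value is fixed at the moment getMapperOptions() is called
            Mapper.Option.timestamp(System.currentTimeMillis() * 1000)
        };
    }
};

// productStream: an existing DataStream<Product>
CassandraSink.addSink(productStream)
        .setDefaultKeyspace("example")
        .setMapperOptions(options)
        .setHost("127.0.0.1")
        .build();
```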
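Across the builders, the option rules in the API listings differ mainly in how the query and the default keyspace are treated. The sketch below restates them as a standalone check, plus one assumed rule that connection details (a host or a `ClusterBuilder`) must be supplied. `SinkKind`, `BuilderRules`, and `validate` are hypothetical names; this is not the connector's own validation, whose behavior and messages may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the builder rules; not the connector's code.
enum SinkKind { TUPLE, ROW, POJO, SCALA_PRODUCT }

final class BuilderRules {
    private BuilderRules() {}

    /** Returns the rule violations; an empty list means the configuration is acceptable. */
    static List<String> validate(SinkKind kind, String query, String defaultKeyspace,
                                 boolean hasConnectionInfo) {
        List<String> errors = new ArrayList<>();
        boolean hasQuery = query != null && !query.trim().isEmpty();
        if (!hasConnectionInfo) {
            // Assumed rule: some way to reach the cluster is required.
            errors.add("set a host with setHost() or a ClusterBuilder with setClusterBuilder()");
        }
        if (kind == SinkKind.POJO) {
            // POJO sinks derive the statement from the mapping annotations.
            if (query != null) {
                errors.add("setQuery() is not allowed for POJO streams");
            }
        } else {
            // Tuple, Row and Scala Product sinks bind fields positionally into the query.
            if (!hasQuery) {
                errors.add("setQuery() is required for " + kind + " streams");
            }
            if (defaultKeyspace != null) {
                errors.add("setDefaultKeyspace() is only supported for POJO streams");
            }
        }
        return errors;
    }
}
```

For example, `validate(SinkKind.ROW, null, "example", true)` reports two violations: the missing query and the unsupported default keyspace.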

### Scala Product Sink Builder

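Builder for Scala case classes and Scala tuples, both of which implement `scala.Product`. Like the tuple builder, it binds the product's elements in order to the `?` placeholders of the query.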

```java { .api }
public static class CassandraScalaProductSinkBuilder<IN extends Product> extends CassandraSinkBuilder<IN> {
    // Inherits all methods from CassandraSinkBuilder
    // Requires setQuery() to be called
    // Does not support setDefaultKeyspace()
}
```

### Sink Operations and Control

`build()` returns a `CassandraSink`, which exposes the usual Flink operator settings: name, UID, parallelism, chaining, and slot sharing group.

```java { .api }
public class CassandraSink<IN> {
    public CassandraSink<IN> name(String name);
    public CassandraSink<IN> uid(String uid);
    public CassandraSink<IN> setUidHash(String uidHash);
    public CassandraSink<IN> setParallelism(int parallelism);
    public CassandraSink<IN> disableChaining();
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}
```

**Usage Example:**

```java
CassandraSink.addSink(stream)
        .setQuery("INSERT INTO example.events (id, timestamp, data) VALUES (?, ?, ?);")
        .setHost("127.0.0.1")
        .build()
        .name("Cassandra Event Sink")
        .uid("cassandra-sink-1")              // stable ID so savepoints map to this operator
        .setParallelism(4)
        .slotSharingGroup("cassandra-sinks");
```

## Core Sink Implementations


### Base Sink Functionality

The sinks below extend the common base class `CassandraSinkBase`, which manages the cluster connection and the asynchronous writes and participates in checkpointing.

```java { .api }
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
    public void open(Configuration configuration);
    public void close();
    public void initializeState(FunctionInitializationContext context);
    public void snapshotState(FunctionSnapshotContext ctx);
    public void invoke(IN value);
    public abstract ListenableFuture<V> send(IN value);
}
```
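Conceptually, `invoke` passes each record to `send`, which starts an asynchronous write and returns a future; the base class caps how many of these futures may be pending and, on a checkpoint, waits for all of them before surfacing any asynchronous failure. The sketch below models that pattern with `java.util.concurrent.Semaphore` and `CompletableFuture`. `BoundedAsyncWriter` and its methods are hypothetical; the connector works with Guava `ListenableFuture`s from the DataStax driver, and where it consults its failure handler the sketch simply rethrows.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Hypothetical model of a sink that caps in-flight asynchronous writes. */
final class BoundedAsyncWriter<IN> {
    private final int maxConcurrentRequests;
    private final Duration acquireTimeout;
    private final Semaphore permits;
    private final Function<IN, CompletableFuture<?>> send;
    // First asynchronous failure, surfaced on the next invoke() or flush().
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    BoundedAsyncWriter(int maxConcurrentRequests, Duration acquireTimeout,
                       Function<IN, CompletableFuture<?>> send) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.acquireTimeout = acquireTimeout;
        this.permits = new Semaphore(maxConcurrentRequests);
        this.send = send;
    }

    /** Waits for a free permit (up to the timeout), then starts the asynchronous write. */
    void invoke(IN value) throws Exception {
        checkAsyncErrors();
        if (!permits.tryAcquire(acquireTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("No free request slot within " + acquireTimeout);
        }
        CompletableFuture<?> future;
        try {
            future = send.apply(value);
        } catch (RuntimeException e) {
            permits.release(); // the write never started, so return the permit
            throw e;
        }
        future.whenComplete((result, error) -> {
            if (error != null) {
                asyncError.compareAndSet(null, error); // record before releasing
            }
            permits.release();
        });
    }

    /** Blocks until every in-flight write has completed, as a checkpoint would. */
    void flush() throws Exception {
        permits.acquire(maxConcurrentRequests);
        permits.release(maxConcurrentRequests);
        checkAsyncErrors();
    }

    int inFlight() {
        return maxConcurrentRequests - permits.availablePermits();
    }

    private void checkAsyncErrors() throws Exception {
        Throwable error = asyncError.getAndSet(null);
        if (error != null) {
            throw new Exception("Asynchronous write failed", error);
        }
    }
}
```

Recording the error before releasing the permit means that `flush()`, which can only finish after every permit is back, is guaranteed to see any failure from the writes it waited for.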

### Tuple-Based Sinks

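`AbstractCassandraTupleSink` is the base class for the query-based sinks. Subclasses implement `extract()` to turn a record into an array of values, which `send()` binds to the prepared insert query.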

```java { .api }
public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    public AbstractCassandraTupleSink(String insertQuery, ClusterBuilder builder, CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler);
    public void open(Configuration configuration);
    public ListenableFuture<ResultSet> send(IN value);
    protected abstract Object[] extract(IN record);
}
```
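The split of responsibilities is a template method: subclasses only convert a record into positional values via `extract()`, and the base class binds those values to the insert query. The sketch below models that split without Flink or the driver. `TupleLikeSink`, `ListSink`, and `FixedArityArraySink` are hypothetical stand-ins for the tuple and row sinks, and `send` returns a descriptive string where the real sink would execute a bound statement.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical base: subclasses extract values, the base class "binds" them. */
abstract class TupleLikeSink<IN> {
    private final String insertQuery;

    TupleLikeSink(String insertQuery) {
        this.insertQuery = insertQuery;
    }

    /** Converts one record into values for the query's ? placeholders, in order. */
    protected abstract Object[] extract(IN record);

    /** Stands in for binding the extracted values to the prepared statement. */
    String send(IN record) {
        return insertQuery + " <- " + Arrays.toString(extract(record));
    }
}

/** Analogue of the tuple sink: every element of the record is bound. */
class ListSink extends TupleLikeSink<List<Object>> {
    ListSink(String insertQuery) {
        super(insertQuery);
    }

    @Override
    protected Object[] extract(List<Object> record) {
        return record.toArray();
    }
}

/** Analogue of the row sink: exactly rowArity fields are bound. */
class FixedArityArraySink extends TupleLikeSink<Object[]> {
    private final int rowArity;

    FixedArityArraySink(int rowArity, String insertQuery) {
        super(insertQuery);
        this.rowArity = rowArity;
    }

    @Override
    protected Object[] extract(Object[] record) {
        return Arrays.copyOf(record, rowArity);
    }
}
```

Keeping `extract()` free of driver types makes each record-type adapter small and easy to test, while connection handling and binding stay in one place.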

### Specific Sink Types

Concrete sink implementations, one for each supported record type:

```java { .api }
// Flink Tuple sink
public class CassandraTupleSink<IN extends Tuple> extends AbstractCassandraTupleSink<IN> {
    public CassandraTupleSink(String insertQuery, ClusterBuilder builder);
    protected Object[] extract(IN record);
}

// Flink Row sink
public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
    public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder);
    protected Object[] extract(Row record);
}

// POJO sink with DataStax mapping
public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder);
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, MapperOptions options);
    public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, String keyspace);
    public void open(Configuration configuration);
    public ListenableFuture<ResultSet> send(IN value);
}

// Scala Product sink
public class CassandraScalaProductSink<IN extends Product> extends AbstractCassandraTupleSink<IN> {
    public CassandraScalaProductSink(String insertQuery, ClusterBuilder builder);
    protected Object[] extract(IN record);
}
```

## Error Handling

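Every sink accepts a custom `CassandraFailureHandler` through `setFailureHandler()`. Its `onFailure` method receives the exception from a failed write: returning normally ignores the failure, while throwing an exception fails the sink.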

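```java
import com.datastax.driver.core.exceptions.WriteTimeoutException;
import java.io.IOException;
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Custom failure handler. Handlers are serialized with the job, so use a top-level
// or static nested class and keep the logger in a static field.
public class TimeoutTolerantFailureHandler implements CassandraFailureHandler {
    private static final Logger LOG = LoggerFactory.getLogger(TimeoutTolerantFailureHandler.class);

    @Override
    public void onFailure(Throwable failure) throws IOException {
        if (failure instanceof WriteTimeoutException) {
            // Log the timeout but continue processing
            LOG.warn("Write timeout, continuing", failure);
            return;
        }
        // Rethrow other exceptions to fail the sink
        throw new IOException("Cassandra write failed", failure);
    }
}

CassandraSink.addSink(stream)
        .setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
        .setHost("127.0.0.1")
        .setFailureHandler(new TimeoutTolerantFailureHandler())
        .build();
```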
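The handler above matches only the top-level exception type. If a failure can reach the handler wrapped in another exception, a check that walks the cause chain is more robust. The sketch below isolates that step using JDK types only: `FailureClassifier` and `isTolerable` are hypothetical names, and `java.util.concurrent.TimeoutException` stands in for the driver's `WriteTimeoutException`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: decides whether a write failure may be skipped. */
final class FailureClassifier {
    private FailureClassifier() {}

    /** Returns true if the failure or any exception in its cause chain has the tolerated type. */
    static boolean isTolerable(Throwable failure, Class<? extends Throwable> tolerated) {
        Throwable current = failure;
        // Cap the walk so an unexpectedly cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 32; depth++) {
            if (tolerated.isInstance(current)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new ExecutionException(new TimeoutException("write timed out"));
        System.out.println(isTolerable(wrapped, TimeoutException.class)); // true
    }
}
```

Inside `onFailure`, the `instanceof` test would then become `FailureClassifier.isTolerable(failure, WriteTimeoutException.class)`.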

## Performance Tuning

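Concurrency and null handling are tuned on the builder. `setMaxConcurrentRequests` limits how many write requests each parallel sink instance keeps in flight, and the two-argument form also sets how long to wait for a free slot. `enableIgnoreNullFields()` leaves null fields unset instead of writing them as nulls, which avoids creating tombstones.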

```java
CassandraSink.addSink(stream)
        .setQuery("INSERT INTO example.data (id, value) VALUES (?, ?);")
        .setHost("127.0.0.1")
        // At most 100 in-flight requests per parallel instance; wait up to 30 s for a free slot.
        // setMaxConcurrentRequests(100) sets only the limit and keeps the default timeout.
        .setMaxConcurrentRequests(100, Duration.ofSeconds(30))
        .enableIgnoreNullFields() // leave null fields unset to avoid tombstones
        .build();
```
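`enableIgnoreNullFields()` helps because binding `null` into an INSERT writes a tombstone for that column, whereas an unset bind marker leaves the column untouched. The sketch below computes which positions would be left unset for one record. `NullFieldPlanner` is a hypothetical name; in the connector the equivalent step happens on the driver's bound statement (the DataStax 3.x driver exposes `BoundStatement.unset(int)`, which requires native protocol v4 or later).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: bind positions to leave unset instead of binding null. */
final class NullFieldPlanner {
    private NullFieldPlanner() {}

    /**
     * Returns the indices of null fields to leave unset when ignoreNullFields is
     * enabled; when it is disabled, every value (including null) is bound as-is.
     */
    static List<Integer> positionsToUnset(Object[] fields, boolean ignoreNullFields) {
        List<Integer> unset = new ArrayList<>();
        if (!ignoreNullFields) {
            return unset; // nulls are bound and become tombstones
        }
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                unset.add(i);
            }
        }
        return unset;
    }

    public static void main(String[] args) {
        Object[] record = {"id-1", null, 42, null};
        System.out.println(positionsToUnset(record, true));  // [1, 3]
        System.out.println(positionsToUnset(record, false)); // []
    }
}
```

A primary-key column cannot be left unset either way: Cassandra rejects an INSERT without a complete primary key.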