
tessl/maven-org-apache-flink--flink-connector-cassandra

Apache Flink connector for Apache Cassandra - provides sinks for streaming data into Cassandra databases

Describes: mavenpkg:maven/org.apache.flink/flink-connector-cassandra_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-cassandra@1.14.0

# Apache Flink Cassandra Connector

A comprehensive Apache Flink connector for Apache Cassandra that enables streaming applications to write data efficiently into Cassandra databases. The connector provides multiple sink implementations for different data types, supports both streaming and batch processing modes, offers configurable failure handling mechanisms, and integrates with Flink's checkpointing system for fault tolerance.

## Package Information

- **Package Name**: flink-connector-cassandra_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

For streaming applications:

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
```

For batch applications:

```java
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
```

Configuration imports:

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
```

## Basic Usage

### Simple Streaming Sink

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;

// Create a simple ClusterBuilder
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").build();
    }
};

// Configure and add sink to DataStream
DataStream<Tuple3<String, Integer, String>> stream = // ... your data stream

CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count, description) VALUES (?, ?, ?);")
    .setHost("127.0.0.1")
    .build();
```

### POJO Sink with Mapper

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import com.datastax.driver.mapping.annotations.Table;
import com.datastax.driver.mapping.annotations.Column;

// Define a POJO with Cassandra annotations
@Table(keyspace = "example", name = "users")
public class User {
    @Column(name = "id")
    private String id;

    @Column(name = "name")
    private String name;

    @Column(name = "age")
    private Integer age;

    // constructors, getters, setters...
}

// Use the POJO sink
DataStream<User> userStream = // ... your user stream

CassandraSink.addSink(userStream)
    .setDefaultKeyspace("example")
    .setHost("127.0.0.1")
    .build();
```

### Exactly-Once Processing with Write-Ahead Log

```java
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.words (word, count) VALUES (?, ?);")
    .setHost("127.0.0.1")
    .enableWriteAheadLog() // Enable exactly-once processing
    .build();
```
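The write-ahead log buffers records in Flink's state backend and commits them only when a checkpoint completes, so it has no effect unless checkpointing is enabled on the execution environment. A minimal sketch; the 5-second interval is an illustrative choice, not a recommendation:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Checkpointing must be enabled for enableWriteAheadLog() to take effect:
// records are committed to Cassandra only on checkpoint completion.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds (illustrative)
```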

## Architecture

The Apache Flink Cassandra Connector is built around several key architectural components:

- **Sink Hierarchy**: Base `CassandraSinkBase` with type-specific implementations for different data formats
- **Builder Pattern**: Fluent `CassandraSinkBuilder` API for configuration with automatic type detection
- **DataStax Driver Integration**: Built on DataStax Java Driver 3.0.0 with shaded dependencies
- **Fault Tolerance**: Integration with Flink's checkpointing and write-ahead logging for exactly-once guarantees
- **Type System**: Support for Flink Tuples, Rows, POJOs, and Scala Products with automatic serialization
- **Configuration Management**: Centralized configuration through `CassandraSinkBaseConfig` and builder patterns

## Capabilities

### Streaming Data Sinks

Primary streaming sink functionality supporting multiple data types including Tuples, Rows, POJOs, and Scala Products. Provides builder-based configuration with automatic type detection and comprehensive failure handling.

```java { .api }
public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler);
    public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests);
    public CassandraSinkBuilder<IN> enableIgnoreNullFields();
    public CassandraSink<IN> build();
}
```

[Streaming Sinks](./streaming-sinks.md)
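The builder options above can be combined in one chain. A sketch; `stream` is the Tuple3 `DataStream` from Basic Usage (assumed in scope), and the rethrow-on-failure policy in the handler is an illustrative choice, not a default:

```java
import java.io.IOException;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import com.datastax.driver.core.Cluster;

String insertQuery = "INSERT INTO example.words (word, count, description) VALUES (?, ?, ?);";

CassandraSink.addSink(stream)
    .setQuery(insertQuery)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    .setMaxConcurrentRequests(100)      // cap in-flight async writes
    .enableIgnoreNullFields()           // skip null fields instead of writing tombstones
    .setFailureHandler(failure -> {     // CassandraFailureHandler is a single-method interface
        throw new IOException("Cassandra write failed", failure);
    })
    .build();
```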

### Batch Data Processing

Batch input and output formats for reading from and writing to Cassandra in batch processing jobs. Supports Tuples, Rows, and POJOs with configurable parallelism and connection management.

```java { .api }
public class CassandraInputFormat<OUT extends Tuple> extends CassandraInputFormatBase<OUT>;
public class CassandraPojoInputFormat<OUT> extends CassandraInputFormatBase<OUT>;
public class CassandraTupleOutputFormat<OUT extends Tuple> extends CassandraOutputFormatBase<OUT>;
public class CassandraPojoOutputFormat<OUT> extends RichOutputFormat<OUT>;
```

[Batch Connectors](./batch-connectors.md)
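A sketch of a batch job wiring the tuple formats above into a DataSet pipeline; `clusterBuilder` is a `ClusterBuilder` like the one in Basic Usage (assumed in scope), and the keyspace and table names are illustrative:

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;

String selectQuery = "SELECT word, count FROM example.words;";
String insertQuery = "INSERT INTO example.words_copy (word, count) VALUES (?, ?);";

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read (word, count) tuples from one table...
DataSet<Tuple2<String, Integer>> rows = env.createInput(
    new CassandraInputFormat<Tuple2<String, Integer>>(selectQuery, clusterBuilder),
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

// ...and write them to another.
rows.output(new CassandraTupleOutputFormat<Tuple2<String, Integer>>(insertQuery, clusterBuilder));

env.execute("cassandra-batch-copy");
```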

### Configuration and Connection Management

Connection builders, failure handlers, and configuration objects for customizing Cassandra connectivity, error handling, and performance tuning.

```java { .api }
public abstract class ClusterBuilder {
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}

public interface CassandraFailureHandler {
    void onFailure(Throwable failure) throws IOException;
}

public interface MapperOptions {
    Mapper.Option[] getMapperOptions();
}
```

[Configuration](./configuration.md)
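Both extension points can be sketched concretely. The contact point, port, and credentials below are placeholders, and the rethrow-on-failure policy is an illustrative choice:

```java
import java.io.IOException;
import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

// A ClusterBuilder with authentication (all connection details are placeholders)
ClusterBuilder authBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra.example.com")
            .withPort(9042)
            .withCredentials("user", "password")
            .build();
    }
};

// A failure handler that rethrows, failing the job on any write error
CassandraFailureHandler rethrowHandler = failure -> {
    throw new IOException("Cassandra write failed", failure);
};
```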

### Write-Ahead Logging

Exactly-once processing guarantees through write-ahead logging with checkpoint integration. Stores records in Flink's state backend and commits them to Cassandra only on successful checkpoint completion.

```java { .api }
public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN>;
public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row>;
public class CassandraCommitter extends CheckpointCommitter;
```

[Write-Ahead Logging](./write-ahead-logging.md)

### Table API Integration

Integration with Flink's Table API for declarative stream processing. Provides append-only table sinks with schema inference and SQL compatibility.

```java { .api }
public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
    public CassandraAppendTableSink(ClusterBuilder builder, String cql);
    public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes);
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream);
}
```

[Table API](./table-api.md)
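A sketch of wiring the table sink onto a `Row` stream using the API above; `clusterBuilder` and `rowStream` (a `DataStream<Row>`) are assumed in scope, and the field names and types are illustrative:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.cassandra.CassandraAppendTableSink;

// Field layout must match the placeholders in the CQL statement.
String[] fieldNames = {"word", "count"};
TypeInformation<?>[] fieldTypes = {Types.STRING, Types.INT};

CassandraAppendTableSink sink = new CassandraAppendTableSink(
    clusterBuilder,
    "INSERT INTO example.words (word, count) VALUES (?, ?);");

sink.configure(fieldNames, fieldTypes)
    .consumeDataStream(rowStream);
```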

## Common Data Types

```java { .api }
// Connection configuration
public abstract class ClusterBuilder implements Serializable {
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}

// Failure handling
public interface CassandraFailureHandler extends Serializable {
    void onFailure(Throwable failure) throws IOException;
}

// Configuration management
public final class CassandraSinkBaseConfig {
    public int getMaxConcurrentRequests();
    public Duration getMaxConcurrentRequestsTimeout();
    public boolean getIgnoreNullFields();

    public static Builder newBuilder();
}

// Mapper configuration for POJOs
public interface MapperOptions extends Serializable {
    Mapper.Option[] getMapperOptions();
}

// Checkpoint management for exactly-once processing
public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public void commitCheckpoint(int subtaskIdx, long checkpointId);
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}
```
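For the POJO sink path, `MapperOptions` passes options through to the DataStax object mapper. A sketch; the option values are illustrative, and attaching it via a `setMapperOptions(...)` builder hook is an assumption here, as that method does not appear in the builder excerpt above:

```java
import com.datastax.driver.mapping.Mapper;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;

// MapperOptions is a single-method interface, so a lambda works.
// saveNullFields(false) skips null POJO fields (avoiding tombstones);
// ttl(86400) gives each write a one-day time-to-live (both values illustrative).
MapperOptions options = () -> new Mapper.Option[] {
    Mapper.Option.saveNullFields(false),
    Mapper.Option.ttl(86400)
};
```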