# Apache Flink Cassandra Connector

Apache Flink Cassandra Connector provides integration between Apache Flink and Apache Cassandra. It supports both streaming (DataStream API) and batch (DataSet API) processing, with multiple sink implementations for different data types and processing guarantees, including exactly-once semantics through write-ahead logging.

## Package Information

- **Package Name**: flink-connector-cassandra_2.10
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add the dependency to your Maven project:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-cassandra_2.10</artifactId>
  <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
```

## Basic Usage

### Streaming Sink Example

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.api.java.tuple.Tuple3;
import com.datastax.driver.core.Cluster;

// Define a ClusterBuilder for connection configuration
ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("127.0.0.1").withPort(9042).build();
    }
};

// Create a tuple-based sink; setHost and setClusterBuilder are mutually
// exclusive, so the ClusterBuilder defined above is passed to the builder
DataStream<Tuple3<String, Integer, String>> stream = ...; // your data stream

CassandraSink<Tuple3<String, Integer, String>> sink = CassandraSink
    .addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .build();

sink.name("Cassandra Sink");
```
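POJO streams are sinked the same way, except that no CQL query is set: the insert statement is derived from DataStax object-mapper annotations on the class. A sketch assuming a hypothetical `User` class mapped to the same `example.users` table:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

// Hypothetical POJO mapped to the example.users table
@Table(keyspace = "example", name = "users")
public class User {
    @Column(name = "name")
    private String name;

    @Column(name = "age")
    private Integer age;

    public User() {} // the mapper requires a no-arg constructor

    // getters and setters omitted for brevity
}

// Build the sink, reusing the ClusterBuilder defined above;
// note the absence of a setQuery call
DataStream<User> users = ...; // your POJO stream
CassandraSink.addSink(users)
    .setClusterBuilder(builder)
    .build();
```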

### Batch Processing Example

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read from Cassandra (reuses the ClusterBuilder from the streaming example)
CassandraInputFormat<Tuple2<String, Integer>> inputFormat =
    new CassandraInputFormat<>("SELECT name, age FROM example.users", builder);
DataSet<Tuple2<String, Integer>> input = env.createInput(inputFormat);

// Transform the data (placeholder transformation for illustration)
DataSet<Tuple2<String, String>> result = input.map(
    new MapFunction<Tuple2<String, Integer>, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(Tuple2<String, Integer> value) {
            return new Tuple2<>(value.f0, value.f1 >= 18 ? "adult" : "minor");
        }
    });

// Write to Cassandra
CassandraOutputFormat<Tuple2<String, String>> outputFormat =
    new CassandraOutputFormat<>("INSERT INTO example.processed (name, status) VALUES (?, ?)", builder);
result.output(outputFormat);
```

## Architecture

The Flink Cassandra Connector is organized into two main packages, plus cross-cutting configuration and fault-tolerance support:

- **Streaming Connectors** (`org.apache.flink.streaming.connectors.cassandra`): DataStream API integration with multiple sink implementations
- **Batch Connectors** (`org.apache.flink.batch.connectors.cassandra`): DataSet API integration for batch processing
- **Configuration Management**: the `ClusterBuilder` abstract class for Cassandra cluster connection setup
- **Fault Tolerance**: write-ahead logging support for exactly-once processing guarantees

## Capabilities

### Streaming Data Sinks

Sink implementations for streaming data, covering multiple data types and processing guarantees.

```java { .api }
public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input);

public abstract static class CassandraSinkBuilder<IN> {
    public CassandraSinkBuilder<IN> setQuery(String query);
    public CassandraSinkBuilder<IN> setHost(String host);
    public CassandraSinkBuilder<IN> setHost(String host, int port);
    public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder);
    public CassandraSinkBuilder<IN> enableWriteAheadLog();
    public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer);
    public abstract CassandraSink<IN> build();
}
```

[Streaming Sinks](./streaming-sinks.md)

### Batch Data Processing

Input and output formats for batch processing jobs using the DataSet API.

```java { .api }
public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> {
    public CassandraInputFormat(String query, ClusterBuilder builder);
}

public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
    public CassandraOutputFormat(String insertQuery, ClusterBuilder builder);
}
```

[Batch Processing](./batch-processing.md)

### Connection Configuration

Abstract configuration system for customizing Cassandra cluster connections, with support for authentication, SSL, and advanced connection parameters.

```java { .api }
public abstract class ClusterBuilder implements Serializable {
    public Cluster getCluster();
    protected abstract Cluster buildCluster(Cluster.Builder builder);
}
```
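Because `buildCluster` receives the full DataStax `Cluster.Builder`, any driver option can be applied before `build()` is called. A sketch enabling password authentication and SSL (host name and credentials are placeholders):

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

ClusterBuilder secureBuilder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        return builder
            .addContactPoint("cassandra.example.com")  // placeholder host
            .withPort(9042)
            .withCredentials("flink_user", "secret")   // placeholder credentials
            .withSSL()                                 // uses the JVM's default SSLContext
            .build();
    }
};
```

Note that `ClusterBuilder` is `Serializable`: the instance is shipped to the task managers, which each call `getCluster()` to open their own connection.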

[Connection Configuration](./connection-configuration.md)

### Fault Tolerance & Write-Ahead Logging

Exactly-once processing guarantees through checkpoint coordination and write-ahead logging for streaming applications.

```java { .api }
public class CassandraCommitter extends CheckpointCommitter {
    public CassandraCommitter(ClusterBuilder builder);
    public CassandraCommitter(ClusterBuilder builder, String keySpace);
}
```
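Exactly-once delivery is switched on at the sink builder. A sketch pairing `enableWriteAheadLog` with a `CassandraCommitter`, reusing the `stream` and `builder` from the streaming example (the `flink_aux` keyspace name is an assumption; the committer uses it to record completed checkpoints):

```java
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

// Write-ahead logging buffers records per checkpoint and replays them on
// recovery; checkpointing must be enabled on the execution environment
CassandraSink.addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .enableWriteAheadLog(new CassandraCommitter(builder, "flink_aux"))
    .build();
```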

[Fault Tolerance](./fault-tolerance.md)

## Types

### Core Types

```java { .api }
// Main sink wrapper class
public class CassandraSink<IN> {
    // Sink configuration and lifecycle methods
    public CassandraSink<IN> name(String name);
    public CassandraSink<IN> uid(String uid);
    public CassandraSink<IN> setUidHash(String uidHash);
    public CassandraSink<IN> setParallelism(int parallelism);
    public CassandraSink<IN> disableChaining();
    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup);
}

// Tuple-specific sink builder
public static class CassandraTupleSinkBuilder<IN extends Tuple> extends CassandraSinkBuilder<IN> {
    // Specialized for tuple-based data with CQL queries
}

// POJO-specific sink builder
public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
    // Specialized for POJO-based data with DataStax mapping annotations
}
```
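The configuration methods chain like their `DataStreamSink` counterparts. A brief sketch, continuing from the streaming example above (the operator name, uid, and parallelism values are arbitrary):

```java
CassandraSink<Tuple3<String, Integer, String>> sink = CassandraSink
    .addSink(stream)
    .setQuery("INSERT INTO example.users (name, age, email) VALUES (?, ?, ?);")
    .setClusterBuilder(builder)
    .build();

sink.name("cassandra-users-sink")  // display name in the Flink web UI
    .uid("cassandra-users-sink")   // stable id for savepoint compatibility
    .setParallelism(2);            // cap concurrent writers to Cassandra
```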