
# Producer Base Classes


Abstract base implementations for Kafka producers that provide exactly-once delivery semantics, serialization handling, and partitioning logic. These classes handle the complexities of reliable message production while delegating version-specific operations to concrete implementations.


## Capabilities


### FlinkKafkaProducerBase


The core abstract base class that all Flink Kafka producers extend. Provides comprehensive functionality for producing to Kafka topics with exactly-once processing guarantees and transaction support.


```java { .api }
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction {

    public FlinkKafkaProducerBase(
            String defaultTopicId,
            KeyedSerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner
    );
}
```


**Parameters:**

- `defaultTopicId` - Default target topic for messages (can be overridden by the serialization schema)
- `serializationSchema` - Schema for serializing elements to Kafka key-value messages
- `producerConfig` - Kafka producer configuration properties
- `customPartitioner` - Custom partitioner for determining target partitions (optional, can be null)

**Usage Example:**

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducerBase<MyEvent> producer = new MyKafkaProducer(
        "events-topic",
        new MyEventSerializationSchema(),
        props,
        new FlinkFixedPartitioner<>()
);
```
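The `FlinkFixedPartitioner` used above pins each parallel sink subtask to a single partition, selected by subtask index modulo the partition count. The following standalone sketch illustrates that selection rule (the class and method names here are illustrative, not the real Flink API):

```java
public class FixedPartitionDemo {
    // Sketch of FlinkFixedPartitioner's selection rule: each parallel
    // sink subtask writes to exactly one partition, chosen by
    // subtask index modulo the number of available partitions.
    static int selectPartition(int subtaskIndex, int[] partitions) {
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        // subtasks 0..3 writing to a 3-partition topic
        System.out.println(selectPartition(0, partitions)); // 0
        System.out.println(selectPartition(1, partitions)); // 1
        System.out.println(selectPartition(2, partitions)); // 2
        System.out.println(selectPartition(3, partitions)); // 0 (wraps around)
    }
}
```

Note that with more subtasks than partitions, several subtasks share a partition; with fewer, some partitions receive no data from this job.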


### Error Handling Configuration


Configure how the producer handles failures during message production.


```java { .api }
public void setLogFailuresOnly(boolean logFailuresOnly);
```


**Parameters:**

- `logFailuresOnly` - If true, failures are only logged (not thrown). If false, failures cause job failure.

**Usage Example:**

```java
// Log failures but continue processing (not recommended for production)
producer.setLogFailuresOnly(true);

// Fail the job on any production failure (recommended for exactly-once)
producer.setLogFailuresOnly(false);
```


### Checkpoint Flush Behavior


Control whether messages are flushed synchronously during checkpoints for exactly-once guarantees.


```java { .api }
public void setFlushOnCheckpoint(boolean flush);
```


**Parameters:**

- `flush` - If true, all pending messages are flushed during checkpoints (required for exactly-once)

**Usage Example:**

```java
// Enable checkpoint flushing for exactly-once guarantees
producer.setFlushOnCheckpoint(true);
```


### Utility Methods


Static utility methods for common configuration tasks.


```java { .api }
public static Properties getPropertiesFromBrokerList(String brokerList);
```


**Parameters:**

- `brokerList` - Comma-separated list of Kafka brokers (host:port format)

**Returns:** Properties object with `bootstrap.servers` configured

**Usage Example:**

```java
Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
// Additional properties can be set on the returned Properties object
props.setProperty("transaction.timeout.ms", "900000");
```
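The utility's documented behavior can be approximated in plain Java as below. This is a hypothetical re-implementation for illustration only; the real static method may additionally validate each host:port entry, so prefer calling it directly:

```java
import java.util.Properties;

public class BrokerListDemo {
    // Illustrative sketch of what getPropertiesFromBrokerList is described
    // to do: wrap the comma-separated broker list in a Properties object
    // under the standard "bootstrap.servers" key.
    static Properties propertiesFromBrokerList(String brokerList) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }

    public static void main(String[] args) {
        Properties props = propertiesFromBrokerList("broker1:9092,broker2:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```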


### Message Production


Core method for sending messages to Kafka (called by Flink runtime).


```java { .api }
public void invoke(IN next, Context context) throws Exception;
```


**Parameters:**

- `next` - The record to be sent to Kafka
- `context` - Sink context providing additional information

This method is called by the Flink runtime for each record and should not be called directly by user code.

### State Management


Handle checkpointing and transaction coordination (implemented by the framework).


```java { .api }
public void initializeState(FunctionInitializationContext context) throws Exception;
public void snapshotState(FunctionSnapshotContext context) throws Exception;
```

These methods handle the exactly-once semantics by coordinating with Kafka transactions and Flink checkpoints.

### Resource Management


```java { .api }
public void open(Configuration configuration);
public void close() throws Exception;
```

These methods handle producer lifecycle management, including Kafka client initialization and cleanup.

### Abstract Methods


Concrete implementations must implement this version-specific method:


```java { .api }
protected abstract void flush();
```

This method must flush all pending records to ensure they are sent to Kafka. Called during checkpoints to guarantee exactly-once processing when `setFlushOnCheckpoint(true)` is configured.
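A typical `flush()` implementation blocks until the pending-record count reaches zero. The following self-contained simulation illustrates that contract (names here are hypothetical; a real subclass would wait on broker acknowledgements and check for async errors inside the loop rather than drain a counter):

```java
import java.util.concurrent.atomic.AtomicLong;

public class FlushContractDemo {
    // Simulates the flush() contract: block until no records remain
    // unacknowledged. Here the "acknowledgements" arrive by draining a
    // counter; a real implementation waits on the Kafka client instead.
    static final AtomicLong pendingRecords = new AtomicLong(3);

    static void flush() {
        while (pendingRecords.get() > 0) {
            pendingRecords.decrementAndGet(); // stand-in for a broker ack
        }
    }

    public static void main(String[] args) {
        flush();
        System.out.println("pending after flush: " + pendingRecords.get());
    }
}
```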

### Protected API for Subclasses


Methods and fields available to concrete implementations:


```java { .api }
protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props);
protected void checkErroneous() throws Exception;
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer);
protected long numPendingRecords();
```

**Protected Fields:**

```java { .api }
protected final Properties producerConfig;
protected final String defaultTopicId;
protected final KeyedSerializationSchema<IN> schema;
protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
protected boolean logFailuresOnly;
protected boolean flushOnCheckpoint;
```


**Methods:**

- `getKafkaProducer()` - Factory method for creating Kafka producer instances
- `checkErroneous()` - Check for and throw any pending async exceptions
- `getPartitionsByTopic()` - Utility to discover partitions for a topic
- `numPendingRecords()` - Get count of unacknowledged records (useful for monitoring)

**Constants:**

```java { .api }
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
```


## Configuration Best Practices


### Exactly-Once Configuration


For exactly-once processing guarantees, configure the producer as follows:


```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("transaction.timeout.ms", "900000");
props.setProperty("max.in.flight.requests.per.connection", "1");
props.setProperty("retries", "2147483647");
props.setProperty("enable.idempotence", "true");

producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);
```


### At-Least-Once Configuration


For at-least-once processing with higher throughput:


```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all");
props.setProperty("retries", "2147483647");

// Flushing on checkpoint is required for the at-least-once guarantee:
// without it, records still in flight when a checkpoint completes can
// be lost on failure.
producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);
```


### High-Throughput Configuration


For maximum throughput, at the cost of delivery guarantees (failures are only logged, so records may be lost):


```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "1");
props.setProperty("batch.size", "16384");
props.setProperty("linger.ms", "5");
props.setProperty("compression.type", "lz4");

producer.setFlushOnCheckpoint(false);
producer.setLogFailuresOnly(true);
```


## Error Handling


The producer handles several classes of failure:

- **Retriable Errors**: Automatically retried based on the `retries` configuration
- **Non-Retriable Errors**: Cause immediate job failure, or are only logged, depending on the `logFailuresOnly` setting
- **Transaction Errors**: Handled through checkpoint coordination and transaction abort/retry
- **Network Errors**: Handled through connection retry and failover mechanisms

When `logFailuresOnly` is false (recommended), any production failure causes the Flink job to fail and restart, ensuring no data loss when checkpointing is properly configured.
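Because Kafka sends complete asynchronously, a failure is typically recorded by the send callback and rethrown later on the task thread, which is the idea behind `checkErroneous()`. The following self-contained sketch of that pattern uses hypothetical field and method names for illustration:

```java
public class AsyncErrorDemo {
    // Sketch of the async-error pattern: the Kafka send callback records
    // a failure, and a later call on the task thread rethrows it,
    // failing the job when logFailuresOnly is false.
    static volatile Exception asyncException;

    static void onSendCallback(Exception e, boolean logFailuresOnly) {
        if (e != null && !logFailuresOnly) {
            asyncException = e; // surfaced later on the task thread
        }
    }

    static void checkErroneous() throws Exception {
        if (asyncException != null) {
            throw new Exception("Failed to send data to Kafka", asyncException);
        }
    }

    public static void main(String[] args) {
        // simulate a broker-side failure reported to the callback
        onSendCallback(new RuntimeException("broker unavailable"), false);
        try {
            checkErroneous();
            System.out.println("no error");
        } catch (Exception e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With `logFailuresOnly` set to true, the callback would log the exception instead of storing it, and processing would continue.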