docs/

- index.md
- kafka-consumer.md
- kafka-producer.md
- offset-management.md
- table-api.md

# Offset Management

The Flink Kafka 0.8 connector provides comprehensive offset management through ZooKeeper integration and internal utilities for reliable message processing.

## Capabilities

### ZookeeperOffsetHandler

Utility class for manual offset management through ZooKeeper.

```java { .api }
/**
 * Handler for managing Kafka consumer offsets in ZooKeeper
 */
public class ZookeeperOffsetHandler {
    /**
     * Sets the consumer offset for a specific topic partition in ZooKeeper
     * @param curatorClient ZooKeeper client instance
     * @param groupId Consumer group identifier
     * @param topic Kafka topic name
     * @param partition Partition number
     * @param offset Offset value to set
     * @throws Exception if ZooKeeper operation fails
     */
    public static void setOffsetInZooKeeper(
        CuratorFramework curatorClient,
        String groupId,
        String topic,
        int partition,
        long offset
    ) throws Exception;

    /**
     * Retrieves the consumer offset for a specific topic partition from ZooKeeper
     * @param curatorClient ZooKeeper client instance
     * @param groupId Consumer group identifier
     * @param topic Kafka topic name
     * @param partition Partition number
     * @return Current offset value, or null if not found
     * @throws Exception if ZooKeeper operation fails
     */
    public static Long getOffsetFromZooKeeper(
        CuratorFramework curatorClient,
        String groupId,
        String topic,
        int partition
    ) throws Exception;
}
```

**Usage Example:**

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

// Create ZooKeeper client
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(
    "localhost:2181",
    new ExponentialBackoffRetry(1000, 3)
);
zkClient.start();

try {
    // Set offset for a specific partition
    ZookeeperOffsetHandler.setOffsetInZooKeeper(
        zkClient,
        "my-consumer-group",
        "my-topic",
        0,      // partition 0
        12345L  // offset value
    );

    // Retrieve offset for a partition
    Long currentOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
        zkClient,
        "my-consumer-group",
        "my-topic",
        0  // partition 0
    );

    if (currentOffset != null) {
        System.out.println("Current offset: " + currentOffset);
    } else {
        System.out.println("No offset found");
    }
} finally {
    zkClient.close();
}
```

### PeriodicOffsetCommitter

Internal thread for periodic offset commits to ZooKeeper.

```java { .api }
/**
 * Background thread that periodically commits consumer offsets to ZooKeeper
 */
public class PeriodicOffsetCommitter extends Thread {
    // Implementation details are internal
    // Used automatically by FlinkKafkaConsumer08
}
```

### ClosableBlockingQueue

Thread-safe queue with close capability used internally for offset management.

```java { .api }
/**
 * Thread-safe blocking queue that can be atomically closed
 */
public class ClosableBlockingQueue<E> {
    /**
     * Creates empty closable queue
     */
    public ClosableBlockingQueue();

    /**
     * Creates closable queue with initial capacity
     * @param initialSize Initial capacity hint
     */
    public ClosableBlockingQueue(int initialSize);

    /**
     * Creates closable queue with initial elements
     * @param initialElements Initial elements to add
     */
    public ClosableBlockingQueue(Collection<? extends E> initialElements);

    /**
     * Returns current queue size
     * @return Number of elements in queue
     */
    public int size();

    /**
     * Checks if queue is empty
     * @return true if queue contains no elements
     */
    public boolean isEmpty();

    /**
     * Checks if queue is still open for operations
     * @return true if queue accepts new elements
     */
    public boolean isOpen();

    /**
     * Atomically closes the queue, preventing new additions
     * @return true if queue was successfully closed
     */
    public boolean close();

    /**
     * Adds element to queue only if queue is open
     * @param element Element to add
     * @return true if element was added successfully
     */
    public boolean addIfOpen(E element);

    /**
     * Adds element to queue; fails if the queue has been closed
     * @param element Element to add
     * @throws IllegalStateException if queue is closed
     */
    public void add(E element);

    /**
     * Retrieves but does not remove head element
     * @return Head element or null if empty
     */
    public E peek();

    /**
     * Retrieves and removes head element
     * @return Head element or null if empty
     */
    public E poll();

    /**
     * Retrieves and removes all available elements
     * @return List of all elements removed from queue
     */
    public List<E> pollBatch();

    /**
     * Blocks until element is available, then returns it
     * @return Next available element
     * @throws InterruptedException if interrupted while waiting
     */
    public E getElementBlocking() throws InterruptedException;

    /**
     * Blocks until element is available or timeout expires
     * @param timeoutMillis Maximum wait time in milliseconds
     * @return Next available element or null if timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public E getElementBlocking(long timeoutMillis) throws InterruptedException;

    /**
     * Blocks until batch of elements is available
     * @return List of available elements
     * @throws InterruptedException if interrupted while waiting
     */
    public List<E> getBatchBlocking() throws InterruptedException;

    /**
     * Blocks until batch is available or timeout expires
     * @param timeoutMillis Maximum wait time in milliseconds
     * @return List of available elements or empty list if timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public List<E> getBatchBlocking(long timeoutMillis) throws InterruptedException;
}
```

## Offset Management Strategies

### Automatic Offset Management

The consumer automatically manages offsets through Flink's checkpointing:

```java
// Enable checkpointing for automatic offset management
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5 seconds

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "auto-offset-group");
props.setProperty("enable.auto.commit", "false"); // Let Flink manage offsets

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Configure starting position
consumer.setStartFromGroupOffsets(); // Default: use committed offsets
// OR
consumer.setStartFromEarliest(); // Start from beginning
// OR
consumer.setStartFromLatest(); // Start from end
// OR
consumer.setStartFromTimestamp(timestamp); // Start from specific time

env.addSource(consumer);
```

### Manual Offset Management

For advanced use cases, manually manage offsets:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ManualOffsetManager {
    private final CuratorFramework zkClient;
    private final String groupId;

    public ManualOffsetManager(String zkConnect, String groupId) {
        this.groupId = groupId;
        this.zkClient = CuratorFrameworkFactory.newClient(
            zkConnect,
            new ExponentialBackoffRetry(1000, 3)
        );
        zkClient.start();
    }

    public void commitOffset(String topic, int partition, long offset) {
        try {
            ZookeeperOffsetHandler.setOffsetInZooKeeper(
                zkClient, groupId, topic, partition, offset
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to commit offset", e);
        }
    }

    public Long getCurrentOffset(String topic, int partition) {
        try {
            return ZookeeperOffsetHandler.getOffsetFromZooKeeper(
                zkClient, groupId, topic, partition
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to get offset", e);
        }
    }

    public void close() {
        zkClient.close();
    }
}

// Usage
ManualOffsetManager offsetManager = new ManualOffsetManager(
    "localhost:2181",
    "manual-offset-group"
);

// Check current offset before processing
Long currentOffset = offsetManager.getCurrentOffset("my-topic", 0);
if (currentOffset == null) {
    // No previous offset, start from beginning
    currentOffset = 0L;
}

// Process messages and commit offset periodically
offsetManager.commitOffset("my-topic", 0, processedOffset);
```

### Offset Reset Strategies

Configure behavior when no committed offset exists:

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("group.id", "reset-strategy-group");

// Configure reset strategy
props.setProperty("auto.offset.reset", "earliest"); // Start from beginning
// OR
props.setProperty("auto.offset.reset", "latest"); // Start from end

FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "my-topic",
    new SimpleStringSchema(),
    props
);

// Override reset strategy programmatically
consumer.setStartFromEarliest(); // Always start from beginning
consumer.setStartFromLatest(); // Always start from end
consumer.setStartFromGroupOffsets(); // Use committed offsets or auto.offset.reset
```

## Fault Tolerance

### Checkpointing Integration

Offsets are automatically included in Flink checkpoints:

```java
// Configure checkpointing for fault tolerance
env.enableCheckpointing(5000); // Checkpoint interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // Min pause
env.getCheckpointConfig().setCheckpointTimeout(60000); // Timeout
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // Max concurrent

// Consumer participates automatically in checkpointing
FlinkKafkaConsumer08<String> consumer = new FlinkKafkaConsumer08<>(
    "fault-tolerant-topic",
    new SimpleStringSchema(),
    props
);

env.addSource(consumer);
```

### Recovery Behavior

On recovery from failure:

1. **Checkpoint Recovery**: Offsets restored from last successful checkpoint
2. **ZooKeeper Fallback**: If no checkpoint, use committed offsets in ZooKeeper
3. **Reset Strategy**: If no committed offsets, use `auto.offset.reset` setting

### Error Handling

Handle offset-related errors:

```java
// Monitor offset commit failures
props.setProperty("consumer.offset.commit.timeout.ms", "5000");

Long offset;
try {
    // Offset operations may throw exceptions
    offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(
        zkClient, groupId, topic, partition
    );
} catch (Exception e) {
    logger.warn("Failed to retrieve offset from ZooKeeper", e);
    // Fall back to default behavior
    offset = null;
}
```

## Performance Considerations

- **Commit Frequency**: Balance between performance and durability
- **ZooKeeper Load**: Minimize ZooKeeper operations for high-throughput scenarios
- **Batch Processing**: Use batch operations when possible
- **Connection Pooling**: Reuse ZooKeeper connections across operations
- **Monitoring**: Track offset lag and commit latency