# Registry and Tracking

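A TaskManager-side registry that tracks uploaded changelog segments and manages their lifecycle. It keeps a reference count per segment. A segment that no checkpoint ends up needing is discarded asynchronously once its last reference is released. A segment that becomes part of a confirmed checkpoint is handed over to the JobManager and is no longer tracked.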

## Capabilities

### TaskChangelogRegistry Interface

Core interface for tracking and managing changelog segments with reference counting and cleanup coordination.

```java { .api }
/**
 * TaskManager-side registry for tracking changelog segments.
 */
@Internal
public interface TaskChangelogRegistry {

    /** No-operation registry for testing or disabled scenarios. */
    TaskChangelogRegistry NO_OP =
            new TaskChangelogRegistry() {
                @Override
                public void startTracking(StreamStateHandle handle, long refCount) {}

                @Override
                public void stopTracking(StreamStateHandle handle) {}

                @Override
                public void release(StreamStateHandle handle) {}
            };

    /**
     * Starts tracking an uploaded changelog segment.
     *
     * @param handle StreamStateHandle representing the changelog segment
     * @param refCount number of {@link #release} calls after which the segment is discarded;
     *     must be greater than zero
     */
    void startTracking(StreamStateHandle handle, long refCount);

    /**
     * Stops tracking a changelog segment entirely, because another component (the JobManager,
     * once a checkpoint including the segment is confirmed) is now responsible for it. The
     * registry never discards a segment after this call.
     *
     * @param handle StreamStateHandle to stop tracking
     */
    void stopTracking(StreamStateHandle handle);

    /**
     * Decrements the reference count of a tracked segment by one. When the count reaches zero,
     * the segment is discarded asynchronously. Untracked segments are ignored.
     *
     * @param handle StreamStateHandle to release
     */
    void release(StreamStateHandle handle);

    /**
     * Creates the default registry with the given number of asynchronous discard threads.
     *
     * @param numAsyncDiscardThreads number of threads for asynchronous cleanup
     * @return configured TaskChangelogRegistry instance
     */
    static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);

    /**
     * Creates the default registry with a caller-supplied executor (intended for testing).
     *
     * @param executor executor for discard operations
     * @return TaskChangelogRegistry instance using the provided executor
     */
    @VisibleForTesting
    static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
}
```
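
The sketch below makes the contract concrete with a minimal, self-contained reference-counting registry in plain Java. It is not Flink's `TaskChangelogRegistryImpl`. `SketchRegistry`, its `String` segment IDs (standing in for `StreamStateHandle`) and the `discardAction` callback are all hypothetical. It assumes the following semantics:

- `startTracking` sets the count.
- `stopTracking` forgets the segment.
- `release` decrements the count and schedules a discard when it reaches zero.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch of a reference-counting segment registry (not Flink code).
// Segment IDs are plain Strings standing in for StreamStateHandle.
final class SketchRegistry {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();
    private final Executor discardExecutor;
    private final Consumer<String> discardAction;

    SketchRegistry(Executor discardExecutor, Consumer<String> discardAction) {
        this.discardExecutor = discardExecutor;
        this.discardAction = discardAction;
    }

    // Begin tracking a segment that must be released refCount times before discard.
    void startTracking(String segmentId, long refCount) {
        if (refCount <= 0) {
            throw new IllegalStateException("refCount must be positive: " + refCount);
        }
        refCounts.put(segmentId, refCount);
    }

    // Forget the segment entirely: another component now owns it.
    void stopTracking(String segmentId) {
        refCounts.remove(segmentId);
    }

    // Decrement atomically per key; the caller that reaches zero schedules the discard.
    // With a direct executor the discard runs inside compute(), so discardAction
    // must not call back into this registry.
    void release(String segmentId) {
        refCounts.compute(
                segmentId,
                (id, count) -> {
                    if (count == null) {
                        return null; // not tracked: nothing to do
                    }
                    if (count == 1L) {
                        discardExecutor.execute(() -> discardAction.accept(id));
                        return null; // returning null removes the entry
                    }
                    return count - 1;
                });
    }

    // Current count, or 0 if the segment is not tracked.
    long refCount(String segmentId) {
        return refCounts.getOrDefault(segmentId, 0L);
    }
}
```

Doing the decrement inside `ConcurrentHashMap.compute` makes the read-modify-write atomic per key, so only one caller can observe the transition to zero. Passing a direct executor such as `Runnable::run` runs the discard synchronously, which keeps tests deterministic.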

**Basic Registry Usage Example:**

```java
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;

// Create a registry with two background discard threads
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Track a newly uploaded segment (createChangelogHandle() is a placeholder).
// Two change sets were written into it, so it must be released twice.
StreamStateHandle handle = createChangelogHandle();
registry.startTracking(handle, 2);

// Case A: neither change set is needed by a checkpoint
registry.release(handle); // 2 -> 1
registry.release(handle); // 1 -> 0: discard scheduled on a discard thread

// Case B (instead of A): a checkpoint that includes the segment is confirmed,
// so the JobManager owns it and the registry must never discard it
// registry.stopTracking(handle);
```

### Default Registry Creation

Two static factory methods create the default implementation. One takes the number of background discard threads. The other accepts an `Executor` supplied by the caller, which is mainly useful in tests.

```java { .api }
/**
 * Creates the default registry with the given number of asynchronous discard threads.
 *
 * @param numAsyncDiscardThreads number of background threads for cleanup
 * @return TaskChangelogRegistry that runs discards on its own threads
 */
static TaskChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads);

/**
 * Creates the default registry with a caller-supplied executor (primarily for testing).
 *
 * @param executor executor for discard operations
 * @return TaskChangelogRegistry using the provided executor
 */
@VisibleForTesting
static TaskChangelogRegistry defaultChangelogRegistry(Executor executor);
```

**Registry Creation Examples:**

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;

// Production usage: thread count taken from the configuration (config is a placeholder)
int discardThreads = config.get(FsStateChangelogOptions.NUM_DISCARD_THREADS);
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(discardThreads);

// Testing usage: supply an executor whose lifecycle the test controls
ExecutorService testExecutor = Executors.newSingleThreadExecutor();
TaskChangelogRegistry testRegistry = TaskChangelogRegistry.defaultChangelogRegistry(testExecutor);
// ... run the test, then release the thread
testExecutor.shutdown();

// Disabled registry for scenarios where tracking is not needed
TaskChangelogRegistry disabledRegistry = TaskChangelogRegistry.NO_OP;
```
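
The `Executor` overload lets a test observe when a discard is scheduled separately from when it runs. Below is a minimal sketch of such a test executor, assuming the test drives it by hand. `ManualExecutor` is a hypothetical helper written for this example, not a Flink class.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical test helper: queues tasks instead of running them immediately.
final class ManualExecutor implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    @Override
    public synchronized void execute(Runnable task) {
        pending.add(task);
    }

    // Number of tasks scheduled but not yet run.
    synchronized int pendingCount() {
        return pending.size();
    }

    // Runs every queued task on the calling thread, including tasks queued while running.
    void runAll() {
        Runnable next;
        while ((next = poll()) != null) {
            next.run();
        }
    }

    private synchronized Runnable poll() {
        return pending.poll();
    }
}
```

A test could use it as follows:

1. Pass it to `TaskChangelogRegistry.defaultChangelogRegistry(manualExecutor)`.
2. Release a segment down to zero.
3. Check that `pendingCount()` is 1.
4. Call `runAll()` to perform the discard.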

### Reference Counting and Lifecycle

The registry keeps one reference count per tracked segment. A segment is discarded only after `release` has been called as many times as the initial count passed to `startTracking`. Once `stopTracking` has been called, the registry never discards the segment.

```java { .api }
/**
 * Starts tracking with an initial reference count.
 *
 * @param handle changelog segment handle
 * @param refCount number of release() calls before the segment is discarded (must be > 0)
 */
void startTracking(StreamStateHandle handle, long refCount);

/**
 * Stops tracking; the segment will not be discarded by this registry.
 *
 * @param handle changelog segment handle
 */
void stopTracking(StreamStateHandle handle);

/**
 * Decrements the reference count; discards the segment asynchronously when it reaches zero.
 *
 * @param handle changelog segment handle
 */
void release(StreamStateHandle handle);
```

**Reference Counting Examples:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// Scenario 1: a segment holding one change set that no checkpoint needs
StreamStateHandle single = uploadResult.streamStateHandle; // uploadResult: placeholder
registry.startTracking(single, 1);
registry.release(single); // 1 -> 0: discard scheduled

// Scenario 2: a segment shared by three change sets
StreamStateHandle shared = createChangelogHandle(); // placeholder
registry.startTracking(shared, 3);
registry.release(shared); // 3 -> 2
registry.release(shared); // 2 -> 1
registry.release(shared); // 1 -> 0: discard scheduled on a discard thread

// Scenario 3: a checkpoint containing the segment is confirmed
StreamStateHandle confirmed = createChangelogHandle(); // placeholder
registry.startTracking(confirmed, 3);
registry.stopTracking(confirmed); // no longer tracked; the JobManager owns the segment
registry.release(confirmed);      // ignored: nothing is discarded
```
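
The same rules can be read as a per-segment state machine. The single-threaded sketch below assumes two things: `stopTracking` hands a segment over for good, and `release` only affects segments that are still tracked. `SegmentLifecycle` and its states are hypothetical names for illustration, not Flink types.

```java
// Hypothetical single-threaded model of one segment's lifecycle (not Flink code).
final class SegmentLifecycle {
    enum State {
        TRACKED, // uploaded; discarded once every reference is released
        HANDED_OVER, // part of a confirmed checkpoint; the JobManager owns it
        DISCARDED // last reference released; discard scheduled
    }

    private State state = State.TRACKED;
    private long refCount;

    // Mirrors startTracking(handle, refCount).
    SegmentLifecycle(long refCount) {
        if (refCount <= 0) {
            throw new IllegalArgumentException("refCount must be positive: " + refCount);
        }
        this.refCount = refCount;
    }

    // Mirrors stopTracking(handle): only a tracked segment can be handed over.
    void stopTracking() {
        if (state == State.TRACKED) {
            state = State.HANDED_OVER;
        }
    }

    // Mirrors release(handle): ignored unless the segment is still tracked.
    void release() {
        if (state == State.TRACKED && --refCount == 0) {
            state = State.DISCARDED;
        }
    }

    State state() {
        return state;
    }
}
```

`HANDED_OVER` and `DISCARDED` are both terminal. This is why, in this model, a segment can never be handed over to the JobManager and also discarded by the TaskManager.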

### TaskChangelogRegistryImpl Implementation

Package-private implementation providing thread-safe reference counting and asynchronous cleanup. Code outside `org.apache.flink.changelog.fs` obtains an instance through `TaskChangelogRegistry.defaultChangelogRegistry`.

```java { .api }
/**
 * Default implementation of TaskChangelogRegistry with thread-safe operations.
 */
@Internal
@ThreadSafe
class TaskChangelogRegistryImpl implements TaskChangelogRegistry {

    /**
     * Creates a registry that runs discards on the given executor.
     *
     * @param discardExecutor executor for running cleanup tasks
     */
    public TaskChangelogRegistryImpl(Executor discardExecutor);
}
```

**Implementation Usage Example:**

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// TaskChangelogRegistryImpl is package-private; obtain it through the factory method
ExecutorService discardExecutor = Executors.newFixedThreadPool(3);
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(discardExecutor);

// The registry uses discardExecutor for asynchronous cleanup
StreamStateHandle handle = createHandle(); // placeholder for an uploaded segment
registry.startTracking(handle, 1);
registry.release(handle); // 1 -> 0: the segment is discarded on discardExecutor

// Shut the executor down when done; discards that were already submitted still run
discardExecutor.shutdown();
```
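
Because discards run on the executor, a failing discard cannot be reported to whoever called `release`. One way to structure this is sketched below, assuming failures should be logged and swallowed. `Discardable` and `DiscardScheduler` are hypothetical names, and `System.err` stands in for a real logger.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for a segment that can delete its underlying file.
interface Discardable {
    void discard() throws Exception;
}

// Hypothetical sketch: schedules discards and keeps their failures off the caller's thread.
final class DiscardScheduler {
    private final Executor executor;

    DiscardScheduler(Executor executor) {
        this.executor = executor;
    }

    // Returns as soon as the task is handed to the executor.
    void scheduleDiscard(Discardable segment) {
        executor.execute(
                () -> {
                    try {
                        segment.discard();
                    } catch (Exception e) {
                        // An unused segment that cannot be deleted is not fatal: log and continue.
                        System.err.println("Unable to discard unused segment: " + e);
                    }
                });
    }
}
```

Catching inside the submitted task keeps the failure from escaping into the executor. For a thread pool, an uncaught exception would otherwise terminate the worker thread that ran the task.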

### Integration with Storage Components

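`FsStateChangelogStorage` shares its registry with the uploader and with every writer it creates:

- The uploader starts tracking each file it writes.
- A writer calls `stopTracking` once a checkpoint that includes an uploaded segment is confirmed.
- A writer calls `release` when uploaded changes are no longer needed, for example after they are truncated.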

**Storage Integration Example:**

```java
// Create storage with a registry
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

FsStateChangelogStorage storage =
        new FsStateChangelogStorage(
                jobID,
                config,
                metricGroup,
                registry, // shared with the uploader and every writer
                localRecoveryConfig);

// Writers created by the storage report to the same registry
FsStateChangelogWriter writer =
        storage.createWriter(operatorId, keyGroupRange, mailboxExecutor);

// persist() uploads pending changes; the uploader starts tracking each file it writes
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
        writer.persist(sequenceNumber, checkpointId);

future.thenAccept(
        result -> {
            // Each uploaded segment stays tracked until the checkpoint is confirmed
            // (stopTracking) or its changes are no longer needed (release)
            result.getJobManagerOwnedSnapshot()
                    .getHandlesAndOffsets()
                    .forEach(handleAndOffset -> System.out.println("Tracked segment: " + handleAndOffset.f0));
        });
```

### Upload Coordination

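Tracking begins inside the upload path. When the uploader writes a batch of change sets into one file, it calls `startTracking` for the resulting handle with a reference count equal to the number of change sets in that file. Code that schedules uploads therefore does not register the results itself.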

**Upload Coordination Example:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

StateChangeUploadScheduler scheduler =
        StateChangeUploadScheduler.fromConfig(
                jobID, config, metricGroup, registry, localRecoveryConfig);

// The uploader calls registry.startTracking(...) for every file it writes,
// so the callbacks must not register the handles a second time
UploadTask task =
        new UploadTask(
                Collections.singletonList(changeSet),
                uploadResults ->
                        uploadResults.forEach(
                                r -> System.out.println("Uploaded and tracked: " + r.streamStateHandle)),
                (failedSequenceNumbers, error) ->
                        System.err.println("Upload failed for " + failedSequenceNumbers + ": " + error));

scheduler.upload(task); // may throw IOException
```
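
A single file can start with a reference count above one because several upload tasks, each carrying change sets from different writers, may be batched into it. Below is a minimal sketch of that count, assuming one reference per change set written into the file. `PendingTask` is a hypothetical stand-in for an upload task, with change sets reduced to string IDs.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for an upload task: just the IDs of the change sets it carries.
final class PendingTask {
    final List<String> changeSetIds;

    PendingTask(String... changeSetIds) {
        this.changeSetIds = Arrays.asList(changeSetIds);
    }
}

final class BatchRefCount {
    private BatchRefCount() {}

    // One reference per change set written into the shared file.
    static long refCountFor(Collection<PendingTask> tasksInFile) {
        return tasksInFile.stream().mapToLong(task -> task.changeSetIds.size()).sum();
    }
}
```

Consider a file holding change sets `a` and `b` from one task and `c` from another. Its count is 3, so under these assumptions the file survives until all three have been released.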

### No-Op Registry

A shared implementation whose methods do nothing, useful in tests or when tracking is disabled. The registry never discards segments passed to it.

```java { .api }
/**
 * No-operation registry: an anonymous implementation whose methods return immediately.
 */
TaskChangelogRegistry NO_OP = new TaskChangelogRegistry() { /* all methods are empty */ };
```

**No-Op Usage Example:**

```java
// Use the NO_OP registry when tracking is not needed
TaskChangelogRegistry noOpRegistry = TaskChangelogRegistry.NO_OP;

// All calls return immediately; handle is a placeholder StreamStateHandle
noOpRegistry.startTracking(handle, 1); // does nothing
noOpRegistry.stopTracking(handle);     // does nothing
noOpRegistry.release(handle);          // does nothing: the segment is never discarded

// Useful for tests or minimal configurations
FsStateChangelogStorage storage =
        new FsStateChangelogStorage(
                jobID,
                config,
                metricGroup,
                TaskChangelogRegistry.NO_OP, // disable tracking
                localRecoveryConfig);
```

### Thread Safety and Concurrency

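`TaskChangelogRegistryImpl` is annotated `@ThreadSafe`. Each change to a segment's reference count is applied atomically, so when several threads release the same segment concurrently, exactly one of them observes the count reaching zero and schedules the discard.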

**Concurrent Usage Example:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(4);
StreamStateHandle sharedHandle = createSharedHandle(); // placeholder for an uploaded segment

// Register all five references before any thread can release one;
// starting tracking concurrently with the releases would be a race
registry.startTracking(sharedHandle, 5);

ExecutorService executor = Executors.newFixedThreadPool(10);

// Five concurrent releases: exactly one of them takes the count to 0 and schedules the discard
for (int i = 0; i < 5; i++) {
    executor.submit(() -> registry.release(sharedHandle));
}

// Wait for the releases to finish instead of sleeping
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
    executor.shutdownNow();
}
```

### Error Handling and Resilience

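Registry operations are in-memory bookkeeping, so failures are limited to invalid arguments and to the asynchronous discard itself. In the default implementation:

- `startTracking` rejects a non-positive reference count.
- `stopTracking` and `release` ignore handles that are not tracked.
- An exception thrown while discarding a segment is logged on the discard thread instead of being propagated to the caller of `release`.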

**Error Handling Examples:**

```java
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(2);

// A non-positive initial reference count is rejected
try {
    registry.startTracking(handle, 0);
} catch (IllegalStateException e) {
    System.err.println("Invalid registration: " + e.getMessage());
}

// Defensive programming: the registry dereferences the handle, so never pass null
if (handle != null) {
    registry.startTracking(handle, 1);
    // A failure while discarding is logged on the discard thread; release() does not throw it
    registry.release(handle);
}

// Calls for handles that are not (or no longer) tracked are ignored
registry.stopTracking(nonExistentHandle);
registry.release(alreadyReleasedHandle);
```

### Cleanup and Shutdown

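The `TaskChangelogRegistry` interface declares no `close()` method, and `TaskChangelogRegistryImpl` is package-private. Callers therefore cannot shut down an executor that the registry created for itself. When the discard threads must be stopped explicitly, create the executor yourself, pass it to `defaultChangelogRegistry(Executor)`, and shut it down once no more releases can occur.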

**Cleanup Example:**

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Own the discard executor so that it can be shut down explicitly
ExecutorService discardExecutor = Executors.newFixedThreadPool(3);
TaskChangelogRegistry registry = TaskChangelogRegistry.defaultChangelogRegistry(discardExecutor);

// Use the registry during the task's lifecycle
// ...

// During shutdown, after the last release(): stop accepting new discards and let the
// already-scheduled ones finish. A release() that schedules a discard after this point
// is rejected by the executor.
discardExecutor.shutdown();
if (!discardExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
    discardExecutor.shutdownNow(); // interrupt discards that are still running
}
```
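
A caller-owned executor can also be built with a custom `ThreadFactory`, so that discard threads are easy to spot in thread dumps and do not keep the JVM alive. Below is a minimal sketch using only `java.util.concurrent`. `DiscardExecutors`, its method names and the thread-name prefix are hypothetical choices, not Flink settings.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for creating a caller-owned discard executor.
final class DiscardExecutors {
    private DiscardExecutors() {}

    // Creates daemon threads named "<prefix>-1", "<prefix>-2", ...
    static ThreadFactory daemonThreadFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true); // does not keep the JVM alive
            return thread;
        };
    }

    // Fixed-size pool for asynchronous discards, to be passed to defaultChangelogRegistry(Executor).
    static ExecutorService newDaemonPool(int threads, String prefix) {
        return Executors.newFixedThreadPool(threads, daemonThreadFactory(prefix));
    }
}
```

Daemon threads are a trade-off: a discard still running when the JVM exits is abandoned, which may leave an unused file behind. An explicit `shutdown()` followed by `awaitTermination()` remains the way to let pending discards finish.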