or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

caching.md, index.md, leader-election.md, locking.md, shared-values.md

docs/shared-values.md

0

# Shared Values

1

2

Shared value capabilities for coordinating counters and arbitrary data across multiple processes. Provides thread-safe shared state management with versioning and change notifications.

3

4

## Capabilities

5

6

### SharedCount

7

8

Thread-safe shared integer counter that multiple processes can read and modify atomically.

9

10

```java { .api }

11

/**

12

* Manages a shared integer that can be safely updated by multiple processes

13

*/

14

public class SharedCount implements SharedCountReader, Closeable {

15

/**

16

* Create a new SharedCount

17

* @param client the curator client

18

* @param path the path to use for the shared count

19

* @param seedValue initial value for the count

20

*/

21

public SharedCount(CuratorFramework client, String path, int seedValue);

22

23

/**

24

* Start the shared count management

25

* @throws Exception if startup fails

26

*/

27

public void start() throws Exception;

28

29

/**

30

* Close the shared count

31

*/

32

@Override

33

public void close() throws IOException;

34

35

/**

36

* Get the current count value

37

* @return current count value

38

*/

39

@Override

40

public int getCount();

41

42

/**

43

* Get the current count together with its version

44

* @return the current count wrapped with its version, for optimistic locking via trySetCount

45

*/

46

@Override

47

public VersionedValue<Integer> getVersionedValue();

48

49

/**

50

* Add a listener for count changes

51

* @param listener the listener to add

52

*/

53

@Override

54

public void addListener(SharedCountListener listener);

55

56

/**

57

* Add listener with specific executor

58

* @param listener the listener to add

59

* @param executor executor for listener callbacks

60

*/

61

@Override

62

public void addListener(SharedCountListener listener, Executor executor);

63

64

/**

65

* Remove a listener

66

* @param listener the listener to remove

67

*/

68

@Override

69

public void removeListener(SharedCountListener listener);

70

71

/**

72

* Set the count to a new value

73

* @param newCount the new count value

74

* @return true if the set succeeded

75

* @throws Exception if operation fails

76

*/

77

public boolean setCount(int newCount) throws Exception;

78

79

/**

80

* Try to set the count with version checking

81

* @param newCount the new count value

82

* @param previous the versioned value previously read; the set only succeeds if it is still current

83

* @return true if the set succeeded

84

* @throws Exception if operation fails

85

*/

86

public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;

87

}

88

```

89

90

**Usage Example:**

91

92

```java

93

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;

CuratorFramework client = // ... initialize client
// 0 is the seed value handed to the SharedCount constructor
SharedCount sharedCounter = new SharedCount(client, "/app/counters/global", 0);

// Add listener for count changes
sharedCounter.addListener(new SharedCountListener() {
    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter changed to: " + newCount);
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        System.out.println("Connection state changed: " + newState);
    }
});

try {
    sharedCounter.start();

    // Read current value
    int currentValue = sharedCounter.getCount();
    System.out.println("Current counter value: " + currentValue);

    // Increment counter atomically: the new value is derived from the same
    // versioned snapshot that is passed to trySetCount
    VersionedValue<Integer> versionedValue = sharedCounter.getVersionedValue();
    boolean success = sharedCounter.trySetCount(versionedValue, versionedValue.getValue() + 1);

    if (success) {
        System.out.println("Successfully incremented counter");
    } else {
        System.out.println("Counter was modified by another process, retry needed");
    }

    // Force set to specific value (no version check)
    sharedCounter.setCount(100);

} finally {
    // Always release the counter, even if start() or an update failed
    sharedCounter.close();
}

138

```

139

140

### SharedValue

141

142

Thread-safe shared arbitrary value that multiple processes can read and modify atomically.

143

144

```java { .api }

145

/**

146

* Manages a shared arbitrary value that can be safely updated by multiple processes

147

*/

148

public class SharedValue implements SharedValueReader, Closeable {

149

/**

150

* Create a new SharedValue

151

* @param client the curator client

152

* @param path the path to use for the shared value

153

* @param seedValue initial value (byte array)

154

*/

155

public SharedValue(CuratorFramework client, String path, byte[] seedValue);

156

157

/**

158

* Start the shared value management

159

* @throws Exception if startup fails

160

*/

161

public void start() throws Exception;

162

163

/**

164

* Close the shared value

165

*/

166

@Override

167

public void close() throws IOException;

168

169

/**

170

* Get the current value

171

* @return current value as byte array

172

*/

173

@Override

174

public byte[] getValue();

175

176

/**

177

* Get the current versioned value

178

* @return versioned value for optimistic locking

179

*/

180

@Override

181

public VersionedValue<byte[]> getVersionedValue();

182

183

/**

184

* Add a listener for value changes

185

* @param listener the listener to add

186

*/

187

@Override

188

public void addListener(SharedValueListener listener);

189

190

/**

191

* Add listener with specific executor

192

* @param listener the listener to add

193

* @param executor executor for listener callbacks

194

*/

195

@Override

196

public void addListener(SharedValueListener listener, Executor executor);

197

198

/**

199

* Remove a listener

200

* @param listener the listener to remove

201

*/

202

@Override

203

public void removeListener(SharedValueListener listener);

204

205

/**

206

* Set the value

207

* @param newValue the new value

208

* @return true if the set succeeded

209

* @throws Exception if operation fails

210

*/

211

public boolean setValue(byte[] newValue) throws Exception;

212

213

/**

214

* Try to set the value with version checking

215

* @param previous the previous versioned value

216

* @param newValue the new value

217

* @return true if the set succeeded

218

* @throws Exception if operation fails

219

*/

220

public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception;

221

}

222

```

223

224

**Usage Example:**

225

226

```java

227

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import org.apache.curator.framework.recipes.shared.SharedValueReader;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;

CuratorFramework client = // ... initialize client
ObjectMapper mapper = new ObjectMapper();

// Initialize with JSON configuration
Map<String, Object> initialConfig = new HashMap<>();
initialConfig.put("maxConnections", 100);
initialConfig.put("timeout", 30);
byte[] seedData = mapper.writeValueAsBytes(initialConfig);

SharedValue sharedConfig = new SharedValue(client, "/app/config/database", seedData);

// Add listener for configuration changes
sharedConfig.addListener(new SharedValueListener() {
    @Override
    public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {
        Map<String, Object> config = mapper.readValue(newValue, Map.class);
        System.out.println("Configuration updated: " + config);
        // Apply new configuration (applyConfiguration is an application-defined hook,
        // not part of the Curator API)
        applyConfiguration(config);
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        System.out.println("Connection state: " + newState);
    }
});

try {
    sharedConfig.start();

    // Read current configuration
    byte[] currentValue = sharedConfig.getValue();
    Map<String, Object> currentConfig = mapper.readValue(currentValue, Map.class);
    System.out.println("Current config: " + currentConfig);

    // Update configuration atomically.
    // Build the new config from the versioned snapshot itself, not from the earlier
    // getValue() read: otherwise a change landing between the two reads would pass
    // the version check and be silently overwritten with stale data.
    VersionedValue<byte[]> versionedValue = sharedConfig.getVersionedValue();
    Map<String, Object> baseConfig = mapper.readValue(versionedValue.getValue(), Map.class);
    Map<String, Object> newConfig = new HashMap<>(baseConfig);
    newConfig.put("maxConnections", 200);

    byte[] newData = mapper.writeValueAsBytes(newConfig);
    boolean success = sharedConfig.trySetValue(versionedValue, newData);

    if (success) {
        System.out.println("Configuration updated successfully");
    } else {
        System.out.println("Configuration was modified by another process");
    }

} finally {
    // Always release the shared value, even if start() or the update failed
    sharedConfig.close();
}

283

```

284

285

### VersionedValue

286

287

Container for values with version information for optimistic locking.

288

289

```java { .api }

290

/**

291

* POJO for holding a value along with its ZooKeeper version

292

*/

293

public class VersionedValue<T> {

294

/**

295

* Create a new VersionedValue

296

* @param value the value

297

* @param version the version number

298

*/

299

public VersionedValue(T value, int version);

300

301

/**

302

* Get the value

303

* @return the value

304

*/

305

public T getValue();

306

307

/**

308

* Get the version

309

* @return version number

310

*/

311

public int getVersion();

312

}

313

```

314

315

### SharedCountReader

316

317

Read-only interface for SharedCount with listener support.

318

319

```java { .api }

320

/**

321

* Interface for reading shared count values and listening to changes

322

*/

323

public interface SharedCountReader {

324

/**

325

* Get the current count

326

* @return current count value

327

*/

328

int getCount();

329

330

/**

331

* Get the versioned count value

332

* @return versioned value

333

*/

334

VersionedValue<Integer> getVersionedValue();

335

336

/**

337

* Add a listener for count changes

338

* @param listener the listener to add

339

*/

340

void addListener(SharedCountListener listener);

341

342

/**

343

* Add listener with executor

344

* @param listener the listener to add

345

* @param executor executor for callbacks

346

*/

347

void addListener(SharedCountListener listener, Executor executor);

348

349

/**

350

* Remove a listener

351

* @param listener the listener to remove

352

*/

353

void removeListener(SharedCountListener listener);

354

}

355

```

356

357

### SharedValueReader

358

359

Read-only interface for SharedValue with listener support.

360

361

```java { .api }

362

/**

363

* Interface for reading shared values and listening to changes

364

*/

365

public interface SharedValueReader {

366

/**

367

* Get the current value

368

* @return current value as byte array

369

*/

370

byte[] getValue();

371

372

/**

373

* Get the versioned value

374

* @return versioned value

375

*/

376

VersionedValue<byte[]> getVersionedValue();

377

378

/**

379

* Add a listener for value changes

380

* @param listener the listener to add

381

*/

382

void addListener(SharedValueListener listener);

383

384

/**

385

* Add listener with executor

386

* @param listener the listener to add

387

* @param executor executor for callbacks

388

*/

389

void addListener(SharedValueListener listener, Executor executor);

390

391

/**

392

* Remove a listener

393

* @param listener the listener to remove

394

*/

395

void removeListener(SharedValueListener listener);

396

}

397

```

398

399

### SharedCountListener

400

401

Listener interface for SharedCount change notifications.

402

403

```java { .api }

404

/**

405

* Listener for SharedCount change notifications

406

*/

407

public interface SharedCountListener extends ConnectionStateListener {

408

/**

409

* Called when the shared count value changes

410

* @param sharedCount the SharedCountReader that changed

411

* @param newCount the new count value

412

* @throws Exception if error occurs in listener

413

*/

414

void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception;

415

}

416

```

417

418

### SharedValueListener

419

420

Listener interface for SharedValue change notifications.

421

422

```java { .api }

423

/**

424

* Listener for SharedValue change notifications

425

*/

426

public interface SharedValueListener extends ConnectionStateListener {

427

/**

428

* Called when the shared value changes

429

* @param sharedValue the SharedValueReader that changed

430

* @param newValue the new value

431

* @throws Exception if error occurs in listener

432

*/

433

void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception;

434

}

435

```

436

437

### IllegalTrySetVersionException

438

439

Exception thrown when version overflow occurs in versioned operations.

440

441

```java { .api }

442

/**

443

* Exception thrown when a version number overflows during trySet operations

444

*/

445

public class IllegalTrySetVersionException extends Exception {

446

/**

447

* Create a new IllegalTrySetVersionException

448

*/

449

public IllegalTrySetVersionException();

450

451

/**

452

* Create with message

453

* @param message exception message

454

*/

455

public IllegalTrySetVersionException(String message);

456

457

/**

458

* Create with cause

459

* @param cause underlying cause

460

*/

461

public IllegalTrySetVersionException(Throwable cause);

462

463

/**

464

* Create with message and cause

465

* @param message exception message

466

* @param cause underlying cause

467

*/

468

public IllegalTrySetVersionException(String message, Throwable cause);

469

}

470

```

471

472

## Common Patterns

473

474

### Configuration Management

475

476

```java

477

public class DistributedConfigManager {

478

private final SharedValue configValue;

479

private final ObjectMapper mapper;

480

private volatile Map<String, Object> currentConfig;

481

482

public DistributedConfigManager(CuratorFramework client, String configPath,

483

Map<String, Object> defaultConfig) throws Exception {

484

this.mapper = new ObjectMapper();

485

byte[] defaultData = mapper.writeValueAsBytes(defaultConfig);

486

487

this.configValue = new SharedValue(client, configPath, defaultData);

488

489

configValue.addListener(new SharedValueListener() {

490

@Override

491

public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {

492

currentConfig = mapper.readValue(newValue, Map.class);

493

onConfigurationChanged(currentConfig);

494

}

495

496

@Override

497

public void stateChanged(CuratorFramework client, ConnectionState newState) {

498

// Handle connection state changes

499

}

500

});

501

502

configValue.start();

503

504

// Initialize current config

505

byte[] currentValue = configValue.getValue();

506

this.currentConfig = mapper.readValue(currentValue, Map.class);

507

}

508

509

public Map<String, Object> getConfiguration() {

510

return new HashMap<>(currentConfig);

511

}

512

513

public boolean updateConfiguration(Map<String, Object> updates) throws Exception {

514

VersionedValue<byte[]> current = configValue.getVersionedValue();

515

516

Map<String, Object> newConfig = new HashMap<>(

517

mapper.readValue(current.getValue(), Map.class)

518

);

519

newConfig.putAll(updates);

520

521

byte[] newData = mapper.writeValueAsBytes(newConfig);

522

return configValue.trySetValue(current, newData);

523

}

524

525

protected void onConfigurationChanged(Map<String, Object> newConfig) {

526

System.out.println("Configuration changed: " + newConfig);

527

// Implement configuration application logic

528

}

529

530

public void close() throws IOException {

531

configValue.close();

532

}

533

}

534

```

535

536

### Distributed Statistics

537

538

```java

539

public class DistributedStatsCollector implements Closeable {

540

private final Map<String, SharedCount> counters;

541

private final CuratorFramework client;

542

private final String basePath;

543

544

public DistributedStatsCollector(CuratorFramework client, String basePath) {

545

this.client = client;

546

this.basePath = basePath;

547

this.counters = new ConcurrentHashMap<>();

548

}

549

550

public void incrementCounter(String counterName) throws Exception {

551

SharedCount counter = getOrCreateCounter(counterName);

552

553

// Retry loop for optimistic updates

554

for (int retry = 0; retry < 5; retry++) {

555

VersionedValue<Integer> current = counter.getVersionedValue();

556

boolean success = counter.trySetCount(current, current.getValue() + 1);

557

558

if (success) {

559

break;

560

}

561

562

// Brief backoff before retry

563

Thread.sleep(10 * (retry + 1));

564

}

565

}

566

567

public void addToCounter(String counterName, int delta) throws Exception {

568

SharedCount counter = getOrCreateCounter(counterName);

569

570

for (int retry = 0; retry < 5; retry++) {

571

VersionedValue<Integer> current = counter.getVersionedValue();

572

boolean success = counter.trySetCount(current, current.getValue() + delta);

573

574

if (success) {

575

break;

576

}

577

578

Thread.sleep(10 * (retry + 1));

579

}

580

}

581

582

public int getCounterValue(String counterName) throws Exception {

583

SharedCount counter = getOrCreateCounter(counterName);

584

return counter.getCount();

585

}

586

587

public Map<String, Integer> getAllCounters() throws Exception {

588

Map<String, Integer> result = new HashMap<>();

589

for (Map.Entry<String, SharedCount> entry : counters.entrySet()) {

590

result.put(entry.getKey(), entry.getValue().getCount());

591

}

592

return result;

593

}

594

595

private SharedCount getOrCreateCounter(String counterName) throws Exception {

596

return counters.computeIfAbsent(counterName, name -> {

597

try {

598

SharedCount counter = new SharedCount(client, basePath + "/" + name, 0);

599

counter.start();

600

return counter;

601

} catch (Exception e) {

602

throw new RuntimeException("Failed to create counter: " + name, e);

603

}

604

});

605

}

606

607

@Override

608

public void close() throws IOException {

609

for (SharedCount counter : counters.values()) {

610

counter.close();

611

}

612

counters.clear();

613

}

614

}

615

```

616

617

### Batch Processing Coordination

618

619

```java

620

public class BatchCoordinator {

621

private final DistributedDoubleBarrier barrier;

622

private final SharedCount progressCounter;

623

private final int expectedParticipants;

624

625

public BatchCoordinator(CuratorFramework client, String jobId, int expectedParticipants)

626

throws Exception {

627

this.expectedParticipants = expectedParticipants;

628

629

this.barrier = new DistributedDoubleBarrier(

630

client,

631

"/jobs/" + jobId + "/barrier",

632

expectedParticipants

633

);

634

635

this.progressCounter = new SharedCount(

636

client,

637

"/jobs/" + jobId + "/progress",

638

0

639

);

640

641

progressCounter.start();

642

}

643

644

public boolean startBatch(long timeoutSeconds) throws Exception {

645

System.out.println("Waiting for all " + expectedParticipants + " participants...");

646

647

boolean allReady = barrier.enter(timeoutSeconds, TimeUnit.SECONDS);

648

if (!allReady) {

649

System.out.println("Not all participants ready within timeout");

650

return false;

651

}

652

653

System.out.println("All participants ready, batch processing started");

654

return true;

655

}

656

657

public void reportProgress() throws Exception {

658

// Atomically increment progress

659

for (int retry = 0; retry < 3; retry++) {

660

VersionedValue<Integer> current = progressCounter.getVersionedValue();

661

boolean success = progressCounter.trySetCount(current, current.getValue() + 1);

662

663

if (success) {

664

int newProgress = current.getValue() + 1;

665

System.out.println("Progress: " + newProgress + "/" + expectedParticipants);

666

break;

667

}

668

}

669

}

670

671

public boolean finishBatch(long timeoutSeconds) throws Exception {

672

System.out.println("Waiting for all participants to complete...");

673

674

boolean allCompleted = barrier.leave(timeoutSeconds, TimeUnit.SECONDS);

675

if (!allCompleted) {

676

System.out.println("Not all participants completed within timeout");

677

return false;

678

}

679

680

System.out.println("All participants completed successfully");

681

return true;

682

}

683

684

public int getCurrentProgress() {

685

return progressCounter.getCount();

686

}

687

}

688

```