0
# Shared Values
1
2
Shared value capabilities for coordinating counters and arbitrary data across multiple processes. Provides thread-safe shared state management with versioning and change notifications.
3
4
## Capabilities
5
6
### SharedCount
7
8
Thread-safe shared integer counter that multiple processes can read and modify atomically.
9
10
```java { .api }
11
/**
12
* Manages a shared integer that can be safely updated by multiple processes
13
*/
14
public class SharedCount implements SharedCountReader, Closeable {
15
/**
16
* Create a new SharedCount
17
* @param client the curator client
18
* @param path the path to use for the shared count
19
* @param seedValue initial value for the count
20
*/
21
public SharedCount(CuratorFramework client, String path, int seedValue);
22
23
/**
24
* Start the shared count management
25
* @throws Exception if startup fails
26
*/
27
public void start() throws Exception;
28
29
/**
30
* Close the shared count
31
*/
32
@Override
33
public void close() throws IOException;
34
35
/**
36
* Get the current count value
37
* @return current count value
38
*/
39
@Override
40
public int getCount();
41
42
/**
43
* Get the current version of the count
44
* @return version number for optimistic locking
45
*/
46
@Override
47
public VersionedValue<Integer> getVersionedValue();
48
49
/**
50
* Add a listener for count changes
51
* @param listener the listener to add
52
*/
53
@Override
54
public void addListener(SharedCountListener listener);
55
56
/**
57
* Add listener with specific executor
58
* @param listener the listener to add
59
* @param executor executor for listener callbacks
60
*/
61
@Override
62
public void addListener(SharedCountListener listener, Executor executor);
63
64
/**
65
* Remove a listener
66
* @param listener the listener to remove
67
*/
68
@Override
69
public void removeListener(SharedCountListener listener);
70
71
/**
72
* Set the count to a new value
73
* @param newCount the new count value
74
* @return true if the set succeeded
75
* @throws Exception if operation fails
76
*/
77
public boolean setCount(int newCount) throws Exception;
78
79
/**
80
* Try to set the count with version checking
81
* @param newCount the new count value
82
* @param expectedVersion expected current version
83
* @return true if the set succeeded
84
* @throws Exception if operation fails
85
*/
86
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;
87
}
88
```
89
90
**Usage Example:**
91
92
```java
93
import org.apache.curator.framework.CuratorFramework;
94
import org.apache.curator.framework.recipes.shared.SharedCount;
95
import org.apache.curator.framework.recipes.shared.SharedCountListener;
96
import org.apache.curator.framework.recipes.shared.VersionedValue;
97
import org.apache.curator.framework.state.ConnectionState;
98
99
CuratorFramework client = // ... initialize client
100
SharedCount sharedCounter = new SharedCount(client, "/app/counters/global", 0);
101
102
// Add listener for count changes
103
sharedCounter.addListener(new SharedCountListener() {
104
@Override
105
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
106
System.out.println("Counter changed to: " + newCount);
107
}
108
109
@Override
110
public void stateChanged(CuratorFramework client, ConnectionState newState) {
111
System.out.println("Connection state changed: " + newState);
112
}
113
});
114
115
try {
116
sharedCounter.start();
117
118
// Read current value
119
int currentValue = sharedCounter.getCount();
120
System.out.println("Current counter value: " + currentValue);
121
122
// Increment counter atomically
123
VersionedValue<Integer> versionedValue = sharedCounter.getVersionedValue();
124
boolean success = sharedCounter.trySetCount(versionedValue, versionedValue.getValue() + 1);
125
126
if (success) {
127
System.out.println("Successfully incremented counter");
128
} else {
129
System.out.println("Counter was modified by another process, retry needed");
130
}
131
132
// Force set to specific value
133
sharedCounter.setCount(100);
134
135
} finally {
136
sharedCounter.close();
137
}
138
```
139
140
### SharedValue
141
142
Thread-safe shared arbitrary value that multiple processes can read and modify atomically.
143
144
```java { .api }
145
/**
146
* Manages a shared arbitrary value that can be safely updated by multiple processes
147
*/
148
public class SharedValue implements SharedValueReader, Closeable {
149
/**
150
* Create a new SharedValue
151
* @param client the curator client
152
* @param path the path to use for the shared value
153
* @param seedValue initial value (byte array)
154
*/
155
public SharedValue(CuratorFramework client, String path, byte[] seedValue);
156
157
/**
158
* Start the shared value management
159
* @throws Exception if startup fails
160
*/
161
public void start() throws Exception;
162
163
/**
164
* Close the shared value
165
*/
166
@Override
167
public void close() throws IOException;
168
169
/**
170
* Get the current value
171
* @return current value as byte array
172
*/
173
@Override
174
public byte[] getValue();
175
176
/**
177
* Get the current versioned value
178
* @return versioned value for optimistic locking
179
*/
180
@Override
181
public VersionedValue<byte[]> getVersionedValue();
182
183
/**
184
* Add a listener for value changes
185
* @param listener the listener to add
186
*/
187
@Override
188
public void addListener(SharedValueListener listener);
189
190
/**
191
* Add listener with specific executor
192
* @param listener the listener to add
193
* @param executor executor for listener callbacks
194
*/
195
@Override
196
public void addListener(SharedValueListener listener, Executor executor);
197
198
/**
199
* Remove a listener
200
* @param listener the listener to remove
201
*/
202
@Override
203
public void removeListener(SharedValueListener listener);
204
205
/**
206
* Set the value
207
* @param newValue the new value
208
* @return true if the set succeeded
209
* @throws Exception if operation fails
210
*/
211
public boolean setValue(byte[] newValue) throws Exception;
212
213
/**
214
* Try to set the value with version checking
215
* @param previous the previous versioned value
216
* @param newValue the new value
217
* @return true if the set succeeded
218
* @throws Exception if operation fails
219
*/
220
public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception;
221
}
222
```
223
224
**Usage Example:**
225
226
```java
227
import org.apache.curator.framework.recipes.shared.SharedValue;
228
import org.apache.curator.framework.recipes.shared.SharedValueListener;
229
import com.fasterxml.jackson.databind.ObjectMapper;
230
231
CuratorFramework client = // ... initialize client
232
ObjectMapper mapper = new ObjectMapper();
233
234
// Initialize with JSON configuration
235
Map<String, Object> initialConfig = new HashMap<>();
236
initialConfig.put("maxConnections", 100);
237
initialConfig.put("timeout", 30);
238
byte[] seedData = mapper.writeValueAsBytes(initialConfig);
239
240
SharedValue sharedConfig = new SharedValue(client, "/app/config/database", seedData);
241
242
// Add listener for configuration changes
243
sharedConfig.addListener(new SharedValueListener() {
244
@Override
245
public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {
246
Map<String, Object> config = mapper.readValue(newValue, Map.class);
247
System.out.println("Configuration updated: " + config);
248
// Apply new configuration
249
applyConfiguration(config);
250
}
251
252
@Override
253
public void stateChanged(CuratorFramework client, ConnectionState newState) {
254
System.out.println("Connection state: " + newState);
255
}
256
});
257
258
try {
259
sharedConfig.start();
260
261
// Read current configuration
262
byte[] currentValue = sharedConfig.getValue();
263
Map<String, Object> currentConfig = mapper.readValue(currentValue, Map.class);
264
System.out.println("Current config: " + currentConfig);
265
266
// Update configuration atomically
267
VersionedValue<byte[]> versionedValue = sharedConfig.getVersionedValue();
268
Map<String, Object> newConfig = new HashMap<>(currentConfig);
269
newConfig.put("maxConnections", 200);
270
271
byte[] newData = mapper.writeValueAsBytes(newConfig);
272
boolean success = sharedConfig.trySetValue(versionedValue, newData);
273
274
if (success) {
275
System.out.println("Configuration updated successfully");
276
} else {
277
System.out.println("Configuration was modified by another process");
278
}
279
280
} finally {
281
sharedConfig.close();
282
}
283
```
284
285
### VersionedValue
286
287
Container for values with version information for optimistic locking.
288
289
```java { .api }
290
/**
291
* POJO for holding a value along with its ZooKeeper version
292
*/
293
public class VersionedValue<T> {
294
/**
295
* Create a new VersionedValue
296
* @param value the value
297
* @param version the version number
298
*/
299
public VersionedValue(T value, int version);
300
301
/**
302
* Get the value
303
* @return the value
304
*/
305
public T getValue();
306
307
/**
308
* Get the version
309
* @return version number
310
*/
311
public int getVersion();
312
}
313
```
314
315
### SharedCountReader
316
317
Read-only interface for SharedCount with listener support.
318
319
```java { .api }
320
/**
321
* Interface for reading shared count values and listening to changes
322
*/
323
public interface SharedCountReader {
324
/**
325
* Get the current count
326
* @return current count value
327
*/
328
int getCount();
329
330
/**
331
* Get the versioned count value
332
* @return versioned value
333
*/
334
VersionedValue<Integer> getVersionedValue();
335
336
/**
337
* Add a listener for count changes
338
* @param listener the listener to add
339
*/
340
void addListener(SharedCountListener listener);
341
342
/**
343
* Add listener with executor
344
* @param listener the listener to add
345
* @param executor executor for callbacks
346
*/
347
void addListener(SharedCountListener listener, Executor executor);
348
349
/**
350
* Remove a listener
351
* @param listener the listener to remove
352
*/
353
void removeListener(SharedCountListener listener);
354
}
355
```
356
357
### SharedValueReader
358
359
Read-only interface for SharedValue with listener support.
360
361
```java { .api }
362
/**
363
* Interface for reading shared values and listening to changes
364
*/
365
public interface SharedValueReader {
366
/**
367
* Get the current value
368
* @return current value as byte array
369
*/
370
byte[] getValue();
371
372
/**
373
* Get the versioned value
374
* @return versioned value
375
*/
376
VersionedValue<byte[]> getVersionedValue();
377
378
/**
379
* Add a listener for value changes
380
* @param listener the listener to add
381
*/
382
void addListener(SharedValueListener listener);
383
384
/**
385
* Add listener with executor
386
* @param listener the listener to add
387
* @param executor executor for callbacks
388
*/
389
void addListener(SharedValueListener listener, Executor executor);
390
391
/**
392
* Remove a listener
393
* @param listener the listener to remove
394
*/
395
void removeListener(SharedValueListener listener);
396
}
397
```
398
399
### SharedCountListener
400
401
Listener interface for SharedCount change notifications.
402
403
```java { .api }
404
/**
405
* Listener for SharedCount change notifications
406
*/
407
public interface SharedCountListener extends ConnectionStateListener {
408
/**
409
* Called when the shared count value changes
410
* @param sharedCount the SharedCountReader that changed
411
* @param newCount the new count value
412
* @throws Exception if error occurs in listener
413
*/
414
void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception;
415
}
416
```
417
418
### SharedValueListener
419
420
Listener interface for SharedValue change notifications.
421
422
```java { .api }
423
/**
424
* Listener for SharedValue change notifications
425
*/
426
public interface SharedValueListener extends ConnectionStateListener {
427
/**
428
* Called when the shared value changes
429
* @param sharedValue the SharedValueReader that changed
430
* @param newValue the new value
431
* @throws Exception if error occurs in listener
432
*/
433
void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception;
434
}
435
```
436
437
### IllegalTrySetVersionException
438
439
Exception thrown when version overflow occurs in versioned operations.
440
441
```java { .api }
442
/**
443
* Exception thrown when a version number overflows during trySet operations
444
*/
445
public class IllegalTrySetVersionException extends Exception {
446
/**
447
* Create a new IllegalTrySetVersionException
448
*/
449
public IllegalTrySetVersionException();
450
451
/**
452
* Create with message
453
* @param message exception message
454
*/
455
public IllegalTrySetVersionException(String message);
456
457
/**
458
* Create with cause
459
* @param cause underlying cause
460
*/
461
public IllegalTrySetVersionException(Throwable cause);
462
463
/**
464
* Create with message and cause
465
* @param message exception message
466
* @param cause underlying cause
467
*/
468
public IllegalTrySetVersionException(String message, Throwable cause);
469
}
470
```
471
472
## Common Patterns
473
474
### Configuration Management
475
476
```java
477
public class DistributedConfigManager {
478
private final SharedValue configValue;
479
private final ObjectMapper mapper;
480
private volatile Map<String, Object> currentConfig;
481
482
public DistributedConfigManager(CuratorFramework client, String configPath,
483
Map<String, Object> defaultConfig) throws Exception {
484
this.mapper = new ObjectMapper();
485
byte[] defaultData = mapper.writeValueAsBytes(defaultConfig);
486
487
this.configValue = new SharedValue(client, configPath, defaultData);
488
489
configValue.addListener(new SharedValueListener() {
490
@Override
491
public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception {
492
currentConfig = mapper.readValue(newValue, Map.class);
493
onConfigurationChanged(currentConfig);
494
}
495
496
@Override
497
public void stateChanged(CuratorFramework client, ConnectionState newState) {
498
// Handle connection state changes
499
}
500
});
501
502
configValue.start();
503
504
// Initialize current config
505
byte[] currentValue = configValue.getValue();
506
this.currentConfig = mapper.readValue(currentValue, Map.class);
507
}
508
509
public Map<String, Object> getConfiguration() {
510
return new HashMap<>(currentConfig);
511
}
512
513
public boolean updateConfiguration(Map<String, Object> updates) throws Exception {
514
VersionedValue<byte[]> current = configValue.getVersionedValue();
515
516
Map<String, Object> newConfig = new HashMap<>(
517
mapper.readValue(current.getValue(), Map.class)
518
);
519
newConfig.putAll(updates);
520
521
byte[] newData = mapper.writeValueAsBytes(newConfig);
522
return configValue.trySetValue(current, newData);
523
}
524
525
protected void onConfigurationChanged(Map<String, Object> newConfig) {
526
System.out.println("Configuration changed: " + newConfig);
527
// Implement configuration application logic
528
}
529
530
public void close() throws IOException {
531
configValue.close();
532
}
533
}
534
```
535
536
### Distributed Statistics
537
538
```java
539
public class DistributedStatsCollector implements Closeable {
540
private final Map<String, SharedCount> counters;
541
private final CuratorFramework client;
542
private final String basePath;
543
544
public DistributedStatsCollector(CuratorFramework client, String basePath) {
545
this.client = client;
546
this.basePath = basePath;
547
this.counters = new ConcurrentHashMap<>();
548
}
549
550
public void incrementCounter(String counterName) throws Exception {
551
SharedCount counter = getOrCreateCounter(counterName);
552
553
// Retry loop for optimistic updates
554
for (int retry = 0; retry < 5; retry++) {
555
VersionedValue<Integer> current = counter.getVersionedValue();
556
boolean success = counter.trySetCount(current, current.getValue() + 1);
557
558
if (success) {
559
break;
560
}
561
562
// Brief backoff before retry
563
Thread.sleep(10 * (retry + 1));
564
}
565
}
566
567
public void addToCounter(String counterName, int delta) throws Exception {
568
SharedCount counter = getOrCreateCounter(counterName);
569
570
for (int retry = 0; retry < 5; retry++) {
571
VersionedValue<Integer> current = counter.getVersionedValue();
572
boolean success = counter.trySetCount(current, current.getValue() + delta);
573
574
if (success) {
575
break;
576
}
577
578
Thread.sleep(10 * (retry + 1));
579
}
580
}
581
582
public int getCounterValue(String counterName) throws Exception {
583
SharedCount counter = getOrCreateCounter(counterName);
584
return counter.getCount();
585
}
586
587
public Map<String, Integer> getAllCounters() throws Exception {
588
Map<String, Integer> result = new HashMap<>();
589
for (Map.Entry<String, SharedCount> entry : counters.entrySet()) {
590
result.put(entry.getKey(), entry.getValue().getCount());
591
}
592
return result;
593
}
594
595
private SharedCount getOrCreateCounter(String counterName) throws Exception {
596
return counters.computeIfAbsent(counterName, name -> {
597
try {
598
SharedCount counter = new SharedCount(client, basePath + "/" + name, 0);
599
counter.start();
600
return counter;
601
} catch (Exception e) {
602
throw new RuntimeException("Failed to create counter: " + name, e);
603
}
604
});
605
}
606
607
@Override
608
public void close() throws IOException {
609
for (SharedCount counter : counters.values()) {
610
counter.close();
611
}
612
counters.clear();
613
}
614
}
615
```
616
617
### Batch Processing Coordination
618
619
```java
620
public class BatchCoordinator {
621
private final DistributedDoubleBarrier barrier;
622
private final SharedCount progressCounter;
623
private final int expectedParticipants;
624
625
public BatchCoordinator(CuratorFramework client, String jobId, int expectedParticipants)
626
throws Exception {
627
this.expectedParticipants = expectedParticipants;
628
629
this.barrier = new DistributedDoubleBarrier(
630
client,
631
"/jobs/" + jobId + "/barrier",
632
expectedParticipants
633
);
634
635
this.progressCounter = new SharedCount(
636
client,
637
"/jobs/" + jobId + "/progress",
638
0
639
);
640
641
progressCounter.start();
642
}
643
644
public boolean startBatch(long timeoutSeconds) throws Exception {
645
System.out.println("Waiting for all " + expectedParticipants + " participants...");
646
647
boolean allReady = barrier.enter(timeoutSeconds, TimeUnit.SECONDS);
648
if (!allReady) {
649
System.out.println("Not all participants ready within timeout");
650
return false;
651
}
652
653
System.out.println("All participants ready, batch processing started");
654
return true;
655
}
656
657
public void reportProgress() throws Exception {
658
// Atomically increment progress
659
for (int retry = 0; retry < 3; retry++) {
660
VersionedValue<Integer> current = progressCounter.getVersionedValue();
661
boolean success = progressCounter.trySetCount(current, current.getValue() + 1);
662
663
if (success) {
664
int newProgress = current.getValue() + 1;
665
System.out.println("Progress: " + newProgress + "/" + expectedParticipants);
666
break;
667
}
668
}
669
}
670
671
public boolean finishBatch(long timeoutSeconds) throws Exception {
672
System.out.println("Waiting for all participants to complete...");
673
674
boolean allCompleted = barrier.leave(timeoutSeconds, TimeUnit.SECONDS);
675
if (!allCompleted) {
676
System.out.println("Not all participants completed within timeout");
677
return false;
678
}
679
680
System.out.println("All participants completed successfully");
681
return true;
682
}
683
684
public int getCurrentProgress() {
685
return progressCounter.getCount();
686
}
687
}
688
```