0
# Leader Election
1
2
Leader election capabilities for coordinating leadership across multiple processes in distributed systems. Provides two different approaches: LeaderLatch for simple leader selection and LeaderSelector for more complex leadership management with listener-based control.
3
4
## Capabilities
5
6
### LeaderLatch
7
8
Simple leader election mechanism where participating processes compete to become the leader. Uses a latch-based approach with automatic failover.
9
10
```java { .api }
11
/**
12
* Leader selection abstraction that handles connection state and provides
13
* reliable leader election across multiple processes
14
*/
15
public class LeaderLatch implements Closeable {
16
/**
17
* Create a new LeaderLatch
18
* @param client the curator client
19
* @param latchPath the path to use for leader election
20
*/
21
public LeaderLatch(CuratorFramework client, String latchPath);
22
23
/**
24
* Create a new LeaderLatch with participant ID
25
* @param client the curator client
26
* @param latchPath the path to use for leader election
27
* @param id unique identifier for this participant
28
*/
29
public LeaderLatch(CuratorFramework client, String latchPath, String id);
30
31
/**
32
* Create a new LeaderLatch with close mode
33
* @param client the curator client
34
* @param latchPath the path to use for leader election
35
* @param id unique identifier for this participant
36
* @param closeMode how to handle listeners on close
37
*/
38
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode);
39
40
/**
41
* Start the leader selection process
42
*/
43
public void start() throws Exception;
44
45
/**
46
* Close the latch and relinquish leadership
47
*/
48
public void close() throws IOException;
49
50
/**
51
* Check if this instance is currently the leader
52
* @return true if this instance is the leader
53
*/
54
public boolean hasLeadership();
55
56
/**
57
* Wait until this instance becomes the leader
58
*/
59
public void await() throws InterruptedException, EOFException;
60
61
/**
62
* Wait until this instance becomes the leader or timeout
63
* @param timeout maximum time to wait
64
* @param unit time unit
65
* @return true if became leader, false if timed out
66
*/
67
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
68
69
/**
70
* Get the current state of this latch
71
* @return current state
72
*/
73
public State getState();
74
75
/**
76
* Get the ID of this participant
77
* @return participant ID
78
*/
79
public String getId();
80
81
/**
82
* Get all current participants in the election
83
* @return collection of all participants
84
*/
85
public Collection<Participant> getParticipants() throws Exception;
86
87
/**
88
* Get the current leader participant
89
* @return leader participant; if there is currently no leader, a dummy (non-leader) participant is returned rather than null
90
*/
91
public Participant getLeader() throws Exception;
92
93
/**
94
* Add a listener for latch state changes
95
* @param listener the listener to add
96
*/
97
public void addListener(LeaderLatchListener listener);
98
99
/**
100
* Add a listener with executor
101
* @param listener the listener to add
102
* @param executor executor for listener callbacks
103
*/
104
public void addListener(LeaderLatchListener listener, Executor executor);
105
106
/**
107
* Remove a listener
108
* @param listener the listener to remove
109
*/
110
public void removeListener(LeaderLatchListener listener);
111
112
/**
113
* Enumeration of possible latch states
114
*/
115
public enum State {
116
LATENT, // Not started
117
STARTED, // Started and participating in the election (may or may not be the leader)
118
CLOSED // Closed
119
}
120
121
/**
122
* Enumeration controlling listener handling on close
123
*/
124
public enum CloseMode {
125
SILENT, // Don't notify listeners on close
126
NOTIFY_LEADER // Notify listeners on close
127
}
128
}
129
```
130
131
**Usage Example:**
132
133
```java
134
import org.apache.curator.framework.CuratorFramework;
135
import org.apache.curator.framework.recipes.leader.LeaderLatch;
136
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
137
138
CuratorFramework client = // ... initialize client
139
LeaderLatch leaderLatch = new LeaderLatch(client, "/app/leader", "server-1");
140
141
// Add listener for leadership changes
142
leaderLatch.addListener(new LeaderLatchListener() {
143
@Override
144
public void isLeader() {
145
System.out.println("I am now the leader!");
146
// Start leader-specific tasks
147
}
148
149
@Override
150
public void notLeader() {
151
System.out.println("I am no longer the leader");
152
// Stop leader-specific tasks
153
}
154
});
155
156
try {
157
leaderLatch.start();
158
159
// Wait to become leader (optional)
160
if (leaderLatch.await(30, TimeUnit.SECONDS)) {
161
System.out.println("Became leader within 30 seconds");
162
}
163
164
// Check leadership status
165
if (leaderLatch.hasLeadership()) {
166
System.out.println("Currently the leader");
167
}
168
169
// Keep running...
170
Thread.sleep(60000);
171
172
} finally {
173
leaderLatch.close();
174
}
175
```
176
177
### LeaderSelector
178
179
Alternative leader election implementation providing more control over leadership lifecycle through listener callbacks.
180
181
```java { .api }
182
/**
183
* Alternative leader selection implementation with listener-based control
184
*/
185
public class LeaderSelector implements Closeable {
186
/**
187
* Create a new LeaderSelector
188
* @param client the curator client
189
* @param mutexPath the path to use for leader election
190
* @param listener listener to handle leadership changes
191
*/
192
public LeaderSelector(CuratorFramework client, String mutexPath, LeaderSelectorListener listener);
193
194
/**
195
* Create a new LeaderSelector with a custom thread factory and executor
196
* @param client the curator client
197
* @param mutexPath the path to use for leader election
198
* @param threadFactory factory for creating threads
199
* @param executor executor for running leader logic
200
* @param listener listener to handle leadership changes
201
*/
202
public LeaderSelector(CuratorFramework client, String mutexPath,
203
ThreadFactory threadFactory, Executor executor,
204
LeaderSelectorListener listener);
205
206
/**
207
* Start the leader selection process
208
*/
209
public void start();
210
211
/**
212
* Enable automatic requeueing: whenever takeLeadership() returns, this instance re-enters the election instead of dropping out
213
*/
214
public void autoRequeue();
215
216
/**
217
* Close the selector and relinquish leadership
218
*/
219
public void close() throws IOException;
220
221
/**
222
* Check if this instance has leadership
223
* @return true if currently the leader
224
*/
225
public boolean hasLeadership();
226
227
/**
228
* Requeue this instance for leadership selection
229
*/
230
public boolean requeue(); // returns true if re-queued, false if this instance was already queued
231
232
/**
233
* Get the participant ID for this selector
234
* @return participant ID
235
*/
236
public String getId();
237
238
/**
239
* Set the participant ID
240
* @param id the participant ID to set
241
*/
242
public void setId(String id);
243
244
/**
245
* Get all current participants
246
* @return collection of participants
247
*/
248
public Collection<Participant> getParticipants() throws Exception;
249
250
/**
251
* Interrupt current leadership (if this instance is leader)
252
*/
253
public void interruptLeadership();
254
}
255
```
256
257
**Usage Example:**
258
259
```java
260
import org.apache.curator.framework.recipes.leader.LeaderSelector;
261
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
262
import org.apache.curator.framework.state.ConnectionState;
263
264
CuratorFramework client = // ... initialize client
265
266
LeaderSelector leaderSelector = new LeaderSelector(client, "/app/leader",
267
new LeaderSelectorListenerAdapter() {
268
@Override
269
public void takeLeadership(CuratorFramework client) throws Exception {
270
System.out.println("Taking leadership...");
271
272
try {
273
// This method should not return until leadership is relinquished
274
while (true) {
275
// Perform leader duties
276
System.out.println("Doing leader work...");
277
Thread.sleep(5000);
278
279
// Check if we should give up leadership
280
if (shouldGiveUpLeadership()) {
281
break;
282
}
283
}
284
} finally {
285
System.out.println("Giving up leadership");
286
}
287
}
288
289
@Override
290
public void stateChanged(CuratorFramework client, ConnectionState newState) {
291
if (newState == ConnectionState.LOST) {
292
// Run custom handling first: super.stateChanged() throws CancelLeadershipException on LOST, so nothing after it would execute
293
handleConnectionLoss();
294
}
295
super.stateChanged(client, newState);
296
}
297
});
298
299
leaderSelector.setId("server-1");
300
leaderSelector.start();
301
leaderSelector.autoRequeue(); // Automatically requeue for leadership
302
303
try {
304
// Keep running...
305
Thread.sleep(300000);
306
} finally {
307
leaderSelector.close();
308
}
309
```
310
311
### Participant
312
313
Represents a participant in leader election, providing information about each process competing for leadership.
314
315
```java { .api }
316
/**
317
* Information about a participant in leader election
318
*/
319
public class Participant {
320
/**
321
* Get the participant's unique ID
322
* @return participant ID
323
*/
324
public String getId();
325
326
/**
327
* Check if this participant is currently the leader
328
* @return true if this is the current leader
329
*/
330
public boolean isLeader();
331
}
332
```
333
334
### LeaderLatchListener
335
336
Listener interface for receiving notifications about LeaderLatch state changes.
337
338
```java { .api }
339
/**
340
* Listener for LeaderLatch state change notifications
341
*/
342
public interface LeaderLatchListener {
343
/**
344
* Called when this instance becomes the leader
345
*/
346
void isLeader();
347
348
/**
349
* Called when this instance loses leadership
350
*/
351
void notLeader();
352
}
353
```
354
355
### LeaderSelectorListener
356
357
Listener interface for LeaderSelector leadership management.
358
359
```java { .api }
360
/**
361
* Listener for LeaderSelector leadership events
362
*/
363
public interface LeaderSelectorListener extends ConnectionStateListener {
364
/**
365
* Called when this instance should take leadership
366
* This method should not return until leadership should be relinquished
367
* @param client the curator client
368
* @throws Exception if an error occurs during leadership
369
*/
370
void takeLeadership(CuratorFramework client) throws Exception;
371
}
372
```
373
374
### LeaderSelectorListenerAdapter
375
376
Recommended base class for LeaderSelectorListener implementations that handles connection state properly.
377
378
```java { .api }
379
/**
380
* Recommended base class for LeaderSelectorListener that handles
381
* connection state changes appropriately
382
*/
383
public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
384
/**
385
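* Default implementation: throws CancelLeadershipException when the new state is an error state (SUSPENDED or LOST under the standard error policy), which interrupts current leadership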
386
* @param client the curator client
387
* @param newState the new connection state
388
*/
389
@Override
390
public void stateChanged(CuratorFramework client, ConnectionState newState);
391
392
/**
393
* Subclasses must implement this to handle leadership
394
* @param client the curator client
395
*/
396
@Override
397
public abstract void takeLeadership(CuratorFramework client) throws Exception;
398
}
399
```
400
401
### CancelLeadershipException
402
403
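Unchecked exception that, when thrown from `LeaderSelectorListener.stateChanged()`, causes `LeaderSelector.interruptLeadership()` to be called so that leadership is given up on connection problems.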
404
405
```java { .api }
406
/**
407
* Exception that can be thrown from stateChanged() to indicate
408
* that leadership should be interrupted due to connection issues
409
*/
410
public class CancelLeadershipException extends RuntimeException {
411
/**
412
* Create a new CancelLeadershipException
413
*/
414
public CancelLeadershipException();
415
416
/**
417
* Create a new CancelLeadershipException with message
418
* @param message exception message
419
*/
420
public CancelLeadershipException(String message);
421
422
/**
423
* Create a new CancelLeadershipException with cause
424
* @param cause the underlying cause
425
*/
426
public CancelLeadershipException(Throwable cause);
427
428
/**
429
* Create a new CancelLeadershipException with message and cause
430
* @param message exception message
431
* @param cause the underlying cause
432
*/
433
public CancelLeadershipException(String message, Throwable cause);
434
}
435
```
436
437
## Common Patterns
438
439
### Graceful Leadership Handoff
440
441
```java
442
LeaderLatch latch = new LeaderLatch(client, "/app/leader", "server-1");
443
AtomicBoolean shouldStop = new AtomicBoolean(false);
444
445
latch.addListener(new LeaderLatchListener() {
446
@Override
447
public void isLeader() {
448
// Start leader tasks
449
startLeaderTasks();
450
}
451
452
@Override
453
public void notLeader() {
454
// Gracefully stop leader tasks
455
shouldStop.set(true);
456
stopLeaderTasks();
457
}
458
});
459
460
latch.start();
461
462
// Graceful shutdown
463
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
464
try {
465
shouldStop.set(true);
466
latch.close();
467
} catch (IOException e) {
468
// Log error
469
}
470
}));
471
```
472
473
### Connection State Handling
474
475
```java
476
LeaderSelector selector = new LeaderSelector(client, "/app/leader",
477
new LeaderSelectorListenerAdapter() {
478
@Override
479
public void takeLeadership(CuratorFramework client) throws Exception {
480
while (!Thread.currentThread().isInterrupted()) {
481
try {
482
// Perform leader duties
483
doLeaderWork();
484
Thread.sleep(1000);
485
} catch (InterruptedException e) {
486
Thread.currentThread().interrupt();
487
break;
488
}
489
}
490
}
491
492
@Override
493
public void stateChanged(CuratorFramework client, ConnectionState newState) {
494
if (newState == ConnectionState.SUSPENDED ||
495
newState == ConnectionState.LOST) {
496
// Interrupt current leadership
497
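throw new CancelLeadershipException(); // LeaderSelector reacts by interrupting the takeLeadership() thread; Thread.currentThread() here is only the listener-callback thread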
498
}
499
}
500
});
501
```