# Leader Election

Distributed leader election for high-availability applications, using Kubernetes-native resources (Leases or ConfigMaps) for coordination. It supports resilient controllers and services that need active-passive failover: one instance (the leader) does the work while the others stand by, ready to take over if the leader stops renewing its lease.

## Capabilities

### Leader Election

Main leader election coordinator that manages the election process and the leadership lifecycle.

```python { .api }
class LeaderElection:
    def __init__(self, election_config):
        """
        Initialize leader election with configuration.

        Parameters:
        - election_config: ElectionConfig, election configuration object
        """

    async def run(self):
        """
        Start the leader election process.

        Repeatedly attempts to acquire leadership and, once this instance
        holds it, keeps renewing it. The configured callbacks are invoked
        when leadership state changes.
        """

    async def acquire(self):
        """
        Attempt to acquire leadership.

        Returns:
        - bool: True if leadership was acquired, False otherwise
        """

    async def renew_loop(self):
        """
        Maintain leadership by continuously renewing the lease.

        Runs while this instance holds leadership, renewing the lease at
        regular intervals (retry_period). Returns when renewals keep
        failing for longer than renew_deadline.
        """

    async def try_acquire_or_renew(self):
        """
        Core election logic for acquiring or renewing leadership.

        Returns:
        - bool: True if leadership was acquired or renewed, False otherwise
        """

    def stop(self):
        """Stop the leader election process and release leadership."""
```
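
The decision `try_acquire_or_renew` has to make on each attempt can be modelled with plain data. The following is a minimal sketch, not the library's implementation: `Record` and this `try_acquire_or_renew` function are hypothetical, the clock is a plain number of seconds supplied by the caller, and a lease counts as expired once `renew_time + lease_duration` has passed.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Record:
    # Hypothetical stand-in for LeaderElectionRecord (times in seconds)
    holder: str
    acquire_time: float
    renew_time: float
    lease_duration: float
    transitions: int = 0


def try_acquire_or_renew(
    record: Optional[Record], me: str, now: float, lease_duration: float
) -> Tuple[bool, Optional[Record]]:
    """Return (is_leader, record_to_write); record_to_write is None if nothing should be written."""
    # No lock exists yet: create it with ourselves as the holder
    if record is None:
        return True, Record(me, now, now, lease_duration, 0)

    # We already hold the lease: renew it, keeping acquire_time and transitions
    if record.holder == me:
        return True, replace(record, renew_time=now, lease_duration=lease_duration)

    # Another candidate holds a lease that has not expired yet: back off
    if now < record.renew_time + record.lease_duration:
        return False, None

    # The other holder's lease has expired: take over and count the transition
    return True, Record(me, now, now, lease_duration, record.transitions + 1)
```

Two simplifications matter in practice. client-go measures expiry from the moment a candidate last *observed* the record change, on its own clock, rather than trusting a timestamp written by another node. And the returned record must be written with optimistic concurrency, so that when two candidates see the same expired lease only one write succeeds.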

### Election Configuration

Configuration object that defines leader election timing and callbacks.

```python { .api }
class ElectionConfig:
    def __init__(self, lock, lease_duration, renew_deadline, retry_period,
                 onstarted_leading=None, onstopped_leading=None):
        """
        Leader election configuration.

        Parameters:
        - lock: Lock, resource lock implementation (LeaseLock or ConfigMapLock)
        - lease_duration: int, how long (in seconds) non-leader candidates wait
          after the last observed renewal before trying to take over leadership
        - renew_deadline: int, how long (in seconds) the acting leader keeps
          retrying a failed renewal before giving up leadership; must be
          shorter than lease_duration
        - retry_period: int, how long (in seconds) candidates wait between
          acquire or renew attempts
        - onstarted_leading: callable, callback invoked when this instance
          becomes the leader
        - onstopped_leading: callable, callback invoked when this instance
          loses leadership
        """

    @property
    def identity(self):
        """Unique identity of this election participant."""

    @property
    def name(self):
        """Name of the election (from lock resource name)."""
```
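
The three durations only work in a particular order: the leader must give up (after `renew_deadline`) before followers are allowed to take over (after `lease_duration`), and it needs room for more than one retry inside its deadline. A minimal validation sketch: `validate_timing` is a hypothetical helper, and the jitter factor of 1.2 is an assumption borrowed from client-go's leader election, which applies the same two ordering checks.

```python
JITTER_FACTOR = 1.2  # Assumption: mirrors client-go's leader election jitter factor


def validate_timing(lease_duration: float, renew_deadline: float, retry_period: float) -> None:
    """Raise ValueError if the three durations cannot work together."""
    if min(lease_duration, renew_deadline, retry_period) <= 0:
        raise ValueError("all durations must be positive")
    # Followers must not take over while the leader may still be renewing
    if lease_duration <= renew_deadline:
        raise ValueError("lease_duration must be greater than renew_deadline")
    # The leader needs time for more than one (jittered) retry before its deadline
    if renew_deadline <= JITTER_FACTOR * retry_period:
        raise ValueError("renew_deadline must be greater than retry_period * JITTER_FACTOR")
```

The timing values used in the usage examples of this document all pass these checks.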

### Leader Election Record

Metadata stored in the coordination resource to track leadership state.

```python { .api }
class LeaderElectionRecord:
    def __init__(self, holder_identity, lease_duration_seconds, acquire_time,
                 renew_time, leader_transitions):
        """
        Leadership record metadata.

        Parameters:
        - holder_identity: str, identity of current leader
        - lease_duration_seconds: int, lease duration in seconds
        - acquire_time: datetime, when leadership was acquired
        - renew_time: datetime, when lease was last renewed
        - leader_transitions: int, number of leadership changes
        """

    def to_dict(self):
        """Convert record to dictionary for storage."""

    @classmethod
    def from_dict(cls, data):
        """Create record from dictionary."""
```
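
`to_dict` and `from_dict` only need to agree with each other and produce JSON-friendly values. A minimal sketch of such a round trip: the dataclass is a hypothetical stand-in for `LeaderElectionRecord`, and the camelCase keys are an assumption modelled on the JSON field names client-go uses for its leader election record.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class LeaderElectionRecordSketch:
    # Hypothetical stand-in for LeaderElectionRecord
    holder_identity: str
    lease_duration_seconds: int
    acquire_time: datetime
    renew_time: datetime
    leader_transitions: int

    def to_dict(self) -> dict:
        # Timestamps become ISO 8601 strings so the dict is JSON-serializable
        return {
            "holderIdentity": self.holder_identity,
            "leaseDurationSeconds": self.lease_duration_seconds,
            "acquireTime": self.acquire_time.isoformat(),
            "renewTime": self.renew_time.isoformat(),
            "leaderTransitions": self.leader_transitions,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "LeaderElectionRecordSketch":
        # Inverse of to_dict: parse the ISO 8601 strings back into datetimes
        return cls(
            holder_identity=data["holderIdentity"],
            lease_duration_seconds=int(data["leaseDurationSeconds"]),
            acquire_time=datetime.fromisoformat(data["acquireTime"]),
            renew_time=datetime.fromisoformat(data["renewTime"]),
            leader_transitions=int(data["leaderTransitions"]),
        )


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    record = LeaderElectionRecordSketch("pod-a", 15, now, now, 0)
    print(record.to_dict())
```

Using timezone-aware datetimes keeps the round trip unambiguous when candidates run in different time zones.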

### Resource Lock Implementations

Different Kubernetes resources that can be used for coordination.

```python { .api }
class LeaseLock:
    def __init__(self, name, namespace, identity):
        """
        Lease-based coordination lock (recommended).

        Uses Kubernetes Lease resources for coordination. Leases are
        lightweight and designed specifically for coordination.

        Parameters:
        - name: str, lease resource name
        - namespace: str, namespace containing the lease
        - identity: str, unique identity of this participant
        """

    async def get(self, client):
        """Get current lease record."""

    async def create(self, client, record):
        """Create new lease with leadership record."""

    async def update(self, client, record):
        """Update existing lease record."""

    def describe(self):
        """Human-readable description of this lock."""


class ConfigMapLock:
    def __init__(self, name, namespace, identity):
        """
        ConfigMap-based coordination lock (legacy).

        Uses ConfigMap annotations for coordination. Less efficient than
        Lease locks but compatible with older Kubernetes versions.

        Parameters:
        - name: str, ConfigMap resource name
        - namespace: str, namespace containing the ConfigMap
        - identity: str, unique identity of this participant
        """

    async def get(self, client):
        """Get current ConfigMap record."""

    async def create(self, client, record):
        """Create new ConfigMap with leadership record."""

    async def update(self, client, record):
        """Update existing ConfigMap record."""

    def describe(self):
        """Human-readable description of this lock."""


class Lock:
    """
    Base interface for resource lock implementations.

    Custom lock implementations should inherit from this class
    and implement the required methods.
    """

    async def get(self, client):
        """Get current lock record."""
        raise NotImplementedError

    async def create(self, client, record):
        """Create new lock with record."""
        raise NotImplementedError

    async def update(self, client, record):
        """Update existing lock record."""
        raise NotImplementedError

    def describe(self):
        """Describe this lock."""
        raise NotImplementedError
```
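
Because the election talks to its lock only through `get`, `create`, `update` and `describe`, an in-memory implementation is handy for unit tests. A minimal sketch under stated assumptions: `InMemoryLock` is hypothetical, it ignores the `client` argument, and it does not subclass the library's `Lock` (this assumes duck typing is enough for a test double; subclass `Lock` if your version checks the type).

```python
import asyncio
from typing import Any, Dict, Optional, Tuple


class InMemoryLock:
    """Hypothetical test double following the Lock interface (get/create/update/describe)."""

    def __init__(self, name: str, namespace: str, identity: str,
                 store: Optional[Dict[Tuple[str, str], Any]] = None):
        self.name = name
        self.namespace = namespace
        self.identity = identity
        # Instances sharing one store simulate candidates contending for one resource
        self._store = store if store is not None else {}
        self._key = (namespace, name)

    async def get(self, client: Any) -> Optional[Any]:
        # `client` is ignored: there is no API server behind this lock
        return self._store.get(self._key)

    async def create(self, client: Any, record: Any) -> None:
        if self._key in self._store:
            raise RuntimeError(f"{self.describe()} already exists")
        self._store[self._key] = record

    async def update(self, client: Any, record: Any) -> None:
        if self._key not in self._store:
            raise RuntimeError(f"{self.describe()} does not exist")
        self._store[self._key] = record

    def describe(self) -> str:
        return f"InMemoryLock({self.namespace}/{self.name})"


async def demo() -> None:
    store: Dict[Tuple[str, str], Any] = {}
    a = InMemoryLock("app-leader", "default", "a", store)
    b = InMemoryLock("app-leader", "default", "b", store)
    await a.create(None, {"holderIdentity": "a"})
    print(await b.get(None))  # b sees the record that a created


if __name__ == "__main__":
    asyncio.run(demo())
```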

## Usage Examples

### Basic Leader Election

```python
import asyncio
import os
import socket

from kubernetes_asyncio import config
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
from kubernetes_asyncio.leaderelection.resourcelock import leaselock


async def basic_leader_election():
    await config.load_config()

    # Unique identity for this instance: the hostname (the pod name inside a
    # cluster) plus the process ID, so two processes on one host don't collide
    identity = f"{socket.gethostname()}-{os.getpid()}"

    # Create a Lease lock for coordination
    lock = leaselock.LeaseLock(
        name="my-app-leader",
        namespace="default",
        identity=identity,
    )

    # Leadership callbacks
    async def on_started_leading():
        print(f"{identity}: I am now the leader!")
        # Start leader-only work here
        while True:
            print(f"{identity}: Doing leader work...")
            await asyncio.sleep(5)

    def on_stopped_leading():
        print(f"{identity}: I am no longer the leader")
        # Stop leader-only work here

    # Configure the election
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=30,  # Followers wait 30 s after the last renewal before taking over
        renew_deadline=20,  # The leader gives up if it cannot renew within 20 s
        retry_period=5,     # Wait 5 s between acquire/renew attempts
        onstarted_leading=on_started_leading,
        onstopped_leading=on_stopped_leading,
    )

    # Start the election
    election = leaderelection.LeaderElection(election_config)

    try:
        await election.run()
    finally:
        # Runs on normal exit, on errors, and on Ctrl+C
        print(f"{identity}: Shutting down...")
        election.stop()


# Run several instances of this script to see the election in action
if __name__ == "__main__":
    asyncio.run(basic_leader_election())
```
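
Identities must be unique across all candidates; if two instances share one, both will treat the lease as their own. A small helper sketch (`make_identity` is hypothetical, not part of the library) that combines the hostname, which inside a Kubernetes pod defaults to the pod name, with the process ID and a random suffix:

```python
import os
import socket
import uuid


def make_identity(prefix: str = "") -> str:
    """Build a participant identity that is unique per process and per start."""
    # Hypothetical helper: hostname + PID + 8 random hex characters
    parts = [socket.gethostname(), str(os.getpid()), uuid.uuid4().hex[:8]]
    if prefix:
        parts.insert(0, prefix)
    return "-".join(parts)


if __name__ == "__main__":
    print(make_identity("my-app"))
```

The random suffix guarantees uniqueness across restarts, at a cost: a restarted process can no longer recognise the lease it held before and must wait for it to expire. A stable identity, such as the pod name alone, lets a restarted pod reclaim its own unexpired lease immediately.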

### High Availability Controller

```python
import asyncio
import os
import socket

from kubernetes_asyncio import client, config
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
from kubernetes_asyncio.leaderelection.resourcelock import configmaplock


async def ha_controller():
    await config.load_config()

    identity = f"controller-{socket.gethostname()}-{os.getpid()}"

    # Use a ConfigMap lock for this example
    lock = configmaplock.ConfigMapLock(
        name="ha-controller-lock",
        namespace="kube-system",
        identity=identity,
    )

    # Controller state
    controller_running = False
    controller_task = None

    async def start_controller():
        """Start the actual controller logic."""
        nonlocal controller_running, controller_task

        print(f"{identity}: Starting controller")
        controller_running = True

        # Example controller logic
        async def controller_loop():
            v1 = client.CoreV1Api()

            try:
                while controller_running:
                    # Example: monitor pods carrying the controller's label
                    pods = await v1.list_pod_for_all_namespaces(
                        label_selector="managed-by=ha-controller"
                    )

                    print(f"{identity}: Managing {len(pods.items)} pods")

                    # Perform controller logic here
                    for pod in pods.items:
                        if pod.status.phase == "Failed":
                            print(f"{identity}: Cleaning up failed pod {pod.metadata.name}")
                            # Implement cleanup logic

                    await asyncio.sleep(10)

            except asyncio.CancelledError:
                print(f"{identity}: Controller loop cancelled")
            finally:
                await v1.api_client.close()

        controller_task = asyncio.create_task(controller_loop())

    def stop_controller():
        """Stop the controller logic."""
        nonlocal controller_running, controller_task

        print(f"{identity}: Stopping controller")
        controller_running = False

        if controller_task and not controller_task.done():
            controller_task.cancel()

    # Tighter timing than the basic example for faster failover
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=15,  # Shorter lease: followers take over sooner
        renew_deadline=10,  # The leader gives up if it cannot renew within 10 s
        retry_period=2,     # Retry acquire/renew every 2 s
        onstarted_leading=start_controller,
        onstopped_leading=stop_controller,
    )

    election = leaderelection.LeaderElection(election_config)

    try:
        print(f"{identity}: Starting leader election")
        await election.run()
    finally:
        print(f"{identity}: Shutting down")
        election.stop()
        stop_controller()
        if controller_task:
            # controller_loop swallows CancelledError, so this await returns normally
            await controller_task


if __name__ == "__main__":
    asyncio.run(ha_controller())
```
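
The controller above tracks `controller_running` and `controller_task` in closures. The same start/stop-on-leadership pattern can be packaged once. `LeaderTask` is a hypothetical helper, not part of kubernetes_asyncio, and it assumes the election accepts an async `onstarted_leading` and a plain `onstopped_leading`, as the examples in this document do.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class LeaderTask:
    """Hypothetical helper: run a coroutine only while this instance leads."""

    def __init__(self, work: Callable[[], Awaitable[None]]):
        self._work = work
        self._task: Optional[asyncio.Future] = None

    async def on_started_leading(self) -> None:
        # Start the work in the background so the callback returns promptly
        if self._task is None or self._task.done():
            self._task = asyncio.ensure_future(self._work())

    def on_stopped_leading(self) -> None:
        # Request cancellation; the work sees CancelledError at its next await
        if self._task is not None and not self._task.done():
            self._task.cancel()

    async def wait_stopped(self) -> None:
        """Wait until the background work has finished (cancelled or not)."""
        if self._task is not None:
            # return_exceptions=True keeps the task's CancelledError from propagating
            await asyncio.gather(self._task, return_exceptions=True)
```

With `runner = LeaderTask(controller_loop)`, pass `runner.on_started_leading` and `runner.on_stopped_leading` as the two callbacks to `ElectionConfig`, and await `runner.wait_stopped()` during shutdown so the work's cleanup runs before the process exits.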

### Multi-Component Leader Election

```python
import asyncio
import os
import socket

from kubernetes_asyncio import config
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
from kubernetes_asyncio.leaderelection.resourcelock import leaselock


async def multi_component_system():
    """Example of multiple components with separate leader elections."""

    await config.load_config()
    identity = f"system-{socket.gethostname()}-{os.getpid()}"

    # Component A: data processor
    async def data_processor():
        print(f"{identity}: Starting data processor")
        while True:
            print(f"{identity}: Processing data batch...")
            await asyncio.sleep(8)

    def stop_data_processor():
        print(f"{identity}: Stopping data processor")

    # Component B: cleanup service
    async def cleanup_service():
        print(f"{identity}: Starting cleanup service")
        while True:
            print(f"{identity}: Running cleanup tasks...")
            await asyncio.sleep(15)

    def stop_cleanup_service():
        print(f"{identity}: Stopping cleanup service")

    # One Lease per component: the elections are independent, so different
    # instances may lead different components at the same time
    data_processor_lock = leaselock.LeaseLock(
        name="data-processor-leader",
        namespace="default",
        identity=identity,
    )

    cleanup_service_lock = leaselock.LeaseLock(
        name="cleanup-service-leader",
        namespace="default",
        identity=identity,
    )

    # Create the elections
    data_processor_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=data_processor_lock,
            lease_duration=20,
            renew_deadline=15,
            retry_period=3,
            onstarted_leading=data_processor,
            onstopped_leading=stop_data_processor,
        )
    )

    cleanup_service_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=cleanup_service_lock,
            lease_duration=25,
            renew_deadline=20,
            retry_period=4,
            onstarted_leading=cleanup_service,
            onstopped_leading=stop_cleanup_service,
        )
    )

    # Run both elections concurrently
    try:
        await asyncio.gather(
            data_processor_election.run(),
            cleanup_service_election.run(),
        )
    finally:
        print(f"{identity}: Shutting down all components")
        data_processor_election.stop()
        cleanup_service_election.stop()


# asyncio.run(multi_component_system())  # Uncomment to run
```
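
With `asyncio.gather`, if one election's `run()` raises, the exception propagates but the other election keeps running unattended. If the components should stop together, a helper like the following can replace the `gather` call; `run_together` is a hypothetical name, not a library function.

```python
import asyncio
from typing import Awaitable


async def run_together(*aws: Awaitable) -> None:
    """Run awaitables concurrently; when any one finishes or fails, cancel the rest."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    try:
        done, _pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # Propagate the exception of a finished task, if it raised one
        for task in done:
            task.result()
    finally:
        # Cancelling an already-finished task is a no-op
        for task in tasks:
            task.cancel()
        # Wait for the cancelled tasks to run their cleanup, without re-raising
        await asyncio.gather(*tasks, return_exceptions=True)
```

Note that it also cancels the siblings when one awaitable returns normally, for example if an election's `run()` returns. Keep `gather` instead if the components are meant to fail independently.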

### Custom Lock Implementation

```python
import asyncio
import base64
import json
import socket

from kubernetes_asyncio import client, config
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection

# Key under which the leadership record is stored in the Secret
LEADER_KEY = "leader-election"


class CustomLock(leaderelection.Lock):
    """Example custom lock that stores the leadership record in a Secret."""

    def __init__(self, name, namespace, identity):
        self.name = name
        self.namespace = namespace
        self.identity = identity

    @staticmethod
    def _encode(record):
        # Secret values must be base64-encoded; the record itself is stored as JSON
        return base64.b64encode(
            json.dumps(record.to_dict()).encode("utf-8")
        ).decode("utf-8")

    async def get(self, api_client):
        """Get the current lock record from the Secret."""
        # The parameter is named api_client (not client) so that it does not
        # shadow the kubernetes_asyncio.client module used below
        v1 = client.CoreV1Api(api_client)
        try:
            secret = await v1.read_namespaced_secret(
                name=self.name,
                namespace=self.namespace,
            )
        except client.ApiException as e:
            if e.status == 404:
                return None  # The lock doesn't exist yet
            raise

        # Parse the leadership data stored in the Secret (data may be None)
        if secret.data and LEADER_KEY in secret.data:
            data = base64.b64decode(secret.data[LEADER_KEY]).decode("utf-8")
            return leaderelection.LeaderElectionRecord.from_dict(json.loads(data))

        return None

    async def create(self, api_client, record):
        """Create a new Secret holding the leadership record."""
        v1 = client.CoreV1Api(api_client)
        secret = client.V1Secret(
            metadata=client.V1ObjectMeta(
                name=self.name,
                namespace=self.namespace,
            ),
            data={LEADER_KEY: self._encode(record)},
        )
        await v1.create_namespaced_secret(namespace=self.namespace, body=secret)

    async def update(self, api_client, record):
        """Update the existing Secret with a new record."""
        v1 = client.CoreV1Api(api_client)
        # Note: a plain patch is last-writer-wins. A production lock should send
        # the resourceVersion read in get() so the API server rejects stale writes.
        await v1.patch_namespaced_secret(
            name=self.name,
            namespace=self.namespace,
            body={"data": {LEADER_KEY: self._encode(record)}},
        )

    def describe(self):
        return f"CustomLock(Secret/{self.namespace}/{self.name})"


async def custom_lock_example():
    await config.load_config()

    identity = f"custom-{socket.gethostname()}"

    # Use the custom lock implementation
    custom_lock = CustomLock(
        name="custom-leader-lock",
        namespace="default",
        identity=identity,
    )

    async def leader_work():
        print(f"{identity}: Leading with custom lock!")
        while True:
            print(f"{identity}: Custom leader doing work...")
            await asyncio.sleep(6)

    def stop_leader_work():
        print(f"{identity}: Stopped leading with custom lock")

    election_config = electionconfig.ElectionConfig(
        lock=custom_lock,
        lease_duration=20,
        renew_deadline=15,
        retry_period=3,
        onstarted_leading=leader_work,
        onstopped_leading=stop_leader_work,
    )

    election = leaderelection.LeaderElection(election_config)

    try:
        print(f"{identity}: Starting election with custom lock")
        await election.run()
    finally:
        print(f"{identity}: Shutting down custom lock election")
        election.stop()


# asyncio.run(custom_lock_example())  # Uncomment to run
```
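
The `update` above patches the Secret unconditionally, so two candidates that both read the same expired record can both believe they won. Kubernetes prevents this when an update carries the `metadata.resourceVersion` obtained from the read: the API server rejects the write with 409 Conflict if the object changed in between. A minimal in-memory sketch of that compare-and-swap rule (`VersionedStore` is hypothetical):

```python
from typing import Any, Optional, Tuple


class VersionedStore:
    """Hypothetical in-memory model of the API server's resourceVersion check."""

    def __init__(self) -> None:
        self._value: Optional[Any] = None
        self._version = 0

    def read(self) -> Tuple[Optional[Any], int]:
        # Like a GET: return the object together with its current version
        return self._value, self._version

    def write(self, value: Any, expected_version: int) -> bool:
        # Like an update carrying metadata.resourceVersion: reject stale writes
        # (the real API server answers 409 Conflict)
        if expected_version != self._version:
            return False
        self._value = value
        self._version += 1
        return True
```

The losing candidate should treat the rejected write as "not leader" and try again after `retry_period`.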

### Graceful Shutdown with Leader Election

```python
import asyncio
import signal
import socket

from kubernetes_asyncio import config
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
from kubernetes_asyncio.leaderelection.resourcelock import leaselock


async def graceful_shutdown_example():
    await config.load_config()

    identity = f"graceful-{socket.gethostname()}"
    shutdown_event = asyncio.Event()

    # Handle shutdown signals inside the event loop (add_signal_handler is Unix-only)
    def handle_signal(signum):
        print(f"{identity}: Received signal {signum}, initiating graceful shutdown")
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, handle_signal, sig)

    lock = leaselock.LeaseLock(
        name="graceful-leader",
        namespace="default",
        identity=identity,
    )

    leader_task = None

    async def start_leader_work():
        nonlocal leader_task
        print(f"{identity}: Becoming leader")

        async def leader_loop():
            try:
                while not shutdown_event.is_set():
                    print(f"{identity}: Leader work iteration")

                    # Pause up to 5 s between iterations, waking early on shutdown
                    try:
                        await asyncio.wait_for(shutdown_event.wait(), timeout=5)
                    except asyncio.TimeoutError:
                        pass
            except asyncio.CancelledError:
                print(f"{identity}: Leader work cancelled")
                raise
            finally:
                print(f"{identity}: Leader work cleanup completed")

        leader_task = asyncio.create_task(leader_loop())

    def stop_leader_work():
        print(f"{identity}: Stopping leader work")
        if leader_task and not leader_task.done():
            leader_task.cancel()

    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=30,
        renew_deadline=20,
        retry_period=2,
        onstarted_leading=start_leader_work,
        onstopped_leading=stop_leader_work,
    )

    election = leaderelection.LeaderElection(election_config)

    # Run the election until it finishes or a shutdown signal arrives
    election_task = asyncio.create_task(election.run())
    shutdown_task = asyncio.create_task(shutdown_event.wait())

    try:
        # asyncio.wait (not gather) supports return_when=FIRST_COMPLETED
        await asyncio.wait(
            {election_task, shutdown_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
    finally:
        print(f"{identity}: Cleaning up...")
        election.stop()

        # Cancel anything still running and wait for its cleanup to finish
        for task in (election_task, shutdown_task, leader_task):
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

    print(f"{identity}: Graceful shutdown completed")


if __name__ == "__main__":
    asyncio.run(graceful_shutdown_example())
```
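
The cleanup in the `finally` block repeats one pattern: cancel a task, then wait for it, because `cancel()` only requests cancellation and the task's own `finally` blocks run later. A hypothetical helper (not a library function) that packages the pattern:

```python
import asyncio
from typing import Optional


async def cancel_and_wait(task: Optional[asyncio.Task]) -> None:
    """Cancel a task (if still running) and wait until it has actually finished."""
    if task is None or task.done():
        return
    task.cancel()
    # gather(..., return_exceptions=True) absorbs the task's CancelledError
    # (or any other exception) instead of raising it in the caller
    await asyncio.gather(task, return_exceptions=True)
```

Calling it on a task that is `None` or already finished is a no-op, so the shutdown path can call it unconditionally for every task it created.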