# Leader Election

Distributed leader election implementation using Kubernetes resources as coordination locks. Enables building highly available distributed applications where only one instance should be active (leader) while others remain on standby (followers).

## Capabilities

### Leader Election Process

Core leader election functionality that manages the election and lease renewal process for distributed applications.

```python { .api }
class LeaderElection:
    def __init__(self, election_config: Config):
        """
        Initialize leader election with configuration.

        Parameters:
        - election_config: Election configuration including lock, timings, and callbacks
        """

    def run(self) -> None:
        """
        Start the leader election process.
        Blocks until leadership is lost or process terminates.
        """

    def acquire(self) -> bool:
        """
        Attempt to acquire leadership.

        Returns:
        - bool: True if leadership acquired, False otherwise
        """

    def renew_loop(self) -> None:
        """
        Continuously renew leadership lease while leader.
        Called automatically after acquiring leadership.
        """
```

### Election Configuration

Configuration for leader election, including timing parameters and callback functions.

```python { .api }
class Config:
    def __init__(
        self,
        lock,
        lease_duration: int,
        renew_deadline: int,
        retry_period: int,
        onstarted_leading: callable,
        onstopped_leading: callable = None
    ):
        """
        Configure leader election parameters.

        Parameters:
        - lock: Resource lock implementation (ConfigMapLock, etc.)
        - lease_duration: Duration of leadership lease in seconds
        - renew_deadline: Deadline for lease renewal in seconds
        - retry_period: Retry interval in seconds
        - onstarted_leading: Callback when becoming leader
        - onstopped_leading: Callback when losing leadership

        Constraints:
        - lease_duration > renew_deadline
        - renew_deadline > jitter_factor * retry_period
        - All timing values must be >= 1
        """
```
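
The constraints listed in the docstring can be checked before constructing a `Config`. The following is a minimal sketch, not the library's own validation: `timing_violations` is a hypothetical helper, and the jitter factor of 1.2 is an assumption, since `jitter_factor` is not one of the documented constructor parameters.

```python
from typing import List

# Assumption: 1.2 stands in for the library's internal jitter factor.
ASSUMED_JITTER_FACTOR = 1.2


def timing_violations(
    lease_duration: float,
    renew_deadline: float,
    retry_period: float,
    jitter_factor: float = ASSUMED_JITTER_FACTOR,
) -> List[str]:
    """Return the violated constraints (an empty list means the timings are valid)."""
    problems = []
    if lease_duration <= renew_deadline:
        problems.append("lease_duration must be greater than renew_deadline")
    if renew_deadline <= jitter_factor * retry_period:
        problems.append("renew_deadline must be greater than jitter_factor * retry_period")
    timings = (
        ("lease_duration", lease_duration),
        ("renew_deadline", renew_deadline),
        ("retry_period", retry_period),
    )
    for name, value in timings:
        if value < 1:
            problems.append(f"{name} must be >= 1")
    return problems


print(timing_violations(30, 20, 5))  # → []
print(timing_violations(15, 20, 5))  # → ['lease_duration must be greater than renew_deadline']
```

Returning a list rather than raising on the first failure lets a caller report every problem at once, which can be convenient when the timings come from user-supplied settings.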

### Election Record

Represents the current state of a leadership election stored in the coordination resource.

```python { .api }
class LeaderElectionRecord:
    def __init__(
        self,
        holder_identity: str,
        lease_duration: int,
        acquire_time: str,
        renew_time: str
    ):
        """
        Leader election record data.

        Parameters:
        - holder_identity: Unique identifier of current leader
        - lease_duration: Duration of leadership lease
        - acquire_time: When leadership was acquired (ISO format)
        - renew_time: When lease was last renewed (ISO format)
        """
```
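
A record carries enough information to decide whether a lease has lapsed: the last renewal time plus the lease duration gives the expiry instant. The sketch below illustrates that arithmetic under stated assumptions: `RecordSketch` is a hypothetical stand-in with the same four fields, `lease_duration` is taken to be in seconds, and the timestamps are assumed to parse with `datetime.fromisoformat`. It is not the library's own expiry check.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class RecordSketch:
    """Hypothetical stand-in with the same four fields as LeaderElectionRecord."""
    holder_identity: str
    lease_duration: int  # assumed to be seconds
    acquire_time: str
    renew_time: str


def lease_expired(record: RecordSketch, now: datetime) -> bool:
    """True when the last renewal is older than the lease duration."""
    renewed_at = datetime.fromisoformat(record.renew_time)
    return renewed_at + timedelta(seconds=record.lease_duration) < now


record = RecordSketch(
    holder_identity="instance-1",
    lease_duration=30,
    acquire_time="2024-01-01T12:00:00",
    renew_time="2024-01-01T12:00:10",
)
print(lease_expired(record, datetime(2024, 1, 1, 12, 0, 30)))  # → False
print(lease_expired(record, datetime(2024, 1, 1, 12, 0, 41)))  # → True
```

Passing `now` in explicitly keeps the function deterministic and easy to test.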

### Resource Locks

Kubernetes resource-based locks for coordination between election candidates.

```python { .api }
class ConfigMapLock:
    def __init__(self, name: str, namespace: str, identity: str):
        """
        ConfigMap-based resource lock for leader election.

        Parameters:
        - name: Name of the ConfigMap to use as lock
        - namespace: Namespace containing the ConfigMap
        - identity: Unique identifier for this candidate
        """

    def get(self, name: str, namespace: str) -> tuple:
        """
        Get current election record from ConfigMap.

        Returns:
        - tuple: (success: bool, record: LeaderElectionRecord or exception)
        """

    def create(self, name: str, namespace: str, election_record: LeaderElectionRecord) -> bool:
        """
        Create new election record in ConfigMap.

        Returns:
        - bool: True if created successfully
        """

    def update(self, election_record: LeaderElectionRecord) -> bool:
        """
        Update existing election record.

        Returns:
        - bool: True if updated successfully
        """
```
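
Because `get` returns a `(success, record-or-exception)` tuple rather than raising, callers need to check the flag before touching the second element. The sketch below shows one way to handle that convention. `current_leader` and `FakeLock` are hypothetical names introduced for illustration, and the record is assumed to expose a `holder_identity` attribute matching its constructor parameter.

```python
from types import SimpleNamespace
from typing import Optional


def current_leader(lock, name: str, namespace: str) -> Optional[str]:
    """Return the holder identity, or None when no record could be read."""
    ok, result = lock.get(name, namespace)
    if not ok:
        # On failure the second element is the exception, not a record.
        return None
    return result.holder_identity


class FakeLock:
    """Hypothetical stand-in so the sketch runs without a cluster."""

    def __init__(self, response):
        self._response = response

    def get(self, name, namespace):
        return self._response


record = SimpleNamespace(holder_identity="instance-1")
print(current_leader(FakeLock((True, record)), "my-app-lock", "default"))  # → instance-1
print(current_leader(FakeLock((False, RuntimeError("not found"))), "my-app-lock", "default"))  # → None
```

With a real cluster, a `ConfigMapLock` instance would take the place of `FakeLock`.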

## Usage Examples

### Basic Leader Election

```python
import time

from kubernetes import config
from kubernetes.leaderelection.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock

# Load Kubernetes configuration
config.load_kube_config()

def on_started_leading():
    """Called when this instance becomes the leader."""
    print("Started leading - performing leader tasks")
    # Implement leader-specific logic here
    while True:
        # Leader work loop
        time.sleep(30)

def on_stopped_leading():
    """Called when this instance loses leadership."""
    print("Stopped leading - cleaning up")
    # Cleanup leader-specific resources

# Create resource lock using ConfigMap
lock = ConfigMapLock(
    name="my-app-lock",
    namespace="default",
    identity="instance-1"
)

# Configure election parameters
election_config = Config(
    lock=lock,
    lease_duration=30,      # Leadership lease duration (seconds)
    renew_deadline=20,      # Lease renewal deadline (seconds)
    retry_period=5,         # Retry interval (seconds)
    onstarted_leading=on_started_leading,
    onstopped_leading=on_stopped_leading
)

# Start leader election
election = LeaderElection(election_config)
election.run()  # Blocks until leadership is lost or the process terminates
```
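
The example above hard-codes `identity="instance-1"`, but each candidate needs its own unique identifier, so replicas running the same code should derive one at startup. A minimal sketch, assuming the hostname plus a random suffix is unique enough for the deployment; `make_identity` is a hypothetical helper, not part of the library.

```python
import socket
import uuid


def make_identity(prefix: str = "") -> str:
    """Build a candidate identity that differs for every process."""
    host = socket.gethostname()       # pod name when running in Kubernetes
    suffix = uuid.uuid4().hex[:8]     # random part guards against hostname reuse
    return f"{prefix}{host}-{suffix}"


identity = make_identity("my-app-")
print(identity)
```

The resulting string can be passed as the `identity` argument of `ConfigMapLock`.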

### High Availability Service

```python
import time

from kubernetes import config
from kubernetes.leaderelection.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock

# Load Kubernetes configuration
# (use config.load_incluster_config() when running inside a pod)
config.load_kube_config()

class HighAvailabilityService:
    def __init__(self, service_name, namespace, instance_id):
        self.service_name = service_name
        self.running = False

        # Create resource lock
        self.lock = ConfigMapLock(
            name=f"{service_name}-leader-election",
            namespace=namespace,
            identity=instance_id
        )

        # Configure election
        self.config = Config(
            lock=self.lock,
            lease_duration=60,
            renew_deadline=40,
            retry_period=10,
            onstarted_leading=self.become_leader,
            onstopped_leading=self.stop_leading
        )

    def become_leader(self):
        """Start service operations as leader."""
        print(f"{self.service_name}: Became leader, starting operations")
        self.running = True

        # Run leader-only work until leadership is lost
        while self.running:
            self.perform_leader_tasks()
            time.sleep(10)

    def stop_leading(self):
        """Stop service operations when losing leadership."""
        print(f"{self.service_name}: Lost leadership, stopping operations")
        self.running = False

    def perform_leader_tasks(self):
        """Core service logic that should only run on leader."""
        print("Performing leader-only tasks...")
        # Implement your service logic here

    def start(self):
        """Start participating in leader election."""
        election = LeaderElection(self.config)
        election.run()

# Usage
service = HighAvailabilityService(
    service_name="data-processor",
    namespace="production",
    instance_id="pod-12345"
)
service.start()
```
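
Since `run()` blocks, a service that also needs its main thread (for example to serve health checks) could start the election on a background thread. The following is a sketch under assumptions: `start_election_thread` and `FakeElection` are hypothetical names, and the source does not state whether `LeaderElection` is intended to run off the main thread, so verify this against the library before relying on it.

```python
import threading


def start_election_thread(election) -> threading.Thread:
    """Run election.run() on a daemon thread and return the thread."""
    thread = threading.Thread(
        target=election.run,
        name="leader-election",
        daemon=True,  # do not keep the process alive just for the election
    )
    thread.start()
    return thread


class FakeElection:
    """Hypothetical stand-in; a real LeaderElection would be passed instead."""

    def __init__(self):
        self.ran = threading.Event()

    def run(self):
        self.ran.set()


fake = FakeElection()
worker = start_election_thread(fake)
worker.join(timeout=1)
print(fake.ran.is_set())  # → True
```

A daemon thread ends with the process, so any cleanup that must happen on shutdown still has to be triggered explicitly.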

## Import Statements

```python
from kubernetes import leaderelection
from kubernetes.leaderelection.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.leaderelectionrecord import LeaderElectionRecord
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock
```