or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-apis.md · configuration.md · dynamic-client.md · index.md · leader-election.md · streaming.md · utils.md · watch.md

docs/leader-election.md

0

# Leader Election

1

2

Distributed leader election for high availability applications using Kubernetes native resources (ConfigMaps or Leases) for coordination. Enables building resilient controllers and services that require active-passive failover capabilities.

3

4

## Capabilities

5

6

### Leader Election

7

8

Main leader election coordinator that manages the election process and leadership lifecycle.

9

10

```python { .api }

11

class LeaderElection:

12

def __init__(self, election_config):

13

"""

14

Initialize leader election with configuration.

15

16

Parameters:

17

- election_config: ElectionConfig, election configuration object

18

"""

19

20

async def run(self):

21

"""

22

Start the leader election process.

23

24

This method will run indefinitely, attempting to acquire and maintain

25

leadership. Callbacks are invoked when leadership state changes.

26

"""

27

28

async def acquire(self):

29

"""

30

Attempt to acquire leadership.

31

32

Returns:

33

- bool: True if leadership was acquired, False otherwise

34

"""

35

36

async def renew_loop(self):

37

"""

38

Maintain leadership by continuously renewing the lease.

39

40

Runs while this instance holds leadership, automatically renewing

41

the lease at regular intervals.

42

"""

43

44

async def try_acquire_or_renew(self):

45

"""

46

Core election logic for acquiring or renewing leadership.

47

48

Returns:

49

- bool: True if leadership was acquired/renewed, False otherwise

50

"""

51

52

def stop(self):

53

"""Stop the leader election process and release leadership."""

54

```

55

56

### Election Configuration

57

58

Configuration object that defines leader election behavior and callbacks.

59

60

```python { .api }

61

class ElectionConfig:

62

def __init__(self, lock, lease_duration, renew_deadline, retry_period,

63

onstarted_leading=None, onstopped_leading=None):

64

"""

65

Leader election configuration.

66

67

Parameters:

68

- lock: Lock, resource lock implementation (LeaseLock or ConfigMapLock)

69

- lease_duration: int, how long leadership lease lasts (seconds)

70

- renew_deadline: int, deadline for renewing leadership (seconds)

71

- retry_period: int, how often to attempt acquiring leadership (seconds)

72

- onstarted_leading: callable, callback when becoming leader

73

- onstopped_leading: callable, callback when losing leadership

74

"""

75

76

@property

77

def identity(self):

78

"""Unique identity of this election participant."""

79

80

@property

81

def name(self):

82

"""Name of the election (from lock resource name)."""

83

```

84

85

### Leader Election Record

86

87

Metadata stored in the coordination resource to track leadership state.

88

89

```python { .api }

90

class LeaderElectionRecord:

91

def __init__(self, holder_identity, lease_duration_seconds, acquire_time,

92

renew_time, leader_transitions):

93

"""

94

Leadership record metadata.

95

96

Parameters:

97

- holder_identity: str, identity of current leader

98

- lease_duration_seconds: int, lease duration in seconds

99

- acquire_time: datetime, when leadership was acquired

100

- renew_time: datetime, when lease was last renewed

101

- leader_transitions: int, number of leadership changes

102

"""

103

104

def to_dict(self):

105

"""Convert record to dictionary for storage."""

106

107

@classmethod

108

def from_dict(cls, data):

109

"""Create record from dictionary."""

110

```

111

112

### Resource Lock Implementations

113

114

Different Kubernetes resources that can be used for coordination.

115

116

```python { .api }

117

class LeaseLock:

118

def __init__(self, name, namespace, identity):

119

"""

120

Lease-based coordination lock (recommended).

121

122

Uses Kubernetes Lease resources for coordination. Leases are

123

lightweight and designed specifically for coordination.

124

125

Parameters:

126

- name: str, lease resource name

127

- namespace: str, namespace containing the lease

128

- identity: str, unique identity of this participant

129

"""

130

131

async def get(self, client):

132

"""Get current lease record."""

133

134

async def create(self, client, record):

135

"""Create new lease with leadership record."""

136

137

async def update(self, client, record):

138

"""Update existing lease record."""

139

140

def describe(self):

141

"""Human-readable description of this lock."""

142

143

class ConfigMapLock:

144

def __init__(self, name, namespace, identity):

145

"""

146

ConfigMap-based coordination lock (legacy).

147

148

Uses ConfigMap annotations for coordination. Less efficient than

149

Lease locks but compatible with older Kubernetes versions.

150

151

Parameters:

152

- name: str, ConfigMap resource name

153

- namespace: str, namespace containing the ConfigMap

154

- identity: str, unique identity of this participant

155

"""

156

157

async def get(self, client):

158

"""Get current ConfigMap record."""

159

160

async def create(self, client, record):

161

"""Create new ConfigMap with leadership record."""

162

163

async def update(self, client, record):

164

"""Update existing ConfigMap record."""

165

166

def describe(self):

167

"""Human-readable description of this lock."""

168

169

class Lock:

170

"""

171

Base interface for resource lock implementations.

172

173

Custom lock implementations should inherit from this class

174

and implement the required methods.

175

"""

176

177

async def get(self, client):

178

"""Get current lock record."""

179

raise NotImplementedError

180

181

async def create(self, client, record):

182

"""Create new lock with record."""

183

raise NotImplementedError

184

185

async def update(self, client, record):

186

"""Update existing lock record."""

187

raise NotImplementedError

188

189

def describe(self):

190

"""Describe this lock."""

191

raise NotImplementedError

192

```

193

194

## Usage Examples

195

196

### Basic Leader Election

197

198

```python

199

import asyncio

200

import socket

201

from kubernetes_asyncio import client, config

202

from kubernetes_asyncio.leaderelection import leaderelection, electionconfig

203

from kubernetes_asyncio.leaderelection.resourcelock import leaselock, configmaplock

204

205

async def basic_leader_election():
    """Campaign for the ``my-app-leader`` lease and do periodic work while leading.

    Loads the Kubernetes configuration, builds a Lease-based lock and an
    election configuration with two callbacks, then runs the election.
    On ``KeyboardInterrupt`` the election is stopped.
    """
    await config.load_config()

    # Identity of this participant: host name plus the current asyncio task name
    hostname = socket.gethostname()
    task_name = asyncio.current_task().get_name()
    identity = f"{hostname}-{task_name}"

    # Lease used by all participants to coordinate
    lock = leaselock.LeaseLock(name="my-app-leader", namespace="default", identity=identity)

    async def lead():
        """Callback invoked when this instance becomes the leader."""
        print(f"{identity}: I am now the leader!")
        # Leader-only work goes here
        while True:
            print(f"{identity}: Doing leader work...")
            await asyncio.sleep(5)

    def step_down():
        """Callback invoked when this instance is no longer the leader."""
        print(f"{identity}: I am no longer the leader")
        # Leader-only work should be stopped here

    settings = {
        "lock": lock,
        "lease_duration": 30,  # Hold leadership for 30 seconds max
        "renew_deadline": 20,  # Must renew within 20 seconds
        "retry_period": 5,     # Check for leadership every 5 seconds
        "onstarted_leading": lead,
        "onstopped_leading": step_down,
    }
    election = leaderelection.LeaderElection(electionconfig.ElectionConfig(**settings))

    try:
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down...")
        election.stop()

248

249

# Run multiple instances to see election in action

250

asyncio.run(basic_leader_election())

251

```

252

253

### High Availability Controller

254

255

```python

256

async def ha_controller():
    """Run a controller that only does work while this instance is the leader.

    A ConfigMap-based lock named ``ha-controller-lock`` in ``kube-system`` is
    used for coordination. When leadership is gained, a background task starts
    listing pods labelled ``managed-by=ha-controller`` every 10 seconds; when
    leadership is lost (or on shutdown) that task is cancelled.
    """
    # Local import: the shared import block of these examples does not import ``os``.
    import os

    await config.load_config()

    identity = f"controller-{socket.gethostname()}-{os.getpid()}"

    # Use ConfigMap lock for this example. The class is reached through the
    # ``configmaplock`` module, matching the imports at the top of the examples.
    lock = configmaplock.ConfigMapLock(
        name="ha-controller-lock",
        namespace="kube-system",
        identity=identity,
    )

    # Controller state shared between the two callbacks below
    controller_running = False
    controller_task = None

    async def start_controller():
        """Start the actual controller logic in a background task."""
        nonlocal controller_running, controller_task

        print(f"{identity}: Starting controller")
        controller_running = True

        # Example controller logic
        async def controller_loop():
            v1 = client.CoreV1Api()

            try:
                while controller_running:
                    # Example: Monitor and manage pods
                    pods = await v1.list_pod_for_all_namespaces(
                        label_selector="managed-by=ha-controller"
                    )

                    print(f"{identity}: Managing {len(pods.items)} pods")

                    # Perform controller logic here
                    for pod in pods.items:
                        if pod.status.phase == "Failed":
                            print(f"{identity}: Cleaning up failed pod {pod.metadata.name}")
                            # Implement cleanup logic

                    await asyncio.sleep(10)

            except asyncio.CancelledError:
                print(f"{identity}: Controller loop cancelled")
            finally:
                # Always release the API client's connections
                await v1.api_client.close()

        controller_task = asyncio.create_task(controller_loop())

    def stop_controller():
        """Stop the controller logic by clearing the flag and cancelling the task."""
        nonlocal controller_running, controller_task

        print(f"{identity}: Stopping controller")
        controller_running = False

        if controller_task and not controller_task.done():
            controller_task.cancel()

    # Election configuration with tighter timing for responsiveness
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=15,  # Shorter lease for faster failover
        renew_deadline=10,  # Renew deadline
        retry_period=2,     # Check frequently
        onstarted_leading=start_controller,
        onstopped_leading=stop_controller,
    )

    election = leaderelection.LeaderElection(election_config)

    try:
        print(f"{identity}: Starting leader election")
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Received shutdown signal")
    finally:
        election.stop()
        stop_controller()
        if controller_task:
            # A task cancelled before it ever started running raises
            # CancelledError here; that is an expected outcome of shutdown.
            try:
                await controller_task
            except asyncio.CancelledError:
                pass

339

340

asyncio.run(ha_controller())

341

```

342

343

### Multi-Component Leader Election

344

345

```python

346

async def multi_component_system():
    """Example of multiple components with separate leader elections.

    Two independent elections are run concurrently, each with its own Lease
    lock (``data-processor-leader`` and ``cleanup-service-leader``), so
    different instances may lead different components at the same time.
    """
    # Local import: the shared import block of these examples does not import ``os``.
    import os

    await config.load_config()
    identity = f"system-{socket.gethostname()}-{os.getpid()}"

    # Component A: Data processor
    async def data_processor():
        print(f"{identity}: Starting data processor")
        while True:
            print(f"{identity}: Processing data batch...")
            await asyncio.sleep(8)

    def stop_data_processor():
        print(f"{identity}: Stopping data processor")

    # Component B: Cleanup service
    async def cleanup_service():
        print(f"{identity}: Starting cleanup service")
        while True:
            print(f"{identity}: Running cleanup tasks...")
            await asyncio.sleep(15)

    def stop_cleanup_service():
        print(f"{identity}: Stopping cleanup service")

    # Separate elections for each component. The lock and config classes are
    # reached through the ``leaselock`` / ``electionconfig`` modules, matching
    # the imports at the top of the examples.
    data_processor_lock = leaselock.LeaseLock(
        name="data-processor-leader",
        namespace="default",
        identity=identity,
    )

    cleanup_service_lock = leaselock.LeaseLock(
        name="cleanup-service-leader",
        namespace="default",
        identity=identity,
    )

    # Create elections
    data_processor_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=data_processor_lock,
            lease_duration=20,
            renew_deadline=15,
            retry_period=3,
            onstarted_leading=data_processor,
            onstopped_leading=stop_data_processor,
        )
    )

    cleanup_service_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=cleanup_service_lock,
            lease_duration=25,
            renew_deadline=20,
            retry_period=4,
            onstarted_leading=cleanup_service,
            onstopped_leading=stop_cleanup_service,
        )
    )

    # Run both elections concurrently
    try:
        await asyncio.gather(
            data_processor_election.run(),
            cleanup_service_election.run(),
        )
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down all components")
        data_processor_election.stop()
        cleanup_service_election.stop()

418

419

# asyncio.run(multi_component_system()) # Uncomment to run

420

```

421

422

### Custom Lock Implementation

423

424

```python

425

# NOTE(review): confirm that ``Lock`` and ``LeaderElectionRecord`` are exported by
# the ``leaderelection`` name that is in scope here.
class CustomLock(leaderelection.Lock):
    """Example custom lock using a different Kubernetes resource.

    The leadership record is stored in a Secret, under the ``leader-election``
    key, as base64-encoded JSON of ``record.to_dict()``.
    """

    def __init__(self, name, namespace, identity):
        self.name = name
        self.namespace = namespace
        self.identity = identity

    @staticmethod
    def _encode_record(record):
        """Serialize a record to the base64 text stored in the Secret."""
        import base64
        import json

        raw = json.dumps(record.to_dict()).encode('utf-8')
        return base64.b64encode(raw).decode('utf-8')

    async def get(self, client):
        """Get current lock record from custom resource.

        Returns:
        - the decoded record, or None when the Secret does not exist or
          carries no ``leader-election`` entry

        Raises:
        - client.ApiException: for any API error other than 404
        """
        import base64
        import json

        # Example: Using a Secret for coordination
        v1 = client.CoreV1Api()
        try:
            secret = await v1.read_namespaced_secret(
                name=self.name,
                namespace=self.namespace
            )
        except client.ApiException as e:
            if e.status == 404:
                return None  # Lock doesn't exist yet
            raise

        # ``secret.data`` is optional and may be None for an empty Secret,
        # so guard before looking up the key.
        payload = (secret.data or {}).get('leader-election')
        if payload is None:
            return None

        # Parse leadership data from secret
        record_data = json.loads(base64.b64decode(payload).decode('utf-8'))
        return leaderelection.LeaderElectionRecord.from_dict(record_data)

    async def create(self, client, record):
        """Create new secret with leadership record."""
        v1 = client.CoreV1Api()

        secret = client.V1Secret(
            metadata=client.V1ObjectMeta(
                name=self.name,
                namespace=self.namespace
            ),
            data={'leader-election': self._encode_record(record)}
        )

        await v1.create_namespaced_secret(
            namespace=self.namespace,
            body=secret
        )

    async def update(self, client, record):
        """Update existing secret with new record."""
        v1 = client.CoreV1Api()

        # Patch the secret
        # NOTE(review): this patch carries no resourceVersion, so it does not
        # look like a compare-and-swap — confirm whether concurrent updates
        # need to be detected here.
        await v1.patch_namespaced_secret(
            name=self.name,
            namespace=self.namespace,
            body={'data': {'leader-election': self._encode_record(record)}}
        )

    def describe(self):
        """Return a human-readable description of this lock."""
        return f"CustomLock(Secret/{self.namespace}/{self.name})"

499

500

async def custom_lock_example():
    """Run a leader election coordinated through the Secret-backed ``CustomLock``."""
    await config.load_config()

    identity = f"custom-{socket.gethostname()}"

    # Use custom lock implementation
    custom_lock = CustomLock(
        name="custom-leader-lock",
        namespace="default",
        identity=identity,
    )

    async def leader_work():
        print(f"{identity}: Leading with custom lock!")
        while True:
            print(f"{identity}: Custom leader doing work...")
            await asyncio.sleep(6)

    def stop_leader_work():
        print(f"{identity}: Stopped leading with custom lock")

    # The config class is reached through the ``electionconfig`` module,
    # matching the imports at the top of the examples.
    election_config = electionconfig.ElectionConfig(
        lock=custom_lock,
        lease_duration=20,
        renew_deadline=15,
        retry_period=3,
        onstarted_leading=leader_work,
        onstopped_leading=stop_leader_work,
    )

    election = leaderelection.LeaderElection(election_config)

    try:
        print(f"{identity}: Starting election with custom lock")
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down custom lock election")
        election.stop()

538

539

# asyncio.run(custom_lock_example()) # Uncomment to run

540

```

541

542

### Graceful Shutdown with Leader Election

543

544

```python

545

import signal

546

547

async def graceful_shutdown_example():
    """Run a leader election that shuts down cleanly on SIGINT/SIGTERM.

    A signal sets ``shutdown_event``; the function then stops the election,
    cancels any leader work still running, and waits for it to finish.
    """
    await config.load_config()

    identity = f"graceful-{socket.gethostname()}"
    shutdown_event = asyncio.Event()

    # Handle shutdown signals
    def signal_handler(signum, frame):
        print(f"{identity}: Received signal {signum}, initiating graceful shutdown")
        shutdown_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # The lock and config classes are reached through the ``leaselock`` /
    # ``electionconfig`` modules, matching the imports at the top of the examples.
    lock = leaselock.LeaseLock(
        name="graceful-leader",
        namespace="default",
        identity=identity,
    )

    leader_task = None

    async def start_leader_work():
        nonlocal leader_task
        print(f"{identity}: Becoming leader")

        async def leader_loop():
            try:
                while not shutdown_event.is_set():
                    print(f"{identity}: Leader work iteration")

                    # Pause for up to one second between iterations, but wake
                    # immediately once shutdown has been requested.
                    try:
                        await asyncio.wait_for(shutdown_event.wait(), timeout=1)
                    except asyncio.TimeoutError:
                        pass  # No shutdown yet; run the next iteration

            except asyncio.CancelledError:
                print(f"{identity}: Leader work cancelled")
                raise
            finally:
                print(f"{identity}: Leader work cleanup completed")

        leader_task = asyncio.create_task(leader_loop())

    def stop_leader_work():
        nonlocal leader_task
        print(f"{identity}: Stopping leader work")

        if leader_task and not leader_task.done():
            leader_task.cancel()

    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=30,
        renew_deadline=20,
        retry_period=2,
        onstarted_leading=start_leader_work,
        onstopped_leading=stop_leader_work,
    )

    election = leaderelection.LeaderElection(election_config)

    # asyncio.wait() needs Task objects, so wrap both awaitables up front.
    election_task = asyncio.create_task(election.run())
    shutdown_task = asyncio.create_task(shutdown_event.wait())

    try:
        # Wait for either election completion or shutdown signal.
        # asyncio.wait() is the API that supports return_when;
        # asyncio.gather() does not accept that argument.
        done, _pending = await asyncio.wait(
            {election_task, shutdown_task},
            return_when=asyncio.FIRST_COMPLETED,
        )

        # If the election itself ended, surface any exception it raised.
        if election_task in done:
            election_task.result()

    finally:
        print(f"{identity}: Cleaning up...")
        election.stop()

        # Cancel whatever is still running and wait for it to unwind.
        for task in (election_task, shutdown_task, leader_task):
            if task is not None and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        print(f"{identity}: Graceful shutdown completed")

634

asyncio.run(graceful_shutdown_example())

635

```