Python client library for interacting with Kubernetes clusters through the Kubernetes API
—
Distributed leader election implementation using Kubernetes resources as coordination locks. Enables building highly available distributed applications where only one instance should be active (leader) while others remain on standby (followers).
Core leader election functionality that manages the election and lease renewal process for distributed applications.
class LeaderElection:
    """Core leader election functionality that manages the election and
    lease renewal process for distributed applications.
    """

    # NOTE: "Config" is quoted because the Config class is defined later in
    # this file; an unquoted annotation would be evaluated eagerly and fail.
    def __init__(self, election_config: "Config"):
        """
        Initialize leader election with configuration.

        Parameters:
        - election_config: Election configuration including lock, timings, and callbacks
        """

    def run(self) -> None:
        """
        Start the leader election process.

        Blocks until leadership is lost or process terminates.
        """

    def acquire(self) -> bool:
        """
        Attempt to acquire leadership.

        Returns:
        - bool: True if leadership acquired, False otherwise
        """

    def renew_loop(self) -> None:
        """
        Continuously renew leadership lease while leader.

        Called automatically after acquiring leadership.
        """


# Configuration for leader election including timing parameters and callback functions.
class Config:
    """Configuration for leader election including timing parameters and
    callback functions.
    """

    def __init__(
        self,
        lock,
        lease_duration: int,
        renew_deadline: int,
        retry_period: int,
        onstarted_leading: callable,
        onstopped_leading: callable = None,
    ):
        """
        Configure leader election parameters.

        Parameters:
        - lock: Resource lock implementation (ConfigMapLock, etc.)
        - lease_duration: Duration of leadership lease in seconds
        - renew_deadline: Deadline for lease renewal in seconds
        - retry_period: Retry interval in seconds
        - onstarted_leading: Callback when becoming leader
        - onstopped_leading: Callback when losing leadership

        Constraints:
        - lease_duration > renew_deadline
        - renew_deadline > jitter_factor * retry_period
        - All timing values must be >= 1
        """


# Represents the current state of a leadership election stored in the coordination resource.
class LeaderElectionRecord:
    """Represents the current state of a leadership election stored in the
    coordination resource.
    """

    def __init__(
        self,
        holder_identity: str,
        lease_duration: int,
        acquire_time: str,
        renew_time: str,
    ):
        """
        Leader election record data.

        Parameters:
        - holder_identity: Unique identifier of current leader
        - lease_duration: Duration of leadership lease
        - acquire_time: When leadership was acquired (ISO format)
        - renew_time: When lease was last renewed (ISO format)
        """


# Kubernetes resource-based locks for coordination between election candidates.
class ConfigMapLock:
    """ConfigMap-backed resource lock used to coordinate leader election
    candidates through the Kubernetes API.
    """

    def __init__(self, name: str, namespace: str, identity: str):
        """
        ConfigMap-based resource lock for leader election.

        Parameters:
        - name: Name of the ConfigMap to use as lock
        - namespace: Namespace containing the ConfigMap
        - identity: Unique identifier for this candidate
        """

    def get(self, name: str, namespace: str) -> tuple:
        """
        Get current election record from ConfigMap.

        Returns:
        - tuple: (success: bool, record: LeaderElectionRecord or exception)
        """

    # NOTE: "LeaderElectionRecord" annotations are quoted so this stub does
    # not depend on definition order at import time.
    def create(self, name: str, namespace: str, election_record: "LeaderElectionRecord") -> bool:
        """
        Create new election record in ConfigMap.

        Returns:
        - bool: True if created successfully
        """

    def update(self, election_record: "LeaderElectionRecord") -> bool:
        """
        Update existing election record.

        Returns:
        - bool: True if updated successfully
        """
# Example: basic leader election usage.
import time

from kubernetes import client, config
from kubernetes.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock

# Load Kubernetes configuration
config.load_kube_config()


def on_started_leading():
    """Called when this instance becomes the leader."""
    print("Started leading - performing leader tasks")
    # Implement leader-specific logic here
    while True:
        # Leader work loop
        time.sleep(30)


def on_stopped_leading():
    """Called when this instance loses leadership."""
    print("Stopped leading - cleaning up")
    # Cleanup leader-specific resources


# Create resource lock using ConfigMap
lock = ConfigMapLock(
    name="my-app-lock",
    namespace="default",
    identity="instance-1",
)

# Configure election parameters
election_config = Config(
    lock=lock,
    lease_duration=30,  # Leadership lease duration (seconds)
    renew_deadline=20,  # Lease renewal deadline (seconds)
    retry_period=5,     # Retry interval (seconds)
    onstarted_leading=on_started_leading,
    onstopped_leading=on_stopped_leading,
)

# Start leader election
election = LeaderElection(election_config)
election.run()  # Blocks until process terminates
# Example: high-availability service wrapper around leader election.
import time
import threading

from kubernetes.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock


class HighAvailabilityService:
    """Service whose workload runs only while this instance holds leadership."""

    def __init__(self, service_name, namespace, instance_id):
        self.service_name = service_name
        self.running = False
        # Create resource lock
        self.lock = ConfigMapLock(
            name=f"{service_name}-leader-election",
            namespace=namespace,
            identity=instance_id,
        )
        # Configure election
        self.config = Config(
            lock=self.lock,
            lease_duration=60,
            renew_deadline=40,
            retry_period=10,
            onstarted_leading=self.become_leader,
            onstopped_leading=self.stop_leading,
        )

    def become_leader(self):
        """Start service operations as leader."""
        print(f"{self.service_name}: Became leader, starting operations")
        self.running = True
        # Start background service operations
        while self.running:
            self.perform_leader_tasks()
            time.sleep(10)

    def stop_leading(self):
        """Stop service operations when losing leadership."""
        print(f"{self.service_name}: Lost leadership, stopping operations")
        self.running = False

    def perform_leader_tasks(self):
        """Core service logic that should only run on leader."""
        print("Performing leader-only tasks...")
        # Implement your service logic here

    def start(self):
        """Start participating in leader election."""
        election = LeaderElection(self.config)
        election.run()


# Usage
service = HighAvailabilityService(
    service_name="data-processor",
    namespace="production",
    instance_id="pod-12345",
)
service.start()
from kubernetes import leaderelection
from kubernetes.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.leaderelectionrecord import LeaderElectionRecord
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock

Install with Tessl CLI:

    npx tessl i tessl/pypi-kubernetes