Asynchronous Python client library for the Kubernetes API, providing async/await support for all Kubernetes operations.
—
Distributed leader election for high availability applications using Kubernetes native resources (ConfigMaps or Leases) for coordination. Enables building resilient controllers and services that require active-passive failover capabilities.
Main leader election coordinator that manages the election process and leadership lifecycle.
class LeaderElection:
    """Main leader election coordinator managing the election process and leadership lifecycle."""

    def __init__(self, election_config):
        """
        Initialize leader election with configuration.

        Parameters:
        - election_config: ElectionConfig, election configuration object
        """

    async def run(self):
        """
        Start the leader election process.

        This method will run indefinitely, attempting to acquire and maintain
        leadership. Callbacks are invoked when leadership state changes.
        """

    async def acquire(self):
        """
        Attempt to acquire leadership.

        Returns:
        - bool: True if leadership was acquired, False otherwise
        """

    async def renew_loop(self):
        """
        Maintain leadership by continuously renewing the lease.

        Runs while this instance holds leadership, automatically renewing
        the lease at regular intervals.
        """

    async def try_acquire_or_renew(self):
        """
        Core election logic for acquiring or renewing leadership.

        Returns:
        - bool: True if leadership was acquired/renewed, False otherwise
        """

    def stop(self):
        """Stop the leader election process and release leadership."""


# Configuration object that defines leader election behavior and callbacks.
class ElectionConfig:
    """Configuration object defining leader election behavior and callbacks."""

    def __init__(self, lock, lease_duration, renew_deadline, retry_period,
                 onstarted_leading=None, onstopped_leading=None):
        """
        Leader election configuration.

        Parameters:
        - lock: Lock, resource lock implementation (LeaseLock or ConfigMapLock)
        - lease_duration: int, how long leadership lease lasts (seconds)
        - renew_deadline: int, deadline for renewing leadership (seconds)
        - retry_period: int, how often to attempt acquiring leadership (seconds)
        - onstarted_leading: callable, callback when becoming leader
        - onstopped_leading: callable, callback when losing leadership
        """

    @property
    def identity(self):
        """Unique identity of this election participant."""

    @property
    def name(self):
        """Name of the election (from lock resource name)."""


# Metadata stored in the coordination resource to track leadership state.
class LeaderElectionRecord:
    """Leadership record metadata stored in the coordination resource."""

    def __init__(self, holder_identity, lease_duration_seconds, acquire_time,
                 renew_time, leader_transitions):
        """
        Leadership record metadata.

        Parameters:
        - holder_identity: str, identity of current leader
        - lease_duration_seconds: int, lease duration in seconds
        - acquire_time: datetime, when leadership was acquired
        - renew_time: datetime, when lease was last renewed
        - leader_transitions: int, number of leadership changes
        """

    def to_dict(self):
        """Convert record to dictionary for storage."""

    @classmethod
    def from_dict(cls, data):
        """Create record from dictionary."""


# Different Kubernetes resources that can be used for coordination.
class LeaseLock:
    """Lease-based coordination lock (recommended)."""

    def __init__(self, name, namespace, identity):
        """
        Lease-based coordination lock (recommended).

        Uses Kubernetes Lease resources for coordination. Leases are
        lightweight and designed specifically for coordination.

        Parameters:
        - name: str, lease resource name
        - namespace: str, namespace containing the lease
        - identity: str, unique identity of this participant
        """

    async def get(self, client):
        """Get current lease record."""

    async def create(self, client, record):
        """Create new lease with leadership record."""

    async def update(self, client, record):
        """Update existing lease record."""

    def describe(self):
        """Human-readable description of this lock."""
class ConfigMapLock:
    """ConfigMap-based coordination lock (legacy)."""

    def __init__(self, name, namespace, identity):
        """
        ConfigMap-based coordination lock (legacy).

        Uses ConfigMap annotations for coordination. Less efficient than
        Lease locks but compatible with older Kubernetes versions.

        Parameters:
        - name: str, ConfigMap resource name
        - namespace: str, namespace containing the ConfigMap
        - identity: str, unique identity of this participant
        """

    async def get(self, client):
        """Get current ConfigMap record."""

    async def create(self, client, record):
        """Create new ConfigMap with leadership record."""

    async def update(self, client, record):
        """Update existing ConfigMap record."""

    def describe(self):
        """Human-readable description of this lock."""
class Lock:
    """
    Base interface for resource lock implementations.

    Custom lock implementations should inherit from this class
    and implement the required methods.
    """

    async def get(self, client):
        """Get current lock record."""
        raise NotImplementedError

    async def create(self, client, record):
        """Create new lock with record."""
        raise NotImplementedError

    async def update(self, client, record):
        """Update existing lock record."""
        raise NotImplementedError

    def describe(self):
        """Describe this lock."""
        raise NotImplementedError


import asyncio
import base64
import json
import os
import socket

from kubernetes_asyncio import client, config
from kubernetes_asyncio.leaderelection import leaderelection, electionconfig
from kubernetes_asyncio.leaderelection.resourcelock import configmaplock, leaselock
async def basic_leader_election():
    """Run a basic leader election with a Lease lock and print-based callbacks."""
    await config.load_config()

    # Generate a unique identity for this instance so replicas can be told apart.
    identity = f"{socket.gethostname()}-{asyncio.current_task().get_name()}"

    # Create lease lock for coordination.
    lock = leaselock.LeaseLock(
        name="my-app-leader",
        namespace="default",
        identity=identity,
    )

    # Leadership callbacks.
    async def on_started_leading():
        print(f"{identity}: I am now the leader!")
        # Start leader-only work here
        while True:
            print(f"{identity}: Doing leader work...")
            await asyncio.sleep(5)

    def on_stopped_leading():
        print(f"{identity}: I am no longer the leader")
        # Stop leader-only work here

    # Configure election timing: lease_duration > renew_deadline > retry_period.
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=30,   # Hold leadership for 30 seconds max
        renew_deadline=20,   # Must renew within 20 seconds
        retry_period=5,      # Check for leadership every 5 seconds
        onstarted_leading=on_started_leading,
        onstopped_leading=on_stopped_leading,
    )

    # Start election.
    election = leaderelection.LeaderElection(election_config)
    try:
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down...")
    finally:
        # Release leadership on any exit path, not only KeyboardInterrupt.
        election.stop()


# Run multiple instances to see election in action
asyncio.run(basic_leader_election())


async def ha_controller():
    """High-availability controller that only runs its work loop while leader."""
    await config.load_config()
    identity = f"controller-{socket.gethostname()}-{os.getpid()}"

    # Use a ConfigMap lock for this example (legacy, works on older clusters).
    lock = configmaplock.ConfigMapLock(
        name="ha-controller-lock",
        namespace="kube-system",
        identity=identity,
    )

    # Controller state shared between the election callbacks.
    controller_running = False
    controller_task = None

    async def start_controller():
        """Start the actual controller logic (called when leadership is acquired)."""
        nonlocal controller_running, controller_task
        print(f"{identity}: Starting controller")
        controller_running = True

        async def controller_loop():
            # Example controller logic: monitor and manage labelled pods.
            v1 = client.CoreV1Api()
            try:
                while controller_running:
                    pods = await v1.list_pod_for_all_namespaces(
                        label_selector="managed-by=ha-controller"
                    )
                    print(f"{identity}: Managing {len(pods.items)} pods")
                    # Perform controller logic here
                    for pod in pods.items:
                        if pod.status.phase == "Failed":
                            print(f"{identity}: Cleaning up failed pod {pod.metadata.name}")
                            # Implement cleanup logic
                    await asyncio.sleep(10)
            except asyncio.CancelledError:
                print(f"{identity}: Controller loop cancelled")
            finally:
                # Always close the API client owned by this loop.
                await v1.api_client.close()

        controller_task = asyncio.create_task(controller_loop())

    def stop_controller():
        """Stop the controller logic (called when leadership is lost)."""
        nonlocal controller_running, controller_task
        print(f"{identity}: Stopping controller")
        controller_running = False
        if controller_task and not controller_task.done():
            controller_task.cancel()

    # Election configuration with tighter timing for responsiveness.
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=15,  # Shorter lease for faster failover
        renew_deadline=10,  # Renew deadline
        retry_period=2,     # Check frequently
        onstarted_leading=start_controller,
        onstopped_leading=stop_controller,
    )

    election = leaderelection.LeaderElection(election_config)
    try:
        print(f"{identity}: Starting leader election")
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Received shutdown signal")
    finally:
        election.stop()
        stop_controller()
        if controller_task:
            # stop_controller() may have cancelled the task; swallow only that.
            try:
                await controller_task
            except asyncio.CancelledError:
                pass
asyncio.run(ha_controller())


async def multi_component_system():
    """Example of multiple components with separate leader elections."""
    await config.load_config()
    identity = f"system-{socket.gethostname()}-{os.getpid()}"

    # Component A: Data processor
    async def data_processor():
        print(f"{identity}: Starting data processor")
        while True:
            print(f"{identity}: Processing data batch...")
            await asyncio.sleep(8)

    def stop_data_processor():
        print(f"{identity}: Stopping data processor")

    # Component B: Cleanup service
    async def cleanup_service():
        print(f"{identity}: Starting cleanup service")
        while True:
            print(f"{identity}: Running cleanup tasks...")
            await asyncio.sleep(15)

    def stop_cleanup_service():
        print(f"{identity}: Stopping cleanup service")

    # Separate locks so each component's leadership is decided independently.
    data_processor_lock = leaselock.LeaseLock(
        name="data-processor-leader",
        namespace="default",
        identity=identity,
    )
    cleanup_service_lock = leaselock.LeaseLock(
        name="cleanup-service-leader",
        namespace="default",
        identity=identity,
    )

    # Create elections
    data_processor_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=data_processor_lock,
            lease_duration=20,
            renew_deadline=15,
            retry_period=3,
            onstarted_leading=data_processor,
            onstopped_leading=stop_data_processor,
        )
    )
    cleanup_service_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=cleanup_service_lock,
            lease_duration=25,
            renew_deadline=20,
            retry_period=4,
            onstarted_leading=cleanup_service,
            onstopped_leading=stop_cleanup_service,
        )
    )

    # Run both elections concurrently.
    try:
        await asyncio.gather(
            data_processor_election.run(),
            cleanup_service_election.run(),
        )
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down all components")
    finally:
        # Release both leaderships on any exit path.
        data_processor_election.stop()
        cleanup_service_election.stop()
# asyncio.run(multi_component_system()) # Uncomment to run


class CustomLock(leaderelection.Lock):
    """Example custom lock that coordinates through a Kubernetes Secret."""

    def __init__(self, name, namespace, identity):
        """
        Parameters:
        - name: str, Secret resource name
        - namespace: str, namespace containing the Secret
        - identity: str, unique identity of this participant
        """
        self.name = name
        self.namespace = namespace
        self.identity = identity

    @staticmethod
    def _encode_record(record):
        """Serialize a LeaderElectionRecord to base64-encoded JSON for Secret data."""
        return base64.b64encode(
            json.dumps(record.to_dict()).encode('utf-8')
        ).decode('utf-8')

    async def get(self, client):
        """Get current lock record from the Secret, or None if absent.

        NOTE: the `client` parameter shadows the kubernetes_asyncio `client`
        module; callers are expected to pass an object exposing the same API
        (CoreV1Api, ApiException).
        """
        try:
            v1 = client.CoreV1Api()
            secret = await v1.read_namespaced_secret(
                name=self.name,
                namespace=self.namespace,
            )
            # Parse leadership data from secret (values are base64-encoded).
            if 'leader-election' in secret.data:
                raw = base64.b64decode(secret.data['leader-election']).decode('utf-8')
                return leaderelection.LeaderElectionRecord.from_dict(json.loads(raw))
        except client.ApiException as e:
            if e.status == 404:
                return None  # Lock doesn't exist yet
            raise
        return None

    async def create(self, client, record):
        """Create new secret with leadership record."""
        v1 = client.CoreV1Api()
        secret = client.V1Secret(
            metadata=client.V1ObjectMeta(
                name=self.name,
                namespace=self.namespace,
            ),
            data={'leader-election': self._encode_record(record)},
        )
        await v1.create_namespaced_secret(
            namespace=self.namespace,
            body=secret,
        )

    async def update(self, client, record):
        """Update existing secret with new record via a patch."""
        v1 = client.CoreV1Api()
        await v1.patch_namespaced_secret(
            name=self.name,
            namespace=self.namespace,
            body={'data': {'leader-election': self._encode_record(record)}},
        )

    def describe(self):
        """Human-readable description of this lock."""
        return f"CustomLock(Secret/{self.namespace}/{self.name})"
async def custom_lock_example():
    """Run an election using the Secret-based CustomLock implementation."""
    await config.load_config()
    identity = f"custom-{socket.gethostname()}"

    # Use custom lock implementation
    custom_lock = CustomLock(
        name="custom-leader-lock",
        namespace="default",
        identity=identity,
    )

    async def leader_work():
        print(f"{identity}: Leading with custom lock!")
        while True:
            print(f"{identity}: Custom leader doing work...")
            await asyncio.sleep(6)

    def stop_leader_work():
        print(f"{identity}: Stopped leading with custom lock")

    election_config = electionconfig.ElectionConfig(
        lock=custom_lock,
        lease_duration=20,
        renew_deadline=15,
        retry_period=3,
        onstarted_leading=leader_work,
        onstopped_leading=stop_leader_work,
    )

    election = leaderelection.LeaderElection(election_config)
    try:
        print(f"{identity}: Starting election with custom lock")
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down custom lock election")
    finally:
        # Release leadership on any exit path, not only KeyboardInterrupt.
        election.stop()


# asyncio.run(custom_lock_example()) # Uncomment to run
import signal
async def graceful_shutdown_example():
    """Leader election with signal-driven graceful shutdown."""
    await config.load_config()
    identity = f"graceful-{socket.gethostname()}"
    shutdown_event = asyncio.Event()

    # Handle shutdown signals by setting the event; the loops below poll it.
    def signal_handler(signum, frame):
        print(f"{identity}: Received signal {signum}, initiating graceful shutdown")
        shutdown_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    lock = leaselock.LeaseLock(
        name="graceful-leader",
        namespace="default",
        identity=identity,
    )

    leader_task = None

    async def start_leader_work():
        nonlocal leader_task
        print(f"{identity}: Becoming leader")

        async def leader_loop():
            try:
                while not shutdown_event.is_set():
                    print(f"{identity}: Leader work iteration")
                    # Work for up to 5s per iteration, but wake immediately on
                    # shutdown. (The original wrapped asyncio.sleep(5) in a 1s
                    # wait_for, which always timed out after 1s.)
                    try:
                        await asyncio.wait_for(shutdown_event.wait(), timeout=5)
                    except asyncio.TimeoutError:
                        pass  # Timeout just means: run another iteration.
            except asyncio.CancelledError:
                print(f"{identity}: Leader work cancelled")
                raise
            finally:
                print(f"{identity}: Leader work cleanup completed")

        leader_task = asyncio.create_task(leader_loop())

    def stop_leader_work():
        nonlocal leader_task
        print(f"{identity}: Stopping leader work")
        if leader_task and not leader_task.done():
            leader_task.cancel()

    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=30,
        renew_deadline=20,
        retry_period=2,
        onstarted_leading=start_leader_work,
        onstopped_leading=stop_leader_work,
    )

    election = leaderelection.LeaderElection(election_config)
    election_task = None
    shutdown_task = None
    try:
        # Run election until either it completes or a shutdown signal arrives.
        # NOTE: return_when is an asyncio.wait() argument; asyncio.gather()
        # (used in the original) rejects it with a TypeError.
        election_task = asyncio.create_task(election.run())
        shutdown_task = asyncio.create_task(shutdown_event.wait())
        await asyncio.wait(
            {election_task, shutdown_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
    finally:
        print(f"{identity}: Cleaning up...")
        election.stop()
        # Cancel whatever is still running so no task is left dangling.
        for task in (election_task, shutdown_task, leader_task):
            if task and not task.done():
                task.cancel()
        if leader_task:
            try:
                await leader_task
            except asyncio.CancelledError:
                pass
        print(f"{identity}: Graceful shutdown completed")


asyncio.run(graceful_shutdown_example())
# Install with Tessl CLI
npx tessl i tessl/pypi-kubernetes-asyncio