CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kubernetes-asyncio

Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations

Pending
Overview
Eval results
Files

docs/leader-election.md

Leader Election

Distributed leader election for high availability applications using Kubernetes native resources (ConfigMaps or Leases) for coordination. Enables building resilient controllers and services that require active-passive failover capabilities.

Capabilities

Leader Election

Main leader election coordinator that manages the election process and leadership lifecycle.

class LeaderElection:
    # Coordinator for the election lifecycle: acquires leadership through the
    # configured resource lock, keeps renewing it, and fires the configured
    # callbacks on leadership transitions.
    def __init__(self, election_config):
        """
        Initialize leader election with configuration.
        
        Parameters:
        - election_config: ElectionConfig, election configuration object
          bundling the resource lock, timing parameters, and callbacks
        """
    
    async def run(self):
        """
        Start the leader election process.
        
        This method will run indefinitely, attempting to acquire and maintain
        leadership. Callbacks are invoked when leadership state changes.
        Call stop() (or cancel the surrounding task) to end participation.
        """
    
    async def acquire(self):
        """
        Attempt to acquire leadership.
        
        Returns:
        - bool: True if leadership was acquired, False otherwise
        """
    
    async def renew_loop(self):
        """
        Maintain leadership by continuously renewing the lease.
        
        Runs while this instance holds leadership, automatically renewing
        the lease at regular intervals (driven by the timing values in
        ElectionConfig).
        """
    
    async def try_acquire_or_renew(self):
        """
        Core election logic for acquiring or renewing leadership.
        
        Returns:
        - bool: True if leadership was acquired/renewed, False otherwise
        """
    
    def stop(self):
        """Stop the leader election process and release leadership."""

Election Configuration

Configuration object that defines leader election behavior and callbacks.

class ElectionConfig:
    # Immutable bundle of everything LeaderElection needs: the lock, the
    # three timing knobs, and the two leadership-transition callbacks.
    def __init__(self, lock, lease_duration, renew_deadline, retry_period, 
                 onstarted_leading=None, onstopped_leading=None):
        """
        Leader election configuration.
        
        Parameters:
        - lock: Lock, resource lock implementation (LeaseLock or ConfigMapLock)
        - lease_duration: int, how long leadership lease lasts (seconds)
        - renew_deadline: int, deadline for renewing leadership (seconds)
        - retry_period: int, how often to attempt acquiring leadership (seconds)
        - onstarted_leading: callable, callback when becoming leader
        - onstopped_leading: callable, callback when losing leadership

        Note: in every example below renew_deadline is shorter than
        lease_duration, so a healthy leader always renews before its
        lease can expire.
        """
        
    @property
    def identity(self):
        """Unique identity of this election participant."""
        
    @property
    def name(self):
        """Name of the election (from lock resource name)."""

Leader Election Record

Metadata stored in the coordination resource to track leadership state.

class LeaderElectionRecord:
    # The leadership state persisted in the coordination resource; to_dict /
    # from_dict round-trip it through storage.
    def __init__(self, holder_identity, lease_duration_seconds, acquire_time, 
                 renew_time, leader_transitions):
        """
        Leadership record metadata.
        
        Parameters:
        - holder_identity: str, identity of current leader
        - lease_duration_seconds: int, lease duration in seconds
        - acquire_time: datetime, when leadership was acquired
        - renew_time: datetime, when lease was last renewed
        - leader_transitions: int, number of leadership changes
        """
        
    def to_dict(self):
        """Convert record to dictionary for storage in the lock resource."""
        
    @classmethod
    def from_dict(cls, data):
        """Create record from a dictionary previously produced by to_dict."""

Resource Lock Implementations

Different Kubernetes resources that can be used for coordination.

class LeaseLock:
    # Coordination lock backed by a coordination.k8s.io Lease object.
    def __init__(self, name, namespace, identity):
        """
        Lease-based coordination lock (recommended).
        
        Uses Kubernetes Lease resources for coordination. Leases are
        lightweight and designed specifically for coordination.
        
        Parameters:
        - name: str, lease resource name
        - namespace: str, namespace containing the lease
        - identity: str, unique identity of this participant
        """
    
    async def get(self, client):
        """Get current lease record."""
        
    async def create(self, client, record):
        """Create new lease with leadership record."""
        
    async def update(self, client, record):
        """Update existing lease record."""
        
    def describe(self):
        """Human-readable description of this lock."""

class ConfigMapLock:
    # Coordination lock backed by ConfigMap annotations; kept for clusters
    # that predate the Lease API.
    def __init__(self, name, namespace, identity):
        """
        ConfigMap-based coordination lock (legacy).
        
        Uses ConfigMap annotations for coordination. Less efficient than
        Lease locks but compatible with older Kubernetes versions.
        
        Parameters:
        - name: str, ConfigMap resource name
        - namespace: str, namespace containing the ConfigMap
        - identity: str, unique identity of this participant
        """
    
    async def get(self, client):
        """Get current ConfigMap record."""
        
    async def create(self, client, record):
        """Create new ConfigMap with leadership record."""
        
    async def update(self, client, record):
        """Update existing ConfigMap record."""
        
    def describe(self):
        """Human-readable description of this lock."""

class Lock:
    """
    Base interface for resource lock implementations.
    
    Custom lock implementations should inherit from this class
    and implement the required methods. Every method below is abstract:
    it raises NotImplementedError until overridden by a subclass.
    """
    
    async def get(self, client):
        """Get current lock record (return None when the lock doesn't exist)."""
        raise NotImplementedError
        
    async def create(self, client, record):
        """Create new lock with record."""
        raise NotImplementedError
        
    async def update(self, client, record):
        """Update existing lock record."""
        raise NotImplementedError
        
    def describe(self):
        """Describe this lock (human-readable, used for logging/diagnostics)."""
        raise NotImplementedError

Usage Examples

Basic Leader Election

import asyncio
import os
import socket

from kubernetes_asyncio import client, config
from kubernetes_asyncio.leaderelection import leaderelection, electionconfig
from kubernetes_asyncio.leaderelection.resourcelock import leaselock, configmaplock

async def basic_leader_election():
    """Minimal leader election: one Lease lock, two callbacks, run forever."""
    await config.load_config()

    # Every participant needs a distinct identity string.
    identity = f"{socket.gethostname()}-{asyncio.current_task().get_name()}"

    # All instances coordinate through this single Lease resource.
    lock = leaselock.LeaseLock(name="my-app-leader",
                               namespace="default",
                               identity=identity)

    # Invoked when this instance loses leadership.
    def on_stopped_leading():
        print(f"{identity}: I am no longer the leader")
        # Stop leader-only work here

    # Invoked when this instance becomes the leader.
    async def on_started_leading():
        print(f"{identity}: I am now the leader!")
        # Start leader-only work here
        while True:
            print(f"{identity}: Doing leader work...")
            await asyncio.sleep(5)

    # Election timing configuration.
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=30,      # Hold leadership for 30 seconds max
        renew_deadline=20,      # Must renew within 20 seconds
        retry_period=5,         # Check for leadership every 5 seconds
        onstarted_leading=on_started_leading,
        onstopped_leading=on_stopped_leading,
    )

    election = leaderelection.LeaderElection(election_config)
    try:
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down...")
        election.stop()

# Run multiple instances to see election in action
asyncio.run(basic_leader_election())

High Availability Controller

async def ha_controller():
    """Active-passive HA controller: only the elected leader runs the
    reconcile loop; standby replicas wait to take over on failover."""
    await config.load_config()
    
    # hostname + pid keeps identities unique across replicas and across
    # multiple processes on the same host (requires `import os`).
    identity = f"controller-{socket.gethostname()}-{os.getpid()}"
    
    # Use ConfigMap lock for this example.
    # (Fixed: ConfigMapLock lives in the configmaplock module per the
    # imports above, not on the leaderelection module.)
    lock = configmaplock.ConfigMapLock(
        name="ha-controller-lock",
        namespace="kube-system",
        identity=identity
    )
    
    # Controller state shared between the election callbacks.
    controller_running = False
    controller_task = None
    
    async def start_controller():
        """Start the actual controller logic."""
        nonlocal controller_running, controller_task
        
        print(f"{identity}: Starting controller")
        controller_running = True
        
        # Example controller logic
        async def controller_loop():
            v1 = client.CoreV1Api()
            
            try:
                while controller_running:
                    # Example: Monitor and manage pods
                    pods = await v1.list_pod_for_all_namespaces(
                        label_selector="managed-by=ha-controller"
                    )
                    
                    print(f"{identity}: Managing {len(pods.items)} pods")
                    
                    # Perform controller logic here
                    for pod in pods.items:
                        if pod.status.phase == "Failed":
                            print(f"{identity}: Cleaning up failed pod {pod.metadata.name}")
                            # Implement cleanup logic
                    
                    await asyncio.sleep(10)
                    
            except asyncio.CancelledError:
                print(f"{identity}: Controller loop cancelled")
            finally:
                # Always release the underlying HTTP session.
                await v1.api_client.close()
        
        controller_task = asyncio.create_task(controller_loop())
    
    def stop_controller():
        """Stop the controller logic."""
        nonlocal controller_running, controller_task
        
        print(f"{identity}: Stopping controller")
        controller_running = False
        
        if controller_task and not controller_task.done():
            controller_task.cancel()
    
    # Election configuration with tighter timing for responsiveness.
    # (Fixed: ElectionConfig lives in the electionconfig module.)
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=15,      # Shorter lease for faster failover
        renew_deadline=10,      # Renew deadline
        retry_period=2,         # Check frequently
        onstarted_leading=start_controller,
        onstopped_leading=stop_controller
    )
    
    election = leaderelection.LeaderElection(election_config)
    
    try:
        print(f"{identity}: Starting leader election")
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Received shutdown signal")
    finally:
        election.stop()
        stop_controller()
        if controller_task:
            # stop_controller() may have cancelled the task; swallow the
            # resulting CancelledError so shutdown stays clean.
            try:
                await controller_task
            except asyncio.CancelledError:
                pass

asyncio.run(ha_controller())

Multi-Component Leader Election

async def multi_component_system():
    """Example of multiple components with separate leader elections.

    Each component uses its own Lease, so leadership of one component can
    land on a different instance than leadership of the other.
    """
    
    await config.load_config()
    # Requires `import os` for the pid component of the identity.
    identity = f"system-{socket.gethostname()}-{os.getpid()}"
    
    # Component A: Data processor
    async def data_processor():
        print(f"{identity}: Starting data processor")
        while True:
            print(f"{identity}: Processing data batch...")
            await asyncio.sleep(8)
    
    def stop_data_processor():
        print(f"{identity}: Stopping data processor")
    
    # Component B: Cleanup service  
    async def cleanup_service():
        print(f"{identity}: Starting cleanup service")
        while True:
            print(f"{identity}: Running cleanup tasks...")
            await asyncio.sleep(15)
    
    def stop_cleanup_service():
        print(f"{identity}: Stopping cleanup service")
    
    # Separate locks for each component.
    # (Fixed: LeaseLock lives in the leaselock module per the imports above,
    # not on the leaderelection module.)
    data_processor_lock = leaselock.LeaseLock(
        name="data-processor-leader",
        namespace="default",
        identity=identity
    )
    
    cleanup_service_lock = leaselock.LeaseLock(
        name="cleanup-service-leader", 
        namespace="default",
        identity=identity
    )
    
    # Create elections.
    # (Fixed: ElectionConfig lives in the electionconfig module.)
    data_processor_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=data_processor_lock,
            lease_duration=20,
            renew_deadline=15,
            retry_period=3,
            onstarted_leading=data_processor,
            onstopped_leading=stop_data_processor
        )
    )
    
    cleanup_service_election = leaderelection.LeaderElection(
        electionconfig.ElectionConfig(
            lock=cleanup_service_lock,
            lease_duration=25,
            renew_deadline=20,
            retry_period=4,
            onstarted_leading=cleanup_service,
            onstopped_leading=stop_cleanup_service
        )
    )
    
    # Run both elections concurrently
    try:
        await asyncio.gather(
            data_processor_election.run(),
            cleanup_service_election.run()
        )
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down all components")
        data_processor_election.stop()
        cleanup_service_election.stop()

# asyncio.run(multi_component_system())  # Uncomment to run
# asyncio.run(multi_component_system())  # Uncomment to run

Custom Lock Implementation

class CustomLock(leaderelection.Lock):
    """Example custom lock using a different Kubernetes resource (a Secret).

    NOTE(review): each method receives a ``client`` argument and treats it
    as the kubernetes_asyncio ``client`` module (``client.CoreV1Api()``,
    ``client.V1Secret``, ``client.ApiException``) — confirm this matches
    what LeaderElection actually passes to lock methods.
    """

    # Key inside the Secret's data map that holds the serialized record.
    _DATA_KEY = 'leader-election'

    def __init__(self, name, namespace, identity):
        self.name = name            # Secret resource name
        self.namespace = namespace  # namespace containing the Secret
        self.identity = identity    # unique identity of this participant

    @staticmethod
    def _encode_record(record):
        """Serialize a record to the base64-encoded JSON stored in the Secret."""
        import json, base64
        return base64.b64encode(
            json.dumps(record.to_dict()).encode('utf-8')
        ).decode('utf-8')

    async def get(self, client):
        """Get current lock record from the Secret; None if no lock exists."""
        try:
            # Example: Using a Secret for coordination
            v1 = client.CoreV1Api()
            secret = await v1.read_namespaced_secret(
                name=self.name,
                namespace=self.namespace
            )

            # Guard: secret.data is None when the Secret has no data map
            # at all, so test it before the key-membership check.
            if secret.data and self._DATA_KEY in secret.data:
                import json, base64
                data = base64.b64decode(secret.data[self._DATA_KEY]).decode('utf-8')
                record_data = json.loads(data)
                return leaderelection.LeaderElectionRecord.from_dict(record_data)

        except client.ApiException as e:
            if e.status == 404:
                return None  # Lock doesn't exist yet
            raise

        return None

    async def create(self, client, record):
        """Create new secret with leadership record."""
        v1 = client.CoreV1Api()

        secret = client.V1Secret(
            metadata=client.V1ObjectMeta(
                name=self.name,
                namespace=self.namespace
            ),
            data={self._DATA_KEY: self._encode_record(record)}
        )

        await v1.create_namespaced_secret(
            namespace=self.namespace,
            body=secret
        )

    async def update(self, client, record):
        """Update existing secret with new record."""
        v1 = client.CoreV1Api()

        # Patch only the data key rather than replacing the whole Secret.
        await v1.patch_namespaced_secret(
            name=self.name,
            namespace=self.namespace,
            body={'data': {self._DATA_KEY: self._encode_record(record)}}
        )

    def describe(self):
        """Human-readable description of this lock."""
        return f"CustomLock(Secret/{self.namespace}/{self.name})"

async def custom_lock_example():
    """Run a leader election through the CustomLock (Secret-backed) lock."""
    await config.load_config()
    
    identity = f"custom-{socket.gethostname()}"
    
    # Use custom lock implementation
    custom_lock = CustomLock(
        name="custom-leader-lock",
        namespace="default",
        identity=identity
    )
    
    async def leader_work():
        print(f"{identity}: Leading with custom lock!")
        while True:
            print(f"{identity}: Custom leader doing work...")
            await asyncio.sleep(6)
    
    def stop_leader_work():
        print(f"{identity}: Stopped leading with custom lock")
    
    # (Fixed: ElectionConfig lives in the electionconfig module per the
    # imports above, not on the leaderelection module.)
    election_config = electionconfig.ElectionConfig(
        lock=custom_lock,
        lease_duration=20,
        renew_deadline=15,
        retry_period=3,
        onstarted_leading=leader_work,
        onstopped_leading=stop_leader_work
    )
    
    election = leaderelection.LeaderElection(election_config)
    
    try:
        print(f"{identity}: Starting election with custom lock")
        await election.run()
    except KeyboardInterrupt:
        print(f"{identity}: Shutting down custom lock election")
        election.stop()

# asyncio.run(custom_lock_example())  # Uncomment to run

Graceful Shutdown with Leader Election

import signal

async def graceful_shutdown_example():
    """Leader election wired to OS signals for a clean, ordered shutdown."""
    await config.load_config()
    
    identity = f"graceful-{socket.gethostname()}"
    shutdown_event = asyncio.Event()
    
    # Signal handlers only flag the event; the main coroutine observes it
    # and performs the actual teardown.
    def signal_handler(signum, frame):
        print(f"{identity}: Received signal {signum}, initiating graceful shutdown")
        shutdown_event.set()
    
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    # (Fixed: LeaseLock lives in the leaselock module per the imports above.)
    lock = leaselock.LeaseLock(
        name="graceful-leader",
        namespace="default",
        identity=identity
    )
    
    leader_task = None
    
    async def start_leader_work():
        nonlocal leader_task
        print(f"{identity}: Becoming leader")
        
        async def leader_loop():
            try:
                while not shutdown_event.is_set():
                    print(f"{identity}: Leader work iteration")
                    
                    # Sleep up to 5 seconds between iterations, waking
                    # immediately if the shutdown event fires.
                    # (Fixed: the original wait_for(asyncio.sleep(5), timeout=1)
                    # always timed out after 1 second and never slept 5.)
                    try:
                        await asyncio.wait_for(shutdown_event.wait(), timeout=5)
                    except asyncio.TimeoutError:
                        pass
                        
            except asyncio.CancelledError:
                print(f"{identity}: Leader work cancelled")
                raise
            finally:
                print(f"{identity}: Leader work cleanup completed")
        
        leader_task = asyncio.create_task(leader_loop())
    
    def stop_leader_work():
        nonlocal leader_task
        print(f"{identity}: Stopping leader work")
        
        if leader_task and not leader_task.done():
            leader_task.cancel()
    
    # (Fixed: ElectionConfig lives in the electionconfig module.)
    election_config = electionconfig.ElectionConfig(
        lock=lock,
        lease_duration=30,
        renew_deadline=20,
        retry_period=2,
        onstarted_leading=start_leader_work,
        onstopped_leading=stop_leader_work
    )
    
    election = leaderelection.LeaderElection(election_config)
    
    # Run the election until either it finishes on its own or a shutdown
    # signal arrives.
    # (Fixed: asyncio.gather() has no return_when parameter — asyncio.wait()
    # is the API that supports FIRST_COMPLETED.)
    election_task = asyncio.create_task(election.run())
    shutdown_task = asyncio.create_task(shutdown_event.wait())
    
    try:
        await asyncio.wait(
            {election_task, shutdown_task},
            return_when=asyncio.FIRST_COMPLETED
        )
        
    finally:
        print(f"{identity}: Cleaning up...")
        election.stop()
        
        # Cancel whichever helper task is still pending and swallow the
        # resulting CancelledError.
        for task in (election_task, shutdown_task):
            if not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
        
        if leader_task and not leader_task.done():
            leader_task.cancel()
            try:
                await leader_task
            except asyncio.CancelledError:
                pass
        
        print(f"{identity}: Graceful shutdown completed")

asyncio.run(graceful_shutdown_example())

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes-asyncio

docs

client-apis.md

configuration.md

dynamic-client.md

index.md

leader-election.md

streaming.md

utils.md

watch.md

tile.json