CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-kubernetes

Python client library for interacting with Kubernetes clusters through the Kubernetes API

Pending
Overview
Eval results
Files

docs/leader-election.md

Leader Election

Distributed leader election implementation using Kubernetes resources as coordination locks. Enables building highly available distributed applications where only one instance should be active (leader) while others remain on standby (followers).

Capabilities

Leader Election Process

Core leader election functionality that manages the election and lease renewal process for distributed applications.

class LeaderElection:
    """Manage the election and lease-renewal process for one candidate."""

    def __init__(self, election_config: Config):
        """
        Initialize leader election with configuration.

        Parameters:
        - election_config: Config instance supplying the resource lock,
          the timing parameters, and the leadership callbacks
        """

    def run(self) -> None:
        """
        Start the leader election process.

        Blocks until leadership is lost or the process terminates.
        """

    def acquire(self) -> bool:
        """
        Attempt to acquire leadership.

        Returns:
        - bool: True if leadership was acquired, False otherwise
        """

    def renew_loop(self) -> None:
        """
        Continuously renew the leadership lease while this candidate is leader.

        Called automatically after acquiring leadership; applications
        normally do not need to call it themselves.
        """

Election Configuration

Configuration for leader election including timing parameters and callback functions.

class Config:
    """Timing parameters and callbacks that drive a leader election."""

    def __init__(
        self,
        lock,
        lease_duration: int,
        renew_deadline: int,
        retry_period: int,
        onstarted_leading: callable,
        onstopped_leading: callable = None
    ):
        """
        Configure leader election parameters.

        Parameters:
        - lock: Resource lock implementation (ConfigMapLock, etc.)
        - lease_duration: Duration of leadership lease in seconds
        - renew_deadline: Deadline for lease renewal in seconds
        - retry_period: Retry interval in seconds
        - onstarted_leading: Callback invoked when becoming leader
        - onstopped_leading: Callback invoked when losing leadership
          (optional; defaults to None)

        Constraints:
        - lease_duration > renew_deadline
        - renew_deadline > jitter_factor * retry_period
        - All timing values must be >= 1

        NOTE(review): jitter_factor is not a constructor parameter;
        presumably it is a constant fixed inside Config (1.2 in the
        upstream kubernetes client) - confirm before relying on it.
        """

Election Record

Represents the current state of a leadership election stored in the coordination resource.

class LeaderElectionRecord:
    """Current state of a leadership election, as stored in the coordination resource."""

    def __init__(
        self,
        holder_identity: str,
        lease_duration: int,
        acquire_time: str,
        renew_time: str
    ):
        """
        Leader election record data.

        Parameters:
        - holder_identity: Unique identifier of the current leader
        - lease_duration: Duration of the leadership lease
        - acquire_time: When leadership was acquired (ISO format)
        - renew_time: When the lease was last renewed (ISO format)
        """

Resource Locks

Kubernetes resource-based locks for coordination between election candidates.

class ConfigMapLock:
    """Resource lock that uses a ConfigMap to coordinate election candidates."""

    def __init__(self, name: str, namespace: str, identity: str):
        """
        ConfigMap-based resource lock for leader election.

        Parameters:
        - name: Name of the ConfigMap to use as lock
        - namespace: Namespace containing the ConfigMap
        - identity: Unique identifier for this candidate
        """

    def get(self, name: str, namespace: str) -> tuple:
        """
        Get current election record from ConfigMap.

        Parameters:
        - name: Name of the ConfigMap to read
        - namespace: Namespace containing the ConfigMap

        Returns:
        - tuple: (success: bool, record: LeaderElectionRecord or exception);
          the second element is the record on success and the exception
          on failure
        """

    def create(self, name: str, namespace: str, election_record: LeaderElectionRecord) -> bool:
        """
        Create new election record in ConfigMap.

        Parameters:
        - name: Name of the ConfigMap to create
        - namespace: Namespace in which to create the ConfigMap
        - election_record: Election record to store

        Returns:
        - bool: True if created successfully
        """

    def update(self, election_record: LeaderElectionRecord) -> bool:
        """
        Update existing election record.

        Parameters:
        - election_record: Updated election record to store

        Returns:
        - bool: True if updated successfully

        NOTE(review): unlike get() and create(), this signature takes no
        name/namespace; the upstream kubernetes client appears to define
        update(name, namespace, updated_record) - confirm against the
        installed version.
        """

Usage Examples

Basic Leader Election

import threading

from kubernetes import client, config
from kubernetes.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock

# Load Kubernetes configuration
config.load_kube_config()

# Set once leadership is lost so the leader work loop can exit instead of
# continuing to act as leader.
stop_leading = threading.Event()


def on_started_leading():
    """Called when this instance becomes the leader.

    Runs the leader work loop until on_stopped_leading() signals that
    leadership has been lost.
    """
    print("Started leading - performing leader tasks")
    # Implement leader-specific logic here
    while not stop_leading.is_set():
        # Leader work loop. wait() acts as a 30-second sleep that returns
        # immediately when leadership is lost.
        stop_leading.wait(30)


def on_stopped_leading():
    """Called when this instance loses leadership."""
    print("Stopped leading - cleaning up")
    # Tell the leader work loop to stop before releasing anything it uses.
    stop_leading.set()
    # Cleanup leader-specific resources


# Create resource lock using ConfigMap.
# The identity must be unique for each candidate taking part in the election.
lock = ConfigMapLock(
    name="my-app-lock",
    namespace="default",
    identity="instance-1"
)

# Configure election parameters
election_config = Config(
    lock=lock,
    lease_duration=30,        # Leadership lease duration (seconds)
    renew_deadline=20,        # Lease renewal deadline (seconds)
    retry_period=5,           # Retry interval (seconds)
    onstarted_leading=on_started_leading,
    onstopped_leading=on_stopped_leading
)

# Start leader election
election = LeaderElection(election_config)
election.run()  # Blocks until leadership is lost or the process terminates

High Availability Service

import time
import threading
from kubernetes.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock

class HighAvailabilityService:
    """Service whose core logic runs only while this instance is the leader.

    Attributes:
    - service_name: Name used in log output and to derive the lock name
    - running: True while the leader task loop is active
    - lock: ConfigMapLock used to coordinate with the other instances
    - config: Election Config wiring the lock to this object's callbacks
    """

    def __init__(self, service_name, namespace, instance_id):
        """
        Build the resource lock and election configuration.

        Parameters:
        - service_name: Service name; the lock is "<service_name>-leader-election"
        - namespace: Namespace containing the lock ConfigMap
        - instance_id: Unique identifier for this candidate
        """
        self.service_name = service_name
        self.running = False
        # Signals the leader loop to stop. An Event (rather than only the
        # boolean flag) lets stop_leading() interrupt the wait between
        # tasks, and a stop that arrives before the loop starts is kept.
        self._stop_event = threading.Event()

        # Create resource lock
        self.lock = ConfigMapLock(
            name=f"{service_name}-leader-election",
            namespace=namespace,
            identity=instance_id
        )

        # Configure election
        self.config = Config(
            lock=self.lock,
            lease_duration=60,
            renew_deadline=40,
            retry_period=10,
            onstarted_leading=self.become_leader,
            onstopped_leading=self.stop_leading
        )

    def become_leader(self):
        """Start service operations as leader.

        Runs perform_leader_tasks() every 10 seconds until stop_leading()
        is called or `running` is set to False.
        """
        print(f"{self.service_name}: Became leader, starting operations")
        self.running = True

        # Start background service operations
        while self.running and not self._stop_event.is_set():
            self.perform_leader_tasks()
            # Interruptible 10-second pause: returns at once when
            # stop_leading() sets the event (possibly from another thread).
            self._stop_event.wait(10)
        self.running = False

    def stop_leading(self):
        """Stop service operations when losing leadership."""
        print(f"{self.service_name}: Lost leadership, stopping operations")
        self._stop_event.set()
        self.running = False

    def perform_leader_tasks(self):
        """Core service logic that should only run on leader."""
        print("Performing leader-only tasks...")
        # Implement your service logic here

    def start(self):
        """Start participating in leader election."""
        # Reset the stop signal so a service that previously lost
        # leadership can take part in a new election.
        self._stop_event.clear()
        election = LeaderElection(self.config)
        election.run()

# Usage
from kubernetes import config

# Load Kubernetes client configuration before the lock is created, so the
# election talks to the intended cluster. When running inside a pod,
# config.load_incluster_config() is the in-cluster alternative.
config.load_kube_config()

service = HighAvailabilityService(
    service_name="data-processor",
    namespace="production",
    instance_id="pod-12345"
)
service.start()

Import Statements

from kubernetes import leaderelection
from kubernetes.leaderelection import LeaderElection
from kubernetes.leaderelection.electionconfig import Config
from kubernetes.leaderelection.leaderelectionrecord import LeaderElectionRecord
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes

docs

application-workloads.md

autoscaling.md

configuration.md

core-resources.md

custom-resources.md

dynamic-client.md

index.md

leader-election.md

networking.md

rbac-security.md

resource-watching.md

storage.md

streaming-operations.md

utilities.md

tile.json