CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

docs/distributed-locking.md

Distributed Locking

Redis distributed locks provide mutual exclusion across multiple processes or servers using Redis as a coordination service. The Lock implementation uses SET with NX and EX options for atomic lock acquisition with automatic expiration.

Capabilities

Lock Operations

Redis Lock class for creating and managing distributed locks with automatic expiration.

# Factory method on the Redis client: build a Lock bound to the key `name`.
#   timeout          - lock TTL in seconds; None means no automatic expiry
#   sleep            - seconds slept between retries while blocking on acquire
#   blocking         - default blocking behavior for acquire()
#   blocking_timeout - max seconds acquire() will wait (None = wait forever)
#   thread_local     - store the ownership token in thread-local storage
def lock(
    self,
    name: str,
    timeout: Optional[float] = None,
    sleep: float = 0.1,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
    thread_local: bool = True
) -> "Lock": ...

# Distributed lock backed by a single Redis key (SET with NX/EX per the intro).
class Lock:
    def __init__(
        self,
        redis: Redis,
        name: str,
        timeout: Optional[float] = None,
        sleep: float = 0.1,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
        thread_local: bool = True
    ): ...
    
    # Try to take the lock; returns True on success. `token` overrides the
    # auto-generated ownership token; blocking/blocking_timeout override the
    # constructor defaults when not None.
    def acquire(
        self,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[float] = None,
        token: Optional[bytes] = None
    ) -> bool: ...
    
    # Release the lock held by this instance.
    def release(self) -> None: ...
    
    # Add `additional_time` seconds to the TTL, or replace the TTL entirely
    # when replace_ttl=True; returns True on success.
    def extend(self, additional_time: float, replace_ttl: bool = False) -> bool: ...
    
    # True if ANY client currently holds the lock.
    def locked(self) -> bool: ...
    
    # True if THIS instance's token matches the one stored in Redis.
    def owned(self) -> bool: ...
    
    # Reset the TTL of a lock this instance already holds back to `timeout`.
    def reacquire(self) -> bool: ...
    
    # Context-manager support: acquire on entry, release on exit.
    def __enter__(self) -> "Lock": ...
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
    
    # The ownership token currently held (None when not acquired).
    @property
    def token(self) -> Optional[bytes]: ...

Usage Examples

Basic Lock Usage

import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def worker(worker_id):
    """Try to grab the shared-resource lock without blocking, then do work."""
    lock = r.lock("shared_resource", timeout=10)
    try:
        # Non-blocking attempt: bail out immediately if someone else holds it.
        if not lock.acquire(blocking=False):
            print(f"Worker {worker_id}: Could not acquire lock")
            return
        print(f"Worker {worker_id}: Acquired lock")
        # Pretend to use the shared resource for a couple of seconds.
        print(f"Worker {worker_id}: Processing shared resource...")
        time.sleep(2)
        print(f"Worker {worker_id}: Work completed")
    finally:
        # Release only if this worker still holds the lock.
        if lock.owned():
            lock.release()
            print(f"Worker {worker_id}: Released lock")


# Launch three competing workers and wait for them all to finish.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Lock with Context Manager

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)


def process_with_lock():
    """Run a critical section guarded by a Redis lock context manager."""
    # __enter__ acquires the lock; __exit__ releases it even on exceptions.
    with r.lock("critical_section", timeout=5):
        print("Entered critical section")
        time.sleep(2)  # critical work happens here
        print("Exiting critical section")
    # Lock is automatically released here


# Call the function
process_with_lock()

Blocking Lock with Timeout

import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def blocking_worker(worker_id):
    """Wait up to 5 seconds for the lock, then bump the shared counter."""
    lock = r.lock("shared_counter", timeout=10)
    try:
        # Blocking acquire with a bounded wait.
        if not lock.acquire(blocking=True, blocking_timeout=5):
            print(f"Worker {worker_id}: Timeout waiting for lock")
            return
        print(f"Worker {worker_id}: Got the lock after waiting")
        # The read-modify-write below is safe because we hold the lock.
        new_value = int(r.get("counter") or b"0") + 1
        r.set("counter", new_value)
        print(f"Worker {worker_id}: Counter = {new_value}")
        time.sleep(1)  # Hold lock briefly
    finally:
        if lock.owned():
            lock.release()


# Reset the shared counter, then race five workers against each other.
r.set("counter", 0)

threads = [threading.Thread(target=blocking_worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

final_count = r.get("counter")
print(f"Final counter value: {final_count}")

Lock Extension

import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def long_running_task():
    """Extend the lock's TTL mid-task so it outlives the initial timeout."""
    lock = r.lock("long_task", timeout=5)  # Initial 5 second timeout
    try:
        if lock.acquire():
            print("Started long running task")
            for step in range(1, 11):
                time.sleep(1)  # simulate one unit of work
                print(f"Working... step {step}/10")
                # Halfway point: buy 5 more seconds before the TTL runs out.
                if step == 5:
                    if lock.extend(additional_time=5):
                        print("Extended lock by 5 seconds")
                    else:
                        print("Failed to extend lock - it may have expired")
                        break
            print("Task completed")
    finally:
        if lock.owned():
            lock.release()


long_running_task()

Lock Reacquisition

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)


def task_with_reacquisition():
    """Refresh a held lock's TTL, release it, then take it again.

    BUG FIX: Lock.reacquire() only resets the TTL of a lock this instance
    still holds; once release() has cleared the local token, reacquire()
    raises LockError instead of re-taking the lock. So reacquire() is shown
    here while the lock is held, and a fresh acquire() is used to take the
    lock again after releasing it.
    """
    lock = r.lock("reacquire_demo", timeout=10)

    try:
        # Initial acquisition
        if lock.acquire():
            print("Phase 1: Acquired lock")
            time.sleep(2)

            # Reset the TTL back to the full 10 seconds while still holding it.
            if lock.reacquire():
                print("Phase 1: Lock TTL refreshed")

            # Release lock temporarily
            lock.release()
            print("Released lock for other processes")

            # Do some work that doesn't need the lock
            time.sleep(1)

            # Take the same lock again with a fresh acquire().
            if lock.acquire():
                print("Phase 2: Reacquired lock")
                time.sleep(2)
                print("Phase 2 complete")
            else:
                print("Failed to reacquire lock")

    finally:
        # Release only if we still own the lock.
        if lock.owned():
            lock.release()


task_with_reacquisition()

Lock Status Checking

import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def monitor_lock():
    """Run one thread that holds a lock while another polls its status."""
    lock = r.lock("monitored_lock", timeout=8)

    def lock_holder():
        # Hold the lock for five seconds via the context manager.
        with lock:
            print("Lock holder: Acquired lock")
            time.sleep(5)
            print("Lock holder: Releasing lock")

    def lock_monitor():
        # Give the holder a head start, then sample the lock once per second.
        time.sleep(0.5)
        for _ in range(10):
            is_locked = lock.locked()
            is_owned = lock.owned()
            print(f"Monitor: Locked={is_locked}, Owned by us={is_owned}")
            time.sleep(1)

    workers = [
        threading.Thread(target=lock_holder),
        threading.Thread(target=lock_monitor),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()


monitor_lock()

Distributed Counter with Lock

import redis
import time
import threading
import random

class DistributedCounter:
    """A counter stored in Redis whose updates are serialized by a lock."""

    def __init__(self, redis_client, counter_name):
        self.r = redis_client
        self.counter_name = counter_name
        self.lock_name = f"{counter_name}:lock"

    def increment(self, amount=1):
        """Atomically add `amount` to the counter and return the new value."""
        # Hold the lock across the whole read-modify-write cycle.
        with self.r.lock(self.lock_name, timeout=5):
            raw = self.r.get(self.counter_name)
            current_value = int(raw) if raw else 0

            # Simulate some processing time
            time.sleep(random.uniform(0.01, 0.1))

            new_value = current_value + amount
            self.r.set(self.counter_name, new_value)
            return new_value

    def get_value(self):
        """Return the counter's current value (0 if the key does not exist)."""
        raw = self.r.get(self.counter_name)
        return int(raw) if raw else 0

# Usage example
r = redis.Redis(host='localhost', port=6379, db=0)
counter = DistributedCounter(r, "global_counter")

# Start the counter from zero.
r.set("global_counter", 0)


def worker(worker_id, counter, iterations):
    """Increment the shared counter `iterations` times with random pauses."""
    for _ in range(iterations):
        value = counter.increment()
        print(f"Worker {worker_id}: Incremented to {value}")
        time.sleep(random.uniform(0.1, 0.3))


# Five workers, three increments each, all racing on the same counter.
threads = [threading.Thread(target=worker, args=(i, counter, 3)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final counter value: {counter.get_value()}")

Lock with Custom Token

import redis
import uuid
import time

r = redis.Redis(host='localhost', port=6379, db=0)


def custom_token_lock():
    """Acquire a lock using a caller-supplied token instead of a generated one."""
    custom_token = str(uuid.uuid4()).encode()  # token must be bytes

    lock = r.lock("custom_token_lock", timeout=10)
    try:
        # Pass our own token so the lock's ownership id is known to us.
        if lock.acquire(token=custom_token):
            print(f"Acquired lock with token: {custom_token}")
            print(f"Lock token: {lock.token}")

            # owned() compares our token against the value stored in Redis.
            print(f"Lock owned: {lock.owned()}")

            time.sleep(2)
    finally:
        if lock.owned():
            lock.release()
            print("Lock released")


custom_token_lock()

Handling Lock Expiration

import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, db=0)


def task_with_expiration_handling():
    """Detect mid-task lock expiry by checking ownership between steps."""
    lock = r.lock("expiring_lock", timeout=3)  # Short timeout for demo
    try:
        if lock.acquire():
            print("Acquired lock")
            completed = True
            for step in range(1, 9):
                time.sleep(1)
                # The 3-second TTL may have elapsed while we slept.
                if not lock.owned():
                    print(f"Lock expired during processing at step {step}")
                    completed = False
                    break
                print(f"Processing step {step}")
            if completed:
                print("Task completed successfully")
    except Exception as e:
        print(f"Error during processing: {e}")
    finally:
        # Release only if the lock did not expire out from under us.
        if lock.owned():
            lock.release()
            print("Released lock")
        else:
            print("Lock was already expired/released")


task_with_expiration_handling()

Multi-Resource Locking

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

class MultiLock:
    """Acquire several Redis locks together with all-or-nothing semantics.

    Locks are always requested in the order given, so callers that lock the
    same resources in the same order cannot deadlock each other.
    """

    def __init__(self, redis_client, lock_names, timeout=10):
        self.r = redis_client
        # BUG FIX: build the locks from the injected client, not the module
        # global `r` — the original silently ignored `redis_client`.
        self.locks = [redis_client.lock(name, timeout=timeout) for name in lock_names]
        self.acquired_locks = []

    def acquire_all(self, blocking=True, blocking_timeout=None):
        """Acquire every lock, or release any partial set and return False."""
        try:
            for lock in self.locks:
                if lock.acquire(blocking=blocking, blocking_timeout=blocking_timeout):
                    self.acquired_locks.append(lock)
                else:
                    # One lock failed: back out of everything we already hold.
                    self.release_all()
                    return False
            return True
        except Exception:
            self.release_all()
            raise

    def release_all(self):
        """Release every lock we still own and forget the acquired set."""
        for lock in self.acquired_locks:
            if lock.owned():
                lock.release()
        self.acquired_locks.clear()

    def __enter__(self):
        # Refuse to enter the `with` body without the full set of locks.
        if self.acquire_all():
            return self
        else:
            raise RuntimeError("Could not acquire all locks")

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release_all()

def transfer_between_accounts(from_account, to_account, amount):
    """Move `amount` between two account balances under a pair of locks."""
    # Both accounts are locked up front so no other transfer can interleave.
    lock_names = [f"account:{from_account}:lock", f"account:{to_account}:lock"]

    with MultiLock(r, lock_names):
        print(f"Acquired locks for accounts {from_account} and {to_account}")

        # Read both balances (missing keys count as zero).
        from_balance = float(r.get(f"account:{from_account}:balance") or 0)
        to_balance = float(r.get(f"account:{to_account}:balance") or 0)

        # Check sufficient funds
        if from_balance < amount:
            raise ValueError("Insufficient funds")

        time.sleep(0.1)  # Simulate processing time

        # Perform transfer
        r.set(f"account:{from_account}:balance", from_balance - amount)
        r.set(f"account:{to_account}:balance", to_balance + amount)

        print(f"Transferred ${amount} from account {from_account} to {to_account}")

# Seed two demo accounts, run one transfer, and report the outcome.
r.set("account:1001:balance", 1000)
r.set("account:1002:balance", 500)

try:
    transfer_between_accounts("1001", "1002", 250)

    # Check final balances
    balance_1001 = r.get("account:1001:balance")
    balance_1002 = r.get("account:1002:balance")
    print(f"Final balances: Account 1001: ${balance_1001}, Account 1002: ${balance_1002}")

except Exception as e:
    print(f"Transfer failed: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json