Python client for Redis database and key-value store
—
Redis distributed locks provide mutual exclusion across multiple processes or servers using Redis as a coordination service. The Lock implementation uses SET with NX and EX options for atomic lock acquisition with automatic expiration.
Redis Lock class for creating and managing distributed locks with automatic expiration.
def lock(
self,
name: str,
timeout: Optional[float] = None,
sleep: float = 0.1,
blocking: bool = True,
blocking_timeout: Optional[float] = None,
thread_local: bool = True
) -> "Lock": ...
class Lock:
def __init__(
self,
redis: Redis,
name: str,
timeout: Optional[float] = None,
sleep: float = 0.1,
blocking: bool = True,
blocking_timeout: Optional[float] = None,
thread_local: bool = True
): ...
def acquire(
self,
blocking: Optional[bool] = None,
blocking_timeout: Optional[float] = None,
token: Optional[bytes] = None
) -> bool: ...
def release(self) -> None: ...
def extend(self, additional_time: float, replace_ttl: bool = False) -> bool: ...
def locked(self) -> bool: ...
def owned(self) -> bool: ...
def reacquire(self) -> bool: ...
def __enter__(self) -> "Lock": ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
@property
def token(self) -> Optional[bytes]: ...import redis
import time
import threading
r = redis.Redis(host='localhost', port=6379, db=0)
def worker(worker_id):
"""Worker function that acquires a lock before processing"""
lock = r.lock("shared_resource", timeout=10)
try:
# Try to acquire the lock
if lock.acquire(blocking=False):
print(f"Worker {worker_id}: Acquired lock")
# Simulate work with shared resource
print(f"Worker {worker_id}: Processing shared resource...")
time.sleep(2)
print(f"Worker {worker_id}: Work completed")
else:
print(f"Worker {worker_id}: Could not acquire lock")
finally:
# Always try to release the lock
if lock.owned():
lock.release()
print(f"Worker {worker_id}: Released lock")
# Run multiple workers
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def process_with_lock():
"""Use lock as context manager for automatic cleanup"""
with r.lock("critical_section", timeout=5) as lock:
print("Entered critical section")
# Simulate critical work
time.sleep(2)
print("Exiting critical section")
# Lock is automatically released here
# Call the function
process_with_lock()import redis
import time
import threading
r = redis.Redis(host='localhost', port=6379, db=0)
def blocking_worker(worker_id):
"""Worker that waits for lock with timeout"""
lock = r.lock("shared_counter", timeout=10)
try:
# Block for up to 5 seconds waiting for the lock
acquired = lock.acquire(blocking=True, blocking_timeout=5)
if acquired:
print(f"Worker {worker_id}: Got the lock after waiting")
# Increment counter
current = r.get("counter") or b"0"
new_value = int(current) + 1
r.set("counter", new_value)
print(f"Worker {worker_id}: Counter = {new_value}")
time.sleep(1) # Hold lock briefly
else:
print(f"Worker {worker_id}: Timeout waiting for lock")
finally:
if lock.owned():
lock.release()
# Initialize counter
r.set("counter", 0)
# Start multiple workers
threads = []
for i in range(5):
t = threading.Thread(target=blocking_worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
final_count = r.get("counter")
print(f"Final counter value: {final_count}")import redis
import time
import threading
r = redis.Redis(host='localhost', port=6379, db=0)
def long_running_task():
"""Task that may need to extend lock duration"""
lock = r.lock("long_task", timeout=5) # Initial 5 second timeout
try:
if lock.acquire():
print("Started long running task")
for i in range(10):
# Simulate work
time.sleep(1)
print(f"Working... step {i+1}/10")
# Extend lock if we're halfway through and need more time
if i == 4: # After 5 seconds of work
extended = lock.extend(additional_time=5)
if extended:
print("Extended lock by 5 seconds")
else:
print("Failed to extend lock - it may have expired")
break
print("Task completed")
finally:
if lock.owned():
lock.release()
long_running_task()import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def task_with_reacquisition():
"""Task that releases and reacquires lock"""
lock = r.lock("reacquire_demo", timeout=10)
try:
# Initial acquisition
if lock.acquire():
print("Phase 1: Acquired lock")
time.sleep(2)
# Release lock temporarily
lock.release()
print("Released lock for other processes")
# Do some work that doesn't need the lock
time.sleep(1)
# Reacquire the same lock
if lock.reacquire():
print("Phase 2: Reacquired lock")
time.sleep(2)
print("Phase 2 complete")
else:
print("Failed to reacquire lock")
finally:
if lock.owned():
lock.release()
task_with_reacquisition()import redis
import time
import threading
r = redis.Redis(host='localhost', port=6379, db=0)
def monitor_lock():
"""Monitor lock status"""
lock = r.lock("monitored_lock", timeout=8)
def lock_holder():
"""Function that holds the lock"""
with lock:
print("Lock holder: Acquired lock")
time.sleep(5)
print("Lock holder: Releasing lock")
def lock_monitor():
"""Function that monitors the lock"""
time.sleep(0.5) # Let lock holder start first
for i in range(10):
is_locked = lock.locked()
is_owned = lock.owned()
print(f"Monitor: Locked={is_locked}, Owned by us={is_owned}")
time.sleep(1)
# Start both threads
holder_thread = threading.Thread(target=lock_holder)
monitor_thread = threading.Thread(target=lock_monitor)
holder_thread.start()
monitor_thread.start()
holder_thread.join()
monitor_thread.join()
monitor_lock()import redis
import time
import threading
import random
class DistributedCounter:
def __init__(self, redis_client, counter_name):
self.r = redis_client
self.counter_name = counter_name
self.lock_name = f"{counter_name}:lock"
def increment(self, amount=1):
"""Thread-safe increment operation"""
with self.r.lock(self.lock_name, timeout=5) as lock:
# Get current value
current = self.r.get(self.counter_name)
current_value = int(current) if current else 0
# Simulate some processing time
time.sleep(random.uniform(0.01, 0.1))
# Set new value
new_value = current_value + amount
self.r.set(self.counter_name, new_value)
return new_value
def get_value(self):
"""Get current counter value"""
value = self.r.get(self.counter_name)
return int(value) if value else 0
# Usage example
r = redis.Redis(host='localhost', port=6379, db=0)
counter = DistributedCounter(r, "global_counter")
# Initialize counter
r.set("global_counter", 0)
def worker(worker_id, counter, iterations):
"""Worker that increments counter multiple times"""
for i in range(iterations):
value = counter.increment()
print(f"Worker {worker_id}: Incremented to {value}")
time.sleep(random.uniform(0.1, 0.3))
# Run multiple workers concurrently
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, counter, 3))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Final counter value: {counter.get_value()}")import redis
import uuid
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def custom_token_lock():
"""Use custom token for lock identification"""
# Generate custom token
custom_token = str(uuid.uuid4()).encode()
lock = r.lock("custom_token_lock", timeout=10)
try:
# Acquire with custom token
acquired = lock.acquire(token=custom_token)
if acquired:
print(f"Acquired lock with token: {custom_token}")
print(f"Lock token: {lock.token}")
# Verify we own the lock
print(f"Lock owned: {lock.owned()}")
time.sleep(2)
finally:
if lock.owned():
lock.release()
print("Lock released")
custom_token_lock()import redis
import time
import threading
r = redis.Redis(host='localhost', port=6379, db=0)
def task_with_expiration_handling():
"""Handle case where lock expires during processing"""
lock = r.lock("expiring_lock", timeout=3) # Short timeout for demo
try:
if lock.acquire():
print("Acquired lock")
# Simulate long-running task
for i in range(8):
time.sleep(1)
# Check if we still own the lock
if not lock.owned():
print(f"Lock expired during processing at step {i+1}")
break
print(f"Processing step {i+1}")
else:
print("Task completed successfully")
except Exception as e:
print(f"Error during processing: {e}")
finally:
# Only release if we still own it
if lock.owned():
lock.release()
print("Released lock")
else:
print("Lock was already expired/released")
task_with_expiration_handling()import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
class MultiLock:
def __init__(self, redis_client, lock_names, timeout=10):
self.r = redis_client
self.locks = [r.lock(name, timeout=timeout) for name in lock_names]
self.acquired_locks = []
def acquire_all(self, blocking=True, blocking_timeout=None):
"""Acquire all locks or none"""
try:
for lock in self.locks:
if lock.acquire(blocking=blocking, blocking_timeout=blocking_timeout):
self.acquired_locks.append(lock)
else:
# Failed to acquire a lock, release all acquired ones
self.release_all()
return False
return True
except Exception:
self.release_all()
raise
def release_all(self):
"""Release all acquired locks"""
for lock in self.acquired_locks:
if lock.owned():
lock.release()
self.acquired_locks.clear()
def __enter__(self):
if self.acquire_all():
return self
else:
raise RuntimeError("Could not acquire all locks")
def __exit__(self, exc_type, exc_val, exc_tb):
self.release_all()
def transfer_between_accounts(from_account, to_account, amount):
"""Transfer money between accounts with multi-lock"""
# Lock both accounts to prevent race conditions
lock_names = [f"account:{from_account}:lock", f"account:{to_account}:lock"]
with MultiLock(r, lock_names) as multi_lock:
print(f"Acquired locks for accounts {from_account} and {to_account}")
# Get balances
from_balance = float(r.get(f"account:{from_account}:balance") or 0)
to_balance = float(r.get(f"account:{to_account}:balance") or 0)
# Check sufficient funds
if from_balance < amount:
raise ValueError("Insufficient funds")
# Perform transfer
time.sleep(0.1) # Simulate processing time
r.set(f"account:{from_account}:balance", from_balance - amount)
r.set(f"account:{to_account}:balance", to_balance + amount)
print(f"Transferred ${amount} from account {from_account} to {to_account}")
# Initialize accounts
r.set("account:1001:balance", 1000)
r.set("account:1002:balance", 500)
# Perform transfer
try:
transfer_between_accounts("1001", "1002", 250)
# Check final balances
balance_1001 = r.get("account:1001:balance")
balance_1002 = r.get("account:1002:balance")
print(f"Final balances: Account 1001: ${balance_1001}, Account 1002: ${balance_1002}")
except Exception as e:
print(f"Transfer failed: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-redis