Python implementation of redis API, can be used for testing purposes
—
Redis transaction support with MULTI/EXEC commands and optimistic locking via WATCH. Transactions provide atomic execution of command groups with conditional execution based on watched key modifications, enabling safe concurrent operations and complex multi-step procedures.
Core transaction commands for grouping operations and atomic execution.
def multi(self) -> None: ...
def exec_(self) -> List[Any]: ...
def discard(self) -> None: ...Watch mechanisms for conditional transaction execution based on key modifications.
def watch(self, *names: KeyT) -> bool: ...
def unwatch(self) -> bool: ...Pipelined command execution for performance optimization with optional transaction semantics.
class Pipeline:
def __init__(self, connection_pool, response_callbacks, transaction: bool, shard_hint: Optional[str]): ...
def __enter__(self) -> Pipeline: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
def execute(self, raise_on_error: bool = True) -> List[Any]: ...
def multi(self) -> Pipeline: ...
def exec_(self) -> List[Any]: ...
def discard(self) -> None: ...
def watch(self, *names: KeyT) -> bool: ...
def unwatch(self) -> bool: ...
def reset(self) -> None: ...
def pipeline(self, transaction: bool = True, shard_hint: Optional[str] = None) -> Pipeline: ...Utility functions for transaction management and error handling.
# Transaction execution with retry logic
def transaction(
func: Callable,
*watches: KeyT,
shard_hint: Optional[str] = None,
value_from_callable: bool = False,
watch_delay: Optional[float] = None
) -> Any: ...import fakeredis
client = fakeredis.FakeRedis()
# Setup initial data
client.set('counter', '10')
client.set('balance', '100')
# Basic transaction - all commands executed atomically
client.multi()
client.incr('counter')
client.decr('balance', 10)
client.set('last_operation', 'purchase')
# Execute all commands atomically
results = client.execute()
print(f"Transaction results: {results}") # [11, 90, True]
# Verify final state
print(f"Counter: {client.get('counter').decode()}") # '11'
print(f"Balance: {client.get('balance').decode()}") # '90'
print(f"Last op: {client.get('last_operation').decode()}") # 'purchase'import fakeredis
client = fakeredis.FakeRedis()
# Setup initial data
client.set('account_a', '1000')
client.set('account_b', '500')
# Start transaction
client.multi()
client.decrby('account_a', 100) # Deduct from account A
client.incrby('account_b', 100) # Add to account B
client.set('transfer_log', 'transfer_123')
# Simulate decision to cancel transaction
client.discard() # Cancel all queued commands
# Verify no changes were made
print(f"Account A: {client.get('account_a').decode()}") # Still '1000'
print(f"Account B: {client.get('account_b').decode()}") # Still '500'
print(f"Transfer log: {client.get('transfer_log')}") # Noneimport fakeredis
import threading
import time
client = fakeredis.FakeRedis()
# Setup shared counter
client.set('shared_counter', '0')
def increment_counter_safely(worker_id):
"""Safely increment counter using optimistic locking"""
max_retries = 5
for attempt in range(max_retries):
try:
# Watch the counter for changes
client.watch('shared_counter')
# Get current value
current_value = int(client.get('shared_counter').decode())
# Simulate some processing time
time.sleep(0.01)
# Start transaction
client.multi()
client.set('shared_counter', str(current_value + 1))
client.set(f'worker_{worker_id}_last_update', str(int(time.time())))
# Execute transaction
result = client.execute()
if result is not None: # Transaction succeeded
print(f"Worker {worker_id}: Successfully incremented to {current_value + 1}")
break
else: # Transaction was aborted due to watched key change
print(f"Worker {worker_id}: Retry {attempt + 1} - counter was modified by another worker")
except Exception as e:
print(f"Worker {worker_id}: Error - {e}")
finally:
# Always unwatch to clean up
client.unwatch()
else:
print(f"Worker {worker_id}: Failed to increment after {max_retries} attempts")
# Test concurrent increments
print("Starting concurrent counter increment test...")
# Create multiple worker threads
workers = []
for i in range(5):
worker = threading.Thread(target=increment_counter_safely, args=(i,))
workers.append(worker)
# Start all workers
for worker in workers:
worker.start()
# Wait for all workers to complete
for worker in workers:
worker.join()
# Check final counter value
final_value = client.get('shared_counter').decode()
print(f"Final counter value: {final_value}")
# Check which workers succeeded
for i in range(5):
last_update = client.get(f'worker_{i}_last_update')
if last_update:
print(f"Worker {i} last updated at: {last_update.decode()}")import fakeredis
import time
client = fakeredis.FakeRedis()
# Setup test data
for i in range(5):
client.set(f'item:{i}', f'value_{i}')
client.set(f'counter:{i}', str(i * 10))
# Using pipeline with transactions for better performance
with client.pipeline(transaction=True) as pipe:
# All commands are queued
pipe.multi()
# Batch operations
for i in range(5):
pipe.get(f'item:{i}')
pipe.incr(f'counter:{i}')
pipe.set(f'timestamp:{i}', str(int(time.time())))
# Execute all commands atomically
results = pipe.execute()
print(f"Pipeline transaction executed {len(results)} commands")
# Results are returned in order of execution
items = []
counters = []
timestamps = []
for i in range(0, len(results), 3): # Every 3 results (get, incr, set)
items.append(results[i].decode() if results[i] else None)
counters.append(results[i + 1])
timestamps.append(results[i + 2])
print("Items:", items)
print("Counters:", counters)
print("All operations completed atomically")import fakeredis
client = fakeredis.FakeRedis()
# Pipeline without transaction - better performance, no atomicity
with client.pipeline(transaction=False) as pipe:
# Queue multiple commands
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.get('key1')
pipe.get('key2')
pipe.incr('counter')
pipe.incr('counter')
# Execute all commands (not atomic, but faster)
results = pipe.execute()
print(f"Non-transactional pipeline results: {results}")
# [True, True, b'value1', b'value2', 1, 2]import fakeredis
import json
client = fakeredis.FakeRedis()
# Setup e-commerce inventory
products = {
'product_1': {'price': 29.99, 'stock': 10},
'product_2': {'price': 49.99, 'stock': 5},
'product_3': {'price': 19.99, 'stock': 0}
}
for product_id, data in products.items():
client.hset(f'product:{product_id}', mapping={
'price': str(data['price']),
'stock': str(data['stock'])
})
client.set('user:123:balance', '150.00')
def process_order(user_id, orders):
"""Process an order with inventory and balance checks"""
# Watch all relevant keys
watch_keys = [f'user:{user_id}:balance']
for product_id, quantity in orders.items():
watch_keys.append(f'product:{product_id}')
client.watch(*watch_keys)
try:
# Check current balance
current_balance = float(client.get(f'user:{user_id}:balance').decode())
# Calculate total cost and check availability
total_cost = 0
order_details = []
for product_id, quantity in orders.items():
product_data = client.hgetall(f'product:{product_id}')
if not product_data:
print(f"Product {product_id} not found")
return False
price = float(product_data[b'price'].decode())
stock = int(product_data[b'stock'].decode())
if stock < quantity:
print(f"Insufficient stock for {product_id}: need {quantity}, have {stock}")
return False
item_cost = price * quantity
total_cost += item_cost
order_details.append({
'product_id': product_id,
'quantity': quantity,
'price': price,
'total': item_cost,
'remaining_stock': stock - quantity
})
# Check if user has sufficient balance
if current_balance < total_cost:
print(f"Insufficient balance: need ${total_cost:.2f}, have ${current_balance:.2f}")
return False
# All checks passed, execute transaction
client.multi()
# Deduct from user balance
new_balance = current_balance - total_cost
client.set(f'user:{user_id}:balance', f'{new_balance:.2f}')
# Update inventory
for order in order_details:
client.hset(
f'product:{order["product_id"]}',
'stock',
str(order['remaining_stock'])
)
# Create order record
order_id = f'order_{int(time.time() * 1000)}'
client.set(f'order:{order_id}', json.dumps({
'user_id': user_id,
'items': order_details,
'total_cost': total_cost,
'timestamp': int(time.time())
}))
# Execute transaction
result = client.execute()
if result is not None:
print(f"✅ Order {order_id} processed successfully!")
print(f" Total: ${total_cost:.2f}")
print(f" New balance: ${new_balance:.2f}")
return True
else:
print("❌ Transaction aborted - data was modified during processing")
return False
except Exception as e:
print(f"Error processing order: {e}")
return False
finally:
client.unwatch()
# Test order processing
print("=== Order Processing Test ===")
# Successful order
print("\n1. Processing valid order...")
success1 = process_order('123', {
'product_1': 2, # 2 × $29.99 = $59.98
'product_2': 1 # 1 × $49.99 = $49.99
}) # Total: $109.97
print(f"Order 1 result: {'Success' if success1 else 'Failed'}")
# Check remaining balance and inventory
print(f"Remaining balance: ${client.get('user:123:balance').decode()}")
print(f"Product 1 stock: {client.hget('product:product_1', 'stock').decode()}")
print(f"Product 2 stock: {client.hget('product:product_2', 'stock').decode()}")
# Order that exceeds balance
print("\n2. Processing order that exceeds balance...")
success2 = process_order('123', {
'product_2': 3 # 3 × $49.99 = $149.97 (exceeds remaining balance)
})
# Order with insufficient stock
print("\n3. Processing order with insufficient stock...")
success3 = process_order('123', {
'product_3': 1 # Out of stock
})import fakeredis
import threading
import time
from typing import Dict, Any
class AtomicCounters:
def __init__(self, client: fakeredis.FakeRedis):
self.client = client
def increment(self, counter_name: str, amount: int = 1) -> int:
"""Atomically increment a counter"""
return self.client.incrby(counter_name, amount)
def decrement(self, counter_name: str, amount: int = 1) -> int:
"""Atomically decrement a counter"""
return self.client.decrby(counter_name, amount)
def increment_multiple(self, counters: Dict[str, int]) -> Dict[str, int]:
"""Atomically increment multiple counters"""
with self.client.pipeline(transaction=True) as pipe:
pipe.multi()
for counter_name, amount in counters.items():
pipe.incrby(counter_name, amount)
results = pipe.execute()
# Return new values
return dict(zip(counters.keys(), results))
def conditional_increment(self, counter_name: str, condition_key: str, expected_value: str, amount: int = 1) -> bool:
"""Increment counter only if condition key has expected value"""
self.client.watch(condition_key, counter_name)
try:
# Check condition
current_value = self.client.get(condition_key)
if current_value is None or current_value.decode() != expected_value:
return False
# Condition met, increment counter
self.client.multi()
self.client.incrby(counter_name, amount)
result = self.client.execute()
return result is not None
finally:
self.client.unwatch()
def get_counters(self, *counter_names: str) -> Dict[str, int]:
"""Get current values of multiple counters"""
if not counter_names:
return {}
values = self.client.mget(counter_names)
return {
name: int(value.decode()) if value else 0
for name, value in zip(counter_names, values)
}
def reset_counter(self, counter_name: str) -> bool:
"""Reset counter to 0"""
return self.client.set(counter_name, '0')
# Usage example
client = fakeredis.FakeRedis()
counters = AtomicCounters(client)
# Initialize counters
counters.reset_counter('page_views')
counters.reset_counter('user_signups')
counters.reset_counter('orders_processed')
def simulate_web_traffic(worker_id: int, duration: int):
"""Simulate web traffic that updates various counters"""
start_time = time.time()
while time.time() - start_time < duration:
# Simulate page views
counters.increment('page_views')
# Occasionally simulate user signup
if time.time() % 3 < 0.1: # Roughly every 3 seconds
counters.increment('user_signups')
# Simulate order processing
if time.time() % 5 < 0.1: # Roughly every 5 seconds
counters.increment('orders_processed')
time.sleep(0.1) # 100ms between actions
print(f"Worker {worker_id} finished")
print("=== Atomic Counter Test ===")
# Start multiple workers to simulate concurrent traffic
workers = []
for i in range(3):
worker = threading.Thread(target=simulate_web_traffic, args=(i, 2)) # 2 seconds each
workers.append(worker)
worker.start()
# Wait for workers to complete
for worker in workers:
worker.join()
# Get final counter values
final_counts = counters.get_counters('page_views', 'user_signups', 'orders_processed')
print(f"Final counts: {final_counts}")
# Test bulk increment
print("\n=== Bulk Counter Update ===")
bulk_updates = {
'daily_logins': 100,
'api_calls': 500,
'database_queries': 250
}
results = counters.increment_multiple(bulk_updates)
print(f"Bulk update results: {results}")
# Test conditional increment
print("\n=== Conditional Counter Test ===")
client.set('feature_flag', 'enabled')
# This should succeed
success1 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
print(f"Conditional increment (enabled): {success1}")
# Change flag and try again
client.set('feature_flag', 'disabled')
success2 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
print(f"Conditional increment (disabled): {success2}")
final_feature_usage = counters.get_counters('feature_usage')
print(f"Feature usage count: {final_feature_usage}")import fakeredis
import time
import threading
import uuid
from typing import Optional
class DistributedLock:
def __init__(self, client: fakeredis.FakeRedis, key: str, timeout: int = 10):
self.client = client
self.key = f"lock:{key}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
self.acquired = False
def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
"""Acquire the distributed lock"""
end_time = time.time() + (timeout or self.timeout)
while True:
# Try to acquire lock using SET with NX and EX
acquired = self.client.set(
self.key,
self.identifier,
nx=True, # Only set if key doesn't exist
ex=self.timeout # Set expiration
)
if acquired:
self.acquired = True
return True
if not blocking or time.time() >= end_time:
return False
time.sleep(0.01) # Brief pause before retry
def release(self) -> bool:
"""Release the distributed lock"""
if not self.acquired:
return False
# Use Lua script to atomically check and delete
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.client.eval(lua_script, 1, self.key, self.identifier)
if result:
self.acquired = False
return True
return False
def extend(self, additional_time: int) -> bool:
"""Extend lock timeout"""
if not self.acquired:
return False
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
"""
result = self.client.eval(lua_script, 1, self.key, self.identifier, additional_time)
return bool(result)
def __enter__(self):
if not self.acquire():
raise Exception(f"Could not acquire lock: {self.key}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def critical_section_work(worker_id: int, shared_resource: str, client: fakeredis.FakeRedis):
"""Simulate work that requires exclusive access to a shared resource"""
# Try to acquire lock for the shared resource
lock = DistributedLock(client, shared_resource, timeout=5)
try:
print(f"Worker {worker_id}: Attempting to acquire lock for {shared_resource}")
if lock.acquire(blocking=True, timeout=3.0):
print(f"Worker {worker_id}: ✅ Acquired lock for {shared_resource}")
# Get current value
current_value = client.get(f"resource:{shared_resource}")
current_count = int(current_value.decode()) if current_value else 0
# Simulate some processing time
time.sleep(0.5)
# Update the resource
new_count = current_count + 1
client.set(f"resource:{shared_resource}", str(new_count))
print(f"Worker {worker_id}: Updated {shared_resource} from {current_count} to {new_count}")
else:
print(f"Worker {worker_id}: ❌ Could not acquire lock for {shared_resource}")
finally:
if lock.acquired:
lock.release()
print(f"Worker {worker_id}: Released lock for {shared_resource}")
# Test distributed locking
client = fakeredis.FakeRedis()
# Initialize shared resource
client.set("resource:shared_counter", "0")
print("=== Distributed Lock Test ===")
# Create workers that compete for the same resource
workers = []
for i in range(5):
worker = threading.Thread(
target=critical_section_work,
args=(i, "shared_counter", client)
)
workers.append(worker)
# Start all workers simultaneously
for worker in workers:
worker.start()
# Wait for all workers
for worker in workers:
worker.join()
# Check final value
final_value = client.get("resource:shared_counter").decode()
print(f"\nFinal shared counter value: {final_value}")
# Test lock context manager
print("\n=== Lock Context Manager Test ===")
def update_with_context_manager(resource_name: str):
try:
with DistributedLock(client, resource_name) as lock:
print(f"Inside critical section for {resource_name}")
# Get and increment counter
current = client.get(f"resource:{resource_name}")
count = int(current.decode()) if current else 0
client.set(f"resource:{resource_name}", str(count + 10))
print(f"Updated {resource_name} to {count + 10}")
except Exception as e:
print(f"Failed to acquire lock: {e}")
# Initialize new resource
client.set("resource:context_test", "0")
# Test context manager
update_with_context_manager("context_test")
final_context_value = client.get("resource:context_test").decode()
print(f"Final context test value: {final_context_value}")Install with Tessl CLI
npx tessl i tessl/pypi-fakeredisdocs