CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fakeredis

A Python implementation of the Redis API that can be used for testing purposes

Pending
Overview
Eval results
Files

transaction-operations.mddocs/

Transaction Operations

Redis transaction support with MULTI/EXEC commands and optimistic locking via WATCH. Transactions provide atomic execution of command groups with conditional execution based on watched key modifications, enabling safe concurrent operations and complex multi-step procedures.

Capabilities

Transaction Control

Core transaction commands for grouping operations and atomic execution.

# MULTI: begin queuing commands for atomic execution.
def multi(self) -> None: ...

# EXEC: run all queued commands atomically, returning one reply per command,
# in order. Trailing underscore because `exec` is a Python keyword.
def exec_(self) -> List[Any]: ...

# DISCARD: drop all queued commands without executing them.
def discard(self) -> None: ...

Optimistic Locking

Watch mechanisms for conditional transaction execution based on key modifications.

# WATCH: mark keys so the following MULTI/EXEC aborts if any of them change.
def watch(self, *names: KeyT) -> bool: ...

# UNWATCH: clear all watched keys for this connection.
def unwatch(self) -> bool: ...

Pipeline Operations

Pipelined command execution for performance optimization with optional transaction semantics.

class Pipeline:
    # Buffers commands client-side and sends them in one round trip;
    # with transaction=True they are wrapped in MULTI/EXEC.
    def __init__(self, connection_pool, response_callbacks, transaction: bool, shard_hint: Optional[str]): ...
    
    # Context-manager support so the pipeline is cleaned up by a `with` block.
    def __enter__(self) -> Pipeline: ...
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
    
    # Send all buffered commands; returns one reply per command, in order.
    def execute(self, raise_on_error: bool = True) -> List[Any]: ...
    
    # Explicitly start transactional buffering (MULTI).
    def multi(self) -> Pipeline: ...
    
    # Execute the queued transaction (EXEC); named with a trailing underscore
    # because `exec` is a Python keyword.
    def exec_(self) -> List[Any]: ...
    
    # Abort the transaction, discarding any queued commands.
    def discard(self) -> None: ...
    
    # Optimistic locking on this pipeline's connection.
    def watch(self, *names: KeyT) -> bool: ...
    
    def unwatch(self) -> bool: ...
    
    # Clear buffered commands and watch state so the pipeline can be reused.
    def reset(self) -> None: ...

# Factory: create a Pipeline bound to this client (transactional by default).
def pipeline(self, transaction: bool = True, shard_hint: Optional[str] = None) -> Pipeline: ...

Transaction Helpers

Utility functions for transaction management and error handling.

# Run `func(pipeline)` under WATCH on *watches*, retrying (with optional
# watch_delay between attempts) until EXEC completes without a watched-key
# conflict.
# NOTE(review): presumably value_from_callable=True returns func's return
# value instead of the EXEC results - confirm against the implementation.
def transaction(
    func: Callable,
    *watches: KeyT,
    shard_hint: Optional[str] = None,
    value_from_callable: bool = False,
    watch_delay: Optional[float] = None
) -> Any: ...

Usage Examples

Basic Transactions

import fakeredis

client = fakeredis.FakeRedis()

# Setup initial data
client.set('counter', '10')
client.set('balance', '100')

# Basic transaction - all commands executed atomically.
# NOTE(review): multi()/execute() are called on the client itself here; in
# redis-py these live on Pipeline objects - confirm fakeredis exposes
# client-level multi()/execute() as the API section (multi/exec_) implies.
client.multi()
client.incr('counter')
client.decr('balance', 10)
client.set('last_operation', 'purchase')

# Execute all commands atomically
results = client.execute()
print(f"Transaction results: {results}")  # [11, 90, True]

# Verify final state (values come back as bytes, hence .decode())
print(f"Counter: {client.get('counter').decode()}")    # '11'
print(f"Balance: {client.get('balance').decode()}")    # '90'
print(f"Last op: {client.get('last_operation').decode()}")  # 'purchase'

Transaction Rollback

import fakeredis

client = fakeredis.FakeRedis()

# Setup initial data
client.set('account_a', '1000')
client.set('account_b', '500')

# Start transaction - commands are queued, not yet executed
client.multi()
client.decrby('account_a', 100)  # Deduct from account A
client.incrby('account_b', 100)  # Add to account B
client.set('transfer_log', 'transfer_123')

# Simulate decision to cancel transaction
client.discard()  # Cancel all queued commands

# Verify no changes were made - queued commands never ran
print(f"Account A: {client.get('account_a').decode()}")  # Still '1000'
print(f"Account B: {client.get('account_b').decode()}")  # Still '500'
print(f"Transfer log: {client.get('transfer_log')}")     # None

Optimistic Locking with WATCH

import fakeredis
import threading
import time

client = fakeredis.FakeRedis()

# Shared counter that multiple worker threads will contend over.
client.set('shared_counter', '0')

def increment_counter_safely(worker_id):
    """Read-modify-write the shared counter under WATCH, retrying on conflict."""
    max_retries = 5

    for attempt in range(max_retries):
        try:
            # Guard the key so a concurrent write aborts our transaction.
            client.watch('shared_counter')

            value = int(client.get('shared_counter').decode())

            # Widen the race window so conflicts are actually observable.
            time.sleep(0.01)

            # Queue the two writes as one atomic unit.
            client.multi()
            client.set('shared_counter', str(value + 1))
            client.set(f'worker_{worker_id}_last_update', str(int(time.time())))

            result = client.execute()

            if result is None:
                # A watched key changed underneath us - go around again.
                print(f"Worker {worker_id}: Retry {attempt + 1} - counter was modified by another worker")
            else:
                print(f"Worker {worker_id}: Successfully incremented to {value + 1}")
                break

        except Exception as e:
            print(f"Worker {worker_id}: Error - {e}")

        finally:
            # Always drop the WATCH, whatever happened above.
            client.unwatch()

    else:
        # for/else: runs only when the loop exhausted without a break.
        print(f"Worker {worker_id}: Failed to increment after {max_retries} attempts")

# Test concurrent increments
print("Starting concurrent counter increment test...")

# Create multiple worker threads
workers = []
for i in range(5):
    worker = threading.Thread(target=increment_counter_safely, args=(i,))
    workers.append(worker)

# Start all workers
for worker in workers:
    worker.start()

# Wait for all workers to complete
for worker in workers:
    worker.join()

# Check final counter value (5 if every worker eventually succeeded)
final_value = client.get('shared_counter').decode()
print(f"Final counter value: {final_value}")

# Check which workers succeeded (key only exists after a committed EXEC)
for i in range(5):
    last_update = client.get(f'worker_{i}_last_update')
    if last_update:
        print(f"Worker {i} last updated at: {last_update.decode()}")

Pipeline Transactions

import fakeredis
import time

client = fakeredis.FakeRedis()

# Setup test data
for i in range(5):
    client.set(f'item:{i}', f'value_{i}')
    client.set(f'counter:{i}', str(i * 10))

# Using pipeline with transactions for better performance.
# NOTE(review): with transaction=True the pipeline already wraps commands in
# MULTI/EXEC; the explicit pipe.multi() below may be redundant - confirm
# against fakeredis' Pipeline semantics.
with client.pipeline(transaction=True) as pipe:
    # All commands are queued
    pipe.multi()
    
    # Batch operations: three commands per item
    for i in range(5):
        pipe.get(f'item:{i}')
        pipe.incr(f'counter:{i}')
        pipe.set(f'timestamp:{i}', str(int(time.time())))
    
    # Execute all commands atomically
    results = pipe.execute()

print(f"Pipeline transaction executed {len(results)} commands")

# Results are returned in order of execution
items = []
counters = []
timestamps = []

for i in range(0, len(results), 3):  # Every 3 results (get, incr, set)
    items.append(results[i].decode() if results[i] else None)
    counters.append(results[i + 1])
    timestamps.append(results[i + 2])

print("Items:", items)
print("Counters:", counters)
print("All operations completed atomically")

Pipeline without Transactions

import fakeredis

client = fakeredis.FakeRedis()

# Pipeline without transaction - better performance, no atomicity
with client.pipeline(transaction=False) as pipe:
    # Queue multiple commands
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    pipe.get('key1')
    pipe.get('key2')
    pipe.incr('counter')
    pipe.incr('counter')
    
    # Execute all commands (one round trip, but not atomic)
    results = pipe.execute()

print(f"Non-transactional pipeline results: {results}")
# [True, True, b'value1', b'value2', 1, 2]

Complex Transaction with Conditional Logic

import fakeredis
import json

client = fakeredis.FakeRedis()

# Setup e-commerce inventory: price/stock per product, stored as hashes
products = {
    'product_1': {'price': 29.99, 'stock': 10},
    'product_2': {'price': 49.99, 'stock': 5},
    'product_3': {'price': 19.99, 'stock': 0}  # deliberately out of stock
}

for product_id, data in products.items():
    client.hset(f'product:{product_id}', mapping={
        'price': str(data['price']),
        'stock': str(data['stock'])
    })

# Starting balance for the test user
client.set('user:123:balance', '150.00')

def process_order(user_id, orders):
    """Atomically process an order with inventory and balance checks.

    Args:
        user_id: id of the purchasing user.
        orders: mapping of product_id -> quantity to buy.

    Returns:
        True if the order was committed; False on unknown product,
        insufficient stock, insufficient balance, or when the
        transaction aborted because a watched key was modified.
    """
    # Fix: `time` is used below for the order id/timestamp, but this example
    # never imported it at module level (only fakeredis and json).
    import time

    # Watch the balance plus every product row we are about to read, so any
    # concurrent modification aborts the transaction at EXEC time.
    watch_keys = [f'user:{user_id}:balance']
    for product_id in orders:
        watch_keys.append(f'product:{product_id}')

    client.watch(*watch_keys)

    try:
        # Check current balance (values come back as bytes)
        current_balance = float(client.get(f'user:{user_id}:balance').decode())

        # Calculate total cost and check availability
        total_cost = 0
        order_details = []

        for product_id, quantity in orders.items():
            product_data = client.hgetall(f'product:{product_id}')

            if not product_data:
                print(f"Product {product_id} not found")
                return False

            # hgetall returns bytes keys and values
            price = float(product_data[b'price'].decode())
            stock = int(product_data[b'stock'].decode())

            if stock < quantity:
                print(f"Insufficient stock for {product_id}: need {quantity}, have {stock}")
                return False

            item_cost = price * quantity
            total_cost += item_cost

            order_details.append({
                'product_id': product_id,
                'quantity': quantity,
                'price': price,
                'total': item_cost,
                'remaining_stock': stock - quantity
            })

        # Check if user has sufficient balance
        if current_balance < total_cost:
            print(f"Insufficient balance: need ${total_cost:.2f}, have ${current_balance:.2f}")
            return False

        # All checks passed - queue the writes as one atomic unit
        client.multi()

        # Deduct from user balance
        new_balance = current_balance - total_cost
        client.set(f'user:{user_id}:balance', f'{new_balance:.2f}')

        # Update inventory
        for order in order_details:
            client.hset(
                f'product:{order["product_id"]}',
                'stock',
                str(order['remaining_stock'])
            )

        # Create order record (millisecond timestamp keeps ids unique enough here)
        order_id = f'order_{int(time.time() * 1000)}'
        client.set(f'order:{order_id}', json.dumps({
            'user_id': user_id,
            'items': order_details,
            'total_cost': total_cost,
            'timestamp': int(time.time())
        }))

        # EXEC; None means a watched key changed and nothing was applied
        result = client.execute()

        if result is not None:
            print(f"✅ Order {order_id} processed successfully!")
            print(f"   Total: ${total_cost:.2f}")
            print(f"   New balance: ${new_balance:.2f}")
            return True
        else:
            print("❌ Transaction aborted - data was modified during processing")
            return False

    except Exception as e:
        print(f"Error processing order: {e}")
        return False

    finally:
        # Always clear the WATCH, success or failure
        client.unwatch()

# Test order processing
print("=== Order Processing Test ===")

# 1. Successful order: total $109.97 against a $150.00 balance
print("\n1. Processing valid order...")
success1 = process_order('123', {
    'product_1': 2,  # 2 × $29.99 = $59.98
    'product_2': 1   # 1 × $49.99 = $49.99
})  # Total: $109.97

print(f"Order 1 result: {'Success' if success1 else 'Failed'}")

# Check remaining balance and inventory
print(f"Remaining balance: ${client.get('user:123:balance').decode()}")
print(f"Product 1 stock: {client.hget('product:product_1', 'stock').decode()}")
print(f"Product 2 stock: {client.hget('product:product_2', 'stock').decode()}")

# 2. Order that exceeds the remaining balance - fails the balance check
print("\n2. Processing order that exceeds balance...")
success2 = process_order('123', {
    'product_2': 3   # 3 × $49.99 = $149.97 (exceeds remaining balance)
})

# 3. Order with insufficient stock - fails the stock check
print("\n3. Processing order with insufficient stock...")
success3 = process_order('123', {
    'product_3': 1   # Out of stock
})

Pattern: Atomic Counters

import fakeredis
import threading
import time
from typing import Dict, Any

class AtomicCounters:
    """Thin wrapper around a redis client exposing atomic counter helpers."""

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client

    def increment(self, counter_name: str, amount: int = 1) -> int:
        """Add `amount` to the counter and return the new value."""
        return self.client.incrby(counter_name, amount)

    def decrement(self, counter_name: str, amount: int = 1) -> int:
        """Subtract `amount` from the counter and return the new value."""
        return self.client.decrby(counter_name, amount)

    def increment_multiple(self, counters: Dict[str, int]) -> Dict[str, int]:
        """Apply several INCRBYs in one transaction; returns name -> new value."""
        names = list(counters)
        with self.client.pipeline(transaction=True) as pipe:
            pipe.multi()
            for name in names:
                pipe.incrby(name, counters[name])
            new_values = pipe.execute()
        return dict(zip(names, new_values))

    def conditional_increment(self, counter_name: str, condition_key: str, expected_value: str, amount: int = 1) -> bool:
        """Increment only while `condition_key` holds `expected_value` (WATCH-guarded)."""
        self.client.watch(condition_key, counter_name)

        try:
            observed = self.client.get(condition_key)
            # Bail out when the condition key is missing or mismatched.
            if observed is None or observed.decode() != expected_value:
                return False

            self.client.multi()
            self.client.incrby(counter_name, amount)
            # None from execute() means a watched key changed -> no increment.
            return self.client.execute() is not None

        finally:
            self.client.unwatch()

    def get_counters(self, *counter_names: str) -> Dict[str, int]:
        """Fetch several counters in one MGET; missing keys read as 0."""
        if not counter_names:
            return {}

        snapshot = {}
        for name, raw in zip(counter_names, self.client.mget(counter_names)):
            snapshot[name] = int(raw.decode()) if raw else 0
        return snapshot

    def reset_counter(self, counter_name: str) -> bool:
        """Set the counter back to zero."""
        return self.client.set(counter_name, '0')

# Usage example
client = fakeredis.FakeRedis()
counters = AtomicCounters(client)

# Initialize all counters to zero before the simulation
counters.reset_counter('page_views')
counters.reset_counter('user_signups')
counters.reset_counter('orders_processed')

def simulate_web_traffic(worker_id: int, duration: int):
    """Drive the shared counters from one worker thread for `duration` seconds."""
    deadline = time.time() + duration

    while time.time() < deadline:
        # Every loop iteration counts as a page view.
        counters.increment('page_views')

        # Fires during a 0.1s slice of each 3-second window.
        if time.time() % 3 < 0.1:
            counters.increment('user_signups')

        # Fires during a 0.1s slice of each 5-second window.
        if time.time() % 5 < 0.1:
            counters.increment('orders_processed')

        time.sleep(0.1)  # 100ms between actions

    print(f"Worker {worker_id} finished")

print("=== Atomic Counter Test ===")

# Start multiple workers to simulate concurrent traffic
workers = []
for i in range(3):
    worker = threading.Thread(target=simulate_web_traffic, args=(i, 2))  # 2 seconds each
    workers.append(worker)
    worker.start()

# Wait for workers to complete
for worker in workers:
    worker.join()

# Get final counter values
final_counts = counters.get_counters('page_views', 'user_signups', 'orders_processed')
print(f"Final counts: {final_counts}")

# Test bulk increment - one transaction updates all three counters
print("\n=== Bulk Counter Update ===")
bulk_updates = {
    'daily_logins': 100,
    'api_calls': 500,
    'database_queries': 250
}

results = counters.increment_multiple(bulk_updates)
print(f"Bulk update results: {results}")

# Test conditional increment
print("\n=== Conditional Counter Test ===")
client.set('feature_flag', 'enabled')

# This should succeed - the flag matches the expected value
success1 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
print(f"Conditional increment (enabled): {success1}")

# Change flag and try again - condition no longer holds, so no increment
client.set('feature_flag', 'disabled')
success2 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
print(f"Conditional increment (disabled): {success2}")

final_feature_usage = counters.get_counters('feature_usage')
print(f"Feature usage count: {final_feature_usage}")

Pattern: Distributed Lock

import fakeredis
import time
import threading
import uuid
from typing import Optional

class DistributedLock:
    """Redis-backed mutual exclusion using SET NX EX plus an ownership token.

    Each instance holds a random UUID; release/extend only act while the
    stored value still equals that UUID (checked atomically in Lua).
    """

    def __init__(self, client: fakeredis.FakeRedis, key: str, timeout: int = 10):
        self.client = client
        self.key = f"lock:{key}"
        self.timeout = timeout
        # Random token identifying this holder, so we never delete a lock
        # that another process has since acquired.
        self.identifier = str(uuid.uuid4())
        self.acquired = False

    def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
        """Take the lock; when blocking, poll until `timeout` (defaults to the TTL)."""
        deadline = time.time() + (timeout or self.timeout)

        # SET NX EX: create-if-absent with expiry, in one atomic command.
        while not self.client.set(self.key, self.identifier, nx=True, ex=self.timeout):
            if not blocking or time.time() >= deadline:
                return False
            time.sleep(0.01)  # brief pause before retrying

        self.acquired = True
        return True

    def release(self) -> bool:
        """Delete the lock iff we still own it; returns True on success."""
        if not self.acquired:
            return False

        # Atomic compare-and-delete so an expired/stolen lock is left alone.
        unlock_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        if self.client.eval(unlock_script, 1, self.key, self.identifier):
            self.acquired = False
            return True
        return False

    def extend(self, additional_time: int) -> bool:
        """Reset the lock's TTL to `additional_time` seconds iff we still own it."""
        if not self.acquired:
            return False

        renew_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
        """

        outcome = self.client.eval(renew_script, 1, self.key, self.identifier, additional_time)
        return bool(outcome)

    def __enter__(self):
        # Blocking acquire with the default timeout; fail loudly on loss.
        got_it = self.acquire()
        if not got_it:
            raise Exception(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

def critical_section_work(worker_id: int, shared_resource: str, client: fakeredis.FakeRedis):
    """Exclusive read-modify-write on resource:<name>, guarded by a DistributedLock."""
    lock = DistributedLock(client, shared_resource, timeout=5)

    try:
        print(f"Worker {worker_id}: Attempting to acquire lock for {shared_resource}")

        if not lock.acquire(blocking=True, timeout=3.0):
            print(f"Worker {worker_id}: ❌ Could not acquire lock for {shared_resource}")
            return

        print(f"Worker {worker_id}: ✅ Acquired lock for {shared_resource}")

        # Read the current counter (missing key counts as 0).
        stored = client.get(f"resource:{shared_resource}")
        before = int(stored.decode()) if stored else 0

        # Hold the lock while "working" to force contention.
        time.sleep(0.5)

        after = before + 1
        client.set(f"resource:{shared_resource}", str(after))

        print(f"Worker {worker_id}: Updated {shared_resource} from {before} to {after}")

    finally:
        # Release only if we actually hold the lock.
        if lock.acquired:
            lock.release()
            print(f"Worker {worker_id}: Released lock for {shared_resource}")

# Test distributed locking
client = fakeredis.FakeRedis()

# Initialize shared resource
client.set("resource:shared_counter", "0")

print("=== Distributed Lock Test ===")

# Create workers that compete for the same resource
workers = []
for i in range(5):
    worker = threading.Thread(
        target=critical_section_work,
        args=(i, "shared_counter", client)
    )
    workers.append(worker)

# Start all workers simultaneously
for worker in workers:
    worker.start()

# Wait for all workers
for worker in workers:
    worker.join()

# Check final value (each worker that got the lock added exactly 1)
final_value = client.get("resource:shared_counter").decode()
print(f"\nFinal shared counter value: {final_value}")

# Test lock context manager
print("\n=== Lock Context Manager Test ===")

def update_with_context_manager(resource_name: str):
    """Bump resource:<name> by 10 inside a DistributedLock `with` block."""
    try:
        # __enter__ raises when the lock cannot be acquired.
        with DistributedLock(client, resource_name):
            print(f"Inside critical section for {resource_name}")

            stored = client.get(f"resource:{resource_name}")
            value = int(stored.decode()) if stored else 0
            client.set(f"resource:{resource_name}", str(value + 10))

            print(f"Updated {resource_name} to {value + 10}")

    except Exception as e:
        print(f"Failed to acquire lock: {e}")

# Initialize new resource
client.set("resource:context_test", "0")

# Test context manager
update_with_context_manager("context_test")

final_context_value = client.get("resource:context_test").decode()
print(f"Final context test value: {final_context_value}")

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json