CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

pipelines-transactions.mddocs/

Pipelines and Transactions

Redis pipelines and transactions provide efficient batching of multiple commands and atomic execution guarantees. Pipelines reduce network round-trips by sending multiple commands at once, while transactions ensure atomicity with WATCH-based optimistic locking.

Capabilities

Pipeline Operations

Pipeline class for batching multiple Redis commands and executing them efficiently.

def pipeline(
    self,
    transaction: bool = True,
    shard_hint: Optional[str] = None
) -> "Pipeline": ...

class Pipeline:
    def execute(self, raise_on_error: bool = True) -> List[Any]: ...
    
    def reset(self) -> None: ...
    
    def watch(self, *names: KeyT) -> bool: ...
    
    def multi(self) -> "Pipeline": ...
    
    def discard(self) -> None: ...
    
    def __enter__(self) -> "Pipeline": ...
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

Transaction Operations

Redis transaction support with MULTI/EXEC and optimistic locking via WATCH.

def transaction(
    self,
    func: Callable[["Pipeline"], Any],
    *watches: KeyT,
    **kwargs
) -> Any: ...

def watch(self, *names: KeyT) -> bool: ...

def unwatch(self) -> bool: ...

def multi(self) -> bool: ...

def exec(self) -> Optional[List[Any]]: ...

def discard(self) -> bool: ...

Pipeline Command Methods

All Redis commands are available in pipeline mode for batching.

# String operations in pipeline
def set(self, name: KeyT, value: EncodableT, **kwargs) -> "Pipeline": ...
def get(self, name: KeyT) -> "Pipeline": ...
def mget(self, keys: List[KeyT], *args: KeyT) -> "Pipeline": ...
def mset(self, mapping: Dict[KeyT, EncodableT]) -> "Pipeline": ...

# Hash operations in pipeline  
def hset(self, name: KeyT, key: Optional[FieldT] = None, value: Optional[EncodableT] = None, mapping: Optional[Dict[FieldT, EncodableT]] = None) -> "Pipeline": ...
def hget(self, name: KeyT, key: FieldT) -> "Pipeline": ...
def hgetall(self, name: KeyT) -> "Pipeline": ...

# List operations in pipeline
def lpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def rpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def lpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...
def rpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...

# Set operations in pipeline
def sadd(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def smembers(self, name: KeyT) -> "Pipeline": ...

# Sorted set operations in pipeline
def zadd(self, name: KeyT, mapping: Dict[EncodableT, float], **kwargs) -> "Pipeline": ...
def zrange(self, name: KeyT, start: int, end: int, **kwargs) -> "Pipeline": ...

# Key operations in pipeline
def delete(self, *names: KeyT) -> "Pipeline": ...
def exists(self, *names: KeyT) -> "Pipeline": ...
def expire(self, name: KeyT, time: ExpiryT) -> "Pipeline": ...

Usage Examples

Basic Pipeline Usage

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Create pipeline
pipe = r.pipeline()

# Queue multiple commands
pipe.set('user:1001', 'John')
pipe.set('user:1002', 'Jane') 
pipe.get('user:1001')
pipe.get('user:1002')
pipe.incr('page_views')

# Execute all commands at once
results = pipe.execute()
print(f"Results: {results}")  # [True, True, b'John', b'Jane', 1]

Pipeline with Context Manager

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline with automatic execution
with r.pipeline() as pipe:
    pipe.set('temp:key1', 'value1')
    pipe.set('temp:key2', 'value2')
    pipe.mget(['temp:key1', 'temp:key2'])
    results = pipe.execute()
    
print(f"Pipeline results: {results}")

Transaction with MULTI/EXEC

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Initialize counter
r.set('counter', 0)

# Transaction pipeline (default behavior)
pipe = r.pipeline(transaction=True)

try:
    # Queue commands in transaction
    pipe.multi()
    pipe.incr('counter')
    pipe.incr('counter') 
    pipe.get('counter')
    
    # Execute transaction atomically
    results = pipe.execute()
    print(f"Transaction results: {results}")  # [1, 2, b'2']
    
except redis.WatchError:
    print("Transaction aborted due to watched key modification")

Optimistic Locking with WATCH

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# Initialize balance
r.set('account:balance', 1000)

def transfer_money(amount):
    """Transfer money using optimistic locking"""
    pipe = r.pipeline()
    
    while True:
        try:
            # Watch the balance key
            pipe.watch('account:balance')
            
            # Get current balance
            current_balance = int(r.get('account:balance') or 0)
            
            # Check if sufficient funds
            if current_balance < amount:
                pipe.unwatch()
                raise ValueError("Insufficient funds")
            
            # Calculate new balance
            new_balance = current_balance - amount
            
            # Start transaction
            pipe.multi()
            pipe.set('account:balance', new_balance)
            
            # Execute transaction
            pipe.execute()
            print(f"Transfer successful. New balance: {new_balance}")
            break
            
        except redis.WatchError:
            # Key was modified, retry
            print("Balance modified by another client, retrying...")
            continue

# Demonstrate concurrent transfers
transfer_money(100)
transfer_money(200)

High-Level Transaction Helper

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def update_user_profile(user_id, name, email):
    """Update user profile atomically"""
    def transaction_func(pipe):
        # Commands executed within transaction
        pipe.hset(f'user:{user_id}', 'name', name)
        pipe.hset(f'user:{user_id}', 'email', email)
        pipe.hset(f'user:{user_id}', 'updated_at', int(time.time()))
        pipe.sadd('updated_users', user_id)
    
    # Execute with automatic WATCH/MULTI/EXEC handling
    result = r.transaction(transaction_func, f'user:{user_id}')
    return result

# Update user profile
result = update_user_profile(1001, 'John Doe', 'john@example.com')
print(f"Profile update result: {result}")

Bulk Data Operations

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def bulk_insert_users(users):
    """Insert multiple users efficiently using pipeline"""
    pipe = r.pipeline()
    
    for user_id, user_data in users.items():
        # Hash for user profile
        pipe.hset(f'user:{user_id}', mapping=user_data)
        
        # Add to user index
        pipe.sadd('all_users', user_id)
        
        # Add to age-based index
        if 'age' in user_data:
            pipe.zadd('users_by_age', {user_id: int(user_data['age'])})
    
    # Execute all operations
    results = pipe.execute()
    return len([r for r in results if r])

# Bulk insert example
users_data = {
    1001: {'name': 'John', 'email': 'john@example.com', 'age': '30'},
    1002: {'name': 'Jane', 'email': 'jane@example.com', 'age': '25'},
    1003: {'name': 'Bob', 'email': 'bob@example.com', 'age': '35'},
}

successful_ops = bulk_insert_users(users_data)
print(f"Completed {successful_ops} operations")

Pipeline Error Handling

import redis
from redis.exceptions import ResponseError

r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline with error handling
pipe = r.pipeline()

# Mix of valid and invalid operations
pipe.set('valid_key', 'value')
pipe.lpush('valid_key', 'item')  # This will fail - wrong type
pipe.get('valid_key')
pipe.set('another_key', 'another_value')

try:
    results = pipe.execute(raise_on_error=True)
except ResponseError as e:
    print(f"Pipeline error: {e}")

# Handle errors without raising exceptions
results = pipe.execute(raise_on_error=False)
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Command {i} failed: {result}")
    else:
        print(f"Command {i} result: {result}")

Complex Transaction Example

import redis
import json
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def create_order(user_id, product_id, quantity):
    """Create order with inventory check and update"""
    
    def order_transaction(pipe):
        # Get current inventory
        current_stock = pipe.get(f'inventory:{product_id}')
        current_stock = int(current_stock) if current_stock else 0
        
        # Check stock availability
        if current_stock < quantity:
            raise ValueError(f"Insufficient stock. Available: {current_stock}")
        
        # Generate order ID
        order_id = pipe.incr('order_counter')
        
        # Create order
        order_data = {
            'order_id': order_id,
            'user_id': user_id,
            'product_id': product_id,
            'quantity': quantity,
            'created_at': datetime.now().isoformat(),
            'status': 'pending'
        }
        
        # Update inventory
        new_stock = current_stock - quantity
        pipe.set(f'inventory:{product_id}', new_stock)
        
        # Store order
        pipe.hset(f'order:{order_id}', mapping=order_data)
        
        # Add to user's orders  
        pipe.sadd(f'user:{user_id}:orders', order_id)
        
        # Add to pending orders
        pipe.sadd('pending_orders', order_id)
        
        # Update product sales count
        pipe.incr(f'product:{product_id}:sales')
        
        return order_id
    
    # Watch inventory for consistency
    try:
        result = r.transaction(
            order_transaction, 
            f'inventory:{product_id}'
        )
        return result[0]  # Return order_id
        
    except redis.WatchError:
        raise Exception("Order failed due to concurrent inventory update")

# Initialize test data
r.set('inventory:123', 10)
r.set('order_counter', 1000)

# Create orders
try:
    order_id = create_order(user_id=1001, product_id=123, quantity=2)
    print(f"Order created successfully: {order_id}")
    
    # Check updated inventory
    remaining_stock = r.get('inventory:123')
    print(f"Remaining stock: {remaining_stock}")
    
except Exception as e:
    print(f"Order creation failed: {e}")

Non-Transactional Pipeline

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Non-transactional pipeline for better performance
pipe = r.pipeline(transaction=False)

# Batch read operations
keys_to_check = ['user:1001', 'user:1002', 'user:1003', 'user:1004']

for key in keys_to_check:
    pipe.exists(key)
    pipe.hgetall(key)

# Execute all at once
results = pipe.execute()

# Process results (exists, hgetall pairs)
for i in range(0, len(results), 2):
    key = keys_to_check[i // 2]
    exists = results[i]
    data = results[i + 1]
    
    if exists:
        print(f"{key}: {data}")
    else:
        print(f"{key}: Not found")

Pipeline Reset and Reuse

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Create reusable pipeline
pipe = r.pipeline()

# First batch of operations
pipe.set('batch1_key1', 'value1')
pipe.set('batch1_key2', 'value2')
batch1_results = pipe.execute()
print(f"Batch 1: {batch1_results}")

# Reset pipeline for reuse
pipe.reset()

# Second batch of operations
pipe.get('batch1_key1')
pipe.get('batch1_key2') 
pipe.set('batch2_key', 'value')
batch2_results = pipe.execute()
print(f"Batch 2: {batch2_results}")

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json