Python client for Redis database and key-value store
—
Redis pipelines and transactions provide efficient batching of multiple commands and atomic execution guarantees. Pipelines reduce network round-trips by sending multiple commands at once, while transactions ensure atomicity with WATCH-based optimistic locking.
Pipeline class for batching multiple Redis commands and executing them efficiently.
def pipeline(
    self,
    transaction: bool = True,
    shard_hint: Optional[str] = None
) -> "Pipeline":
    """Return a new Pipeline for batching commands.

    Args:
        transaction: when True (default), queued commands are wrapped
            in MULTI/EXEC so they execute atomically on the server.
        shard_hint: optional hint used when selecting a connection.

    Returns:
        A Pipeline that buffers commands until execute() is called.
    """
    ...
class Pipeline:
    """Buffers Redis commands for batched, optionally atomic, execution.

    Also usable as a context manager: ``__exit__`` resets the pipeline
    and releases its connection.
    """

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Send all buffered commands and return their replies in order."""
        ...

    def reset(self) -> None:
        """Clear buffered commands and WATCH state so the pipeline can be reused."""
        ...

    def watch(self, *names: KeyT) -> bool:
        """WATCH keys for optimistic locking before starting MULTI."""
        ...

    def multi(self) -> "Pipeline":
        """Begin buffering commands for an explicit MULTI/EXEC transaction."""
        ...

    def discard(self) -> None:
        """Abandon the queued transaction (DISCARD)."""
        ...

    def __enter__(self) -> "Pipeline": ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

# Redis transaction support with MULTI/EXEC and optimistic locking via WATCH.
def transaction(
    self,
    func: Callable[["Pipeline"], Any],
    *watches: KeyT,
    **kwargs
) -> Any:
    """Run func(pipe) inside a WATCH/MULTI/EXEC loop, retrying on WatchError."""
    ...

def watch(self, *names: KeyT) -> bool:
    """WATCH keys so a later EXEC aborts if any of them change."""
    ...

def unwatch(self) -> bool:
    """Forget all watched keys."""
    ...

def multi(self) -> bool:
    """Mark the start of a MULTI transaction block."""
    ...

def exec(self) -> Optional[List[Any]]:
    """Execute the queued transaction; None if aborted by a WATCH."""
    ...

def discard(self) -> bool:
    """Flush the queued transaction commands."""
    ...

# All Redis commands are available in pipeline mode for batching.
# Pipeline command stubs: each call queues the command and returns the
# Pipeline itself, so calls can be chained before execute().

# String operations in pipeline
def set(self, name: KeyT, value: EncodableT, **kwargs) -> "Pipeline": ...
def get(self, name: KeyT) -> "Pipeline": ...
def mget(self, keys: List[KeyT], *args: KeyT) -> "Pipeline": ...
def mset(self, mapping: Dict[KeyT, EncodableT]) -> "Pipeline": ...

# Hash operations in pipeline
def hset(self, name: KeyT, key: Optional[FieldT] = None, value: Optional[EncodableT] = None, mapping: Optional[Dict[FieldT, EncodableT]] = None) -> "Pipeline": ...
def hget(self, name: KeyT, key: FieldT) -> "Pipeline": ...
def hgetall(self, name: KeyT) -> "Pipeline": ...

# List operations in pipeline
def lpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def rpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def lpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...
def rpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...

# Set operations in pipeline
def sadd(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def smembers(self, name: KeyT) -> "Pipeline": ...

# Sorted set operations in pipeline
def zadd(self, name: KeyT, mapping: Dict[EncodableT, float], **kwargs) -> "Pipeline": ...
def zrange(self, name: KeyT, start: int, end: int, **kwargs) -> "Pipeline": ...

# Key operations in pipeline
def delete(self, *names: KeyT) -> "Pipeline": ...
def exists(self, *names: KeyT) -> "Pipeline": ...
def expire(self, name: KeyT, time: ExpiryT) -> "Pipeline": ...

import redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Create pipeline
pipe = r.pipeline()

# Queue multiple commands
pipe.set('user:1001', 'John')
pipe.set('user:1002', 'Jane')
pipe.get('user:1001')
pipe.get('user:1002')
pipe.incr('page_views')

# Execute all commands at once; replies come back in queue order
results = pipe.execute()
print(f"Results: {results}")  # [True, True, b'John', b'Jane', 1]
import redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline as a context manager: the pipeline is reset and its
# connection released automatically on exit
with r.pipeline() as pipe:
    pipe.set('temp:key1', 'value1')
    pipe.set('temp:key2', 'value2')
    pipe.mget(['temp:key1', 'temp:key2'])
    results = pipe.execute()
print(f"Pipeline results: {results}")
import redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Initialize counter
r.set('counter', 0)

# Transaction pipeline (default behavior)
pipe = r.pipeline(transaction=True)
try:
    # Queue commands in transaction
    pipe.multi()
    pipe.incr('counter')
    pipe.incr('counter')
    pipe.get('counter')
    # Execute transaction atomically (MULTI/EXEC on the server)
    results = pipe.execute()
    print(f"Transaction results: {results}")  # [1, 2, b'2']
except redis.WatchError:
    print("Transaction aborted due to watched key modification")
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# Initialize balance
r.set('account:balance', 1000)

def transfer_money(amount):
    """Transfer money using optimistic locking (WATCH/MULTI/EXEC)."""
    pipe = r.pipeline()
    while True:
        try:
            # Watch the balance key; EXEC aborts if it changes
            pipe.watch('account:balance')
            # Get current balance
            current_balance = int(r.get('account:balance') or 0)
            # Check if sufficient funds
            if current_balance < amount:
                pipe.unwatch()
                raise ValueError("Insufficient funds")
            # Calculate new balance
            new_balance = current_balance - amount
            # Start transaction: switches the pipeline to buffered mode
            pipe.multi()
            pipe.set('account:balance', new_balance)
            # Execute transaction
            pipe.execute()
            print(f"Transfer successful. New balance: {new_balance}")
            break
        except redis.WatchError:
            # Key was modified by another client between WATCH and
            # EXEC, so the whole read-check-write cycle is retried
            print("Balance modified by another client, retrying...")
            continue

# Demonstrate concurrent transfers
transfer_money(100)
transfer_money(200)
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def update_user_profile(user_id, name, email):
    """Update user profile atomically via the transaction() helper."""
    def transaction_func(pipe):
        # After WATCH the pipeline runs commands immediately; multi()
        # switches it to buffering so the writes below are queued and
        # applied atomically by EXEC.
        pipe.multi()
        # Commands executed within transaction
        pipe.hset(f'user:{user_id}', 'name', name)
        pipe.hset(f'user:{user_id}', 'email', email)
        pipe.hset(f'user:{user_id}', 'updated_at', int(time.time()))
        pipe.sadd('updated_users', user_id)
    # Execute with automatic WATCH/MULTI/EXEC handling and retry on
    # WatchError; returns the EXEC reply list
    result = r.transaction(transaction_func, f'user:{user_id}')
    return result

# Update user profile
result = update_user_profile(1001, 'John Doe', 'john@example.com')
print(f"Profile update result: {result}")
import redis
r = redis.Redis(host='localhost', port=6379, db=0)

def bulk_insert_users(users):
    """Insert multiple users efficiently using a single pipeline."""
    pipe = r.pipeline()
    for user_id, user_data in users.items():
        # Hash for user profile
        pipe.hset(f'user:{user_id}', mapping=user_data)
        # Add to user index
        pipe.sadd('all_users', user_id)
        # Add to age-based index
        if 'age' in user_data:
            pipe.zadd('users_by_age', {user_id: int(user_data['age'])})
    # Execute all operations in one round-trip
    results = pipe.execute()
    # Count truthy replies (renamed from `r` to avoid shadowing the client)
    return len([reply for reply in results if reply])

# Bulk insert example
users_data = {
    1001: {'name': 'John', 'email': 'john@example.com', 'age': '30'},
    1002: {'name': 'Jane', 'email': 'jane@example.com', 'age': '25'},
    1003: {'name': 'Bob', 'email': 'bob@example.com', 'age': '35'},
}
successful_ops = bulk_insert_users(users_data)
print(f"Completed {successful_ops} operations")
import redis
from redis.exceptions import ResponseError

r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline with error handling
pipe = r.pipeline()

# Mix of valid and invalid operations
pipe.set('valid_key', 'value')
pipe.lpush('valid_key', 'item')  # This will fail - wrong type
pipe.get('valid_key')
pipe.set('another_key', 'another_value')

try:
    results = pipe.execute(raise_on_error=True)
except ResponseError as e:
    print(f"Pipeline error: {e}")

# Handle errors without raising exceptions. execute() resets the
# command stack even on failure, so the commands must be re-queued
# before running again.
pipe.set('valid_key', 'value')
pipe.lpush('valid_key', 'item')
pipe.get('valid_key')
pipe.set('another_key', 'another_value')
results = pipe.execute(raise_on_error=False)
for i, result in enumerate(results):
    # Failed commands come back as exception instances in the reply list
    if isinstance(result, Exception):
        print(f"Command {i} failed: {result}")
    else:
        print(f"Command {i} result: {result}")
import redis
import json
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def create_order(user_id, product_id, quantity):
    """Create an order with an atomic inventory check and update.

    Raises ValueError on insufficient stock; raises a generic
    Exception if a concurrent client modified the inventory.
    """
    def order_transaction(pipe):
        # While the pipeline is WATCHing, commands execute
        # immediately, so these reads return live values
        current_stock = pipe.get(f'inventory:{product_id}')
        current_stock = int(current_stock) if current_stock else 0
        # Check stock availability
        if current_stock < quantity:
            raise ValueError(f"Insufficient stock. Available: {current_stock}")
        # Generate order ID (still immediate mode: returns an int)
        order_id = pipe.incr('order_counter')
        # Create order
        order_data = {
            'order_id': order_id,
            'user_id': user_id,
            'product_id': product_id,
            'quantity': quantity,
            'created_at': datetime.now().isoformat(),
            'status': 'pending'
        }
        # Switch to buffered MULTI mode so every write below is
        # queued and applied atomically by EXEC
        pipe.multi()
        # Update inventory
        new_stock = current_stock - quantity
        pipe.set(f'inventory:{product_id}', new_stock)
        # Store order
        pipe.hset(f'order:{order_id}', mapping=order_data)
        # Add to user's orders
        pipe.sadd(f'user:{user_id}:orders', order_id)
        # Add to pending orders
        pipe.sadd('pending_orders', order_id)
        # Update product sales count
        pipe.incr(f'product:{product_id}:sales')
        return order_id

    # Watch inventory for consistency. value_from_callable=True makes
    # transaction() return order_transaction's return value (the order
    # id) instead of the raw EXEC reply list.
    try:
        return r.transaction(
            order_transaction,
            f'inventory:{product_id}',
            value_from_callable=True
        )
    except redis.WatchError:
        raise Exception("Order failed due to concurrent inventory update")

# Initialize test data
r.set('inventory:123', 10)
r.set('order_counter', 1000)

# Create orders
try:
    order_id = create_order(user_id=1001, product_id=123, quantity=2)
    print(f"Order created successfully: {order_id}")
    # Check updated inventory
    remaining_stock = r.get('inventory:123')
    print(f"Remaining stock: {remaining_stock}")
except Exception as e:
    print(f"Order creation failed: {e}")
import redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Non-transactional pipeline: skips MULTI/EXEC overhead for better
# performance when atomicity is not required
pipe = r.pipeline(transaction=False)

# Batch read operations: two commands queued per key
keys_to_check = ['user:1001', 'user:1002', 'user:1003', 'user:1004']
for key in keys_to_check:
    pipe.exists(key)
    pipe.hgetall(key)

# Execute all at once
results = pipe.execute()

# Process results as (exists, hgetall) pairs, in queue order
for i in range(0, len(results), 2):
    key = keys_to_check[i // 2]
    exists = results[i]
    data = results[i + 1]
    if exists:
        print(f"{key}: {data}")
    else:
        print(f"{key}: Not found")
import redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Create reusable pipeline
pipe = r.pipeline()

# First batch of operations
pipe.set('batch1_key1', 'value1')
pipe.set('batch1_key2', 'value2')
batch1_results = pipe.execute()
print(f"Batch 1: {batch1_results}")

# Reset pipeline for reuse: clears any leftover buffered state
pipe.reset()

# Second batch of operations
pipe.get('batch1_key1')
pipe.get('batch1_key2')
pipe.set('batch2_key', 'value')
batch2_results = pipe.execute()
print(f"Batch 2: {batch2_results}")

# Install with Tessl CLI
npx tessl i tessl/pypi-redis