A Python implementation of the Redis API that can be used for testing, with no running Redis server required.

fakeredis supports Redis 5.0+ streams: append-only log data structures with consumer-group support for distributed processing. Streams provide a powerful abstraction for event sourcing, message queuing, and real-time data processing, with built-in persistence and horizontal-scaling capabilities in real Redis deployments.
Core operations for creating, adding to, and querying Redis streams.
def xadd(
    self,
    name: KeyT,
    fields: Dict[AnyKeyT, EncodableT],
    id: str = "*",
    maxlen: Optional[int] = None,
    approximate: bool = True,
    nomkstream: bool = False,
    minid: Optional[str] = None,
    limit: Optional[int] = None
) -> str: ...
def xlen(self, name: KeyT) -> int: ...
def xrange(
    self,
    name: KeyT,
    min: str = "-",
    max: str = "+",
    count: Optional[int] = None
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...
def xrevrange(
    self,
    name: KeyT,
    max: str = "+",
    min: str = "-",
    count: Optional[int] = None
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...
def xdel(self, name: KeyT, *ids: str) -> int: ...
def xtrim(
    self,
    name: KeyT,
    maxlen: Optional[int] = None,
    approximate: bool = True,
    minid: Optional[str] = None,
    limit: Optional[int] = None
) -> int: ...

Reading operations for consuming stream entries with blocking and non-blocking modes.
def xread(
    self,
    streams: Dict[KeyT, Union[str, int]],
    count: Optional[int] = None,
    block: Optional[int] = None
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...
def xreadgroup(
    self,
    groupname: str,
    consumername: str,
    streams: Dict[KeyT, Union[str, int]],
    count: Optional[int] = None,
    block: Optional[int] = None,
    noack: bool = False
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...

Operations for managing consumer groups and tracking message processing.
def xgroup_create(
    self,
    name: KeyT,
    groupname: str,
    id: str = "$",
    mkstream: bool = False,
    entries_read: Optional[int] = None
) -> bool: ...
def xgroup_destroy(self, name: KeyT, groupname: str) -> bool: ...
def xgroup_createconsumer(
    self,
    name: KeyT,
    groupname: str,
    consumername: str
) -> bool: ...
def xgroup_delconsumer(
    self,
    name: KeyT,
    groupname: str,
    consumername: str
) -> int: ...
def xgroup_setid(
    self,
    name: KeyT,
    groupname: str,
    id: str,
    entries_read: Optional[int] = None
) -> bool: ...

Functions for acknowledging processed messages and managing pending entries.
def xack(self, name: KeyT, groupname: str, *ids: str) -> int: ...
def xpending(
    self,
    name: KeyT,
    groupname: str,
    min: Optional[str] = None,
    max: Optional[str] = None,
    count: Optional[int] = None,
    consumername: Optional[str] = None
) -> Union[Dict[str, Any], List[Dict[str, Any]]]: ...
def xclaim(
    self,
    name: KeyT,
    groupname: str,
    consumername: str,
    min_idle_time: int,
    message_ids: List[str],
    idle: Optional[int] = None,
    time: Optional[int] = None,
    retrycount: Optional[int] = None,
    force: bool = False,
    justid: bool = False
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...
def xautoclaim(
    self,
    name: KeyT,
    groupname: str,
    consumername: str,
    min_idle_time: int,
    start_id: str = "0-0",
    count: Optional[int] = None,
    justid: bool = False
) -> Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]], List[bytes]]: ...

Inspection commands for stream metadata, consumer group status, and consumer information.
def xinfo_consumers(self, name: KeyT, groupname: str) -> List[Dict[str, Any]]: ...
def xinfo_groups(self, name: KeyT) -> List[Dict[str, Any]]: ...
def xinfo_stream(self, name: KeyT, full: bool = False, count: Optional[int] = None) -> Dict[str, Any]: ...

import fakeredis
client = fakeredis.FakeRedis()
# Add entries to a stream
entry_id1 = client.xadd('events', {'user': 'alice', 'action': 'login'})
print(f"Added entry: {entry_id1}")
entry_id2 = client.xadd('events', {'user': 'bob', 'action': 'purchase', 'amount': '29.99'})
print(f"Added entry: {entry_id2}")
# Add with custom ID
custom_id = client.xadd('events', {'user': 'charlie', 'action': 'logout'}, id='1234567890123-0')
# Get stream length
length = client.xlen('events')
print(f"Stream length: {length}")import fakeredis
import fakeredis
client = fakeredis.FakeRedis()
# Add some test data
client.xadd('sensor_data', {'temperature': '23.5', 'humidity': '65'})
client.xadd('sensor_data', {'temperature': '24.1', 'humidity': '62'})
client.xadd('sensor_data', {'temperature': '23.8', 'humidity': '68'})
# Read all entries
entries = client.xrange('sensor_data')
for entry_id, fields in entries:
    print(f"ID: {entry_id.decode()}")
    for key, value in fields.items():
        print(f" {key.decode()}: {value.decode()}")
# Read entries in reverse order
recent_entries = client.xrevrange('sensor_data', count=2)
print(f"Last 2 entries: {len(recent_entries)}")
# Read from a specific ID
from_id = entries[0][0].decode() # First entry ID
new_entries = client.xrange('sensor_data', min=from_id)
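Redis 6.2 added exclusive range endpoints: prefixing an ID with '(' excludes it, which is handy for paging through a stream without re-reading the last seen entry. A sketch, assuming your fakeredis version implements exclusive ranges:
import fakeredis
client = fakeredis.FakeRedis()
client.xadd('pages', {'n': '1'})
last_id = client.xadd('pages', {'n': '2'})
# Exclusive lower bound: entries strictly after last_id
newer = client.xrange('pages', min=f"({last_id.decode()}")
print(f"Entries after the last one: {newer}")  # expected: []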
import fakeredis
client = fakeredis.FakeRedis()
# Add many entries
for i in range(100):
    client.xadd('logs', {'level': 'info', 'message': f'Log entry {i}'})
print(f"Stream length before trim: {client.xlen('logs')}")
# Keep only the latest 50 entries (approximate)
trimmed = client.xtrim('logs', maxlen=50, approximate=True)
print(f"Trimmed {trimmed} entries")
print(f"Stream length after trim: {client.xlen('logs')}")
# Delete specific entries
entries = client.xrange('logs', count=5)
entry_ids = [entry[0].decode() for entry in entries]
deleted = client.xdel('logs', *entry_ids)
print(f"Deleted {deleted} specific entries")import fakeredis
import fakeredis
import time
client = fakeredis.FakeRedis()
# Create a stream and add some data
for i in range(10):
    client.xadd('orders', {'order_id': f'order_{i}', 'status': 'pending'})
# Create consumer group
client.xgroup_create('orders', 'processors', id='0')
# Create consumers in the group
client.xgroup_createconsumer('orders', 'processors', 'worker1')
client.xgroup_createconsumer('orders', 'processors', 'worker2')
# Consumer 1 reads messages
messages = client.xreadgroup('processors', 'worker1', {'orders': '>'}, count=3)
print("Worker1 received:")
for stream_name, entries in messages:
    for entry_id, fields in entries:
        print(f" {entry_id.decode()}: {fields}")
# Consumer 2 reads different messages
messages = client.xreadgroup('processors', 'worker2', {'orders': '>'}, count=2)
print("Worker2 received:")
for stream_name, entries in messages:
    for entry_id, fields in entries:
        print(f" {entry_id.decode()}: {fields}")
import fakeredis
client = fakeredis.FakeRedis()
# Setup stream and consumer group
client.xadd('tasks', {'task': 'send_email', 'recipient': 'user@example.com'})
client.xadd('tasks', {'task': 'process_payment', 'amount': '100.00'})
client.xadd('tasks', {'task': 'update_inventory', 'item_id': '12345'})
client.xgroup_create('tasks', 'workers', id='0')
# Read messages without acknowledging
messages = client.xreadgroup('workers', 'consumer1', {'tasks': '>'})
entry_ids = []
for stream_name, entries in messages:
    for entry_id, fields in entries:
        entry_ids.append(entry_id.decode())
        print(f"Processing: {fields}")
# Check pending messages
pending_info = client.xpending('tasks', 'workers')
print(f"Pending messages: {pending_info}")
# Acknowledge processed messages
acked = client.xack('tasks', 'workers', *entry_ids[:2]) # Ack first 2 messages
print(f"Acknowledged {acked} messages")
# Check pending again
pending_info = client.xpending('tasks', 'workers')
print(f"Remaining pending: {pending_info}")import fakeredis
import fakeredis
import threading
import time
client = fakeredis.FakeRedis()
def producer():
    """Producer thread that adds entries every 2 seconds"""
    for i in range(5):
        time.sleep(2)
        entry_id = client.xadd('notifications', {
            'type': 'alert',
            'message': f'Alert {i}',
            'timestamp': str(int(time.time()))
        })
        print(f"Producer added: {entry_id}")

def consumer():
    """Consumer that blocks waiting for new entries"""
    last_id = '0-0'
    while True:
        # Block for up to 5 seconds waiting for new messages
        messages = client.xread({'notifications': last_id}, block=5000)
        if not messages:
            print("No new messages within the timeout, stopping.")
            break
        for stream_name, entries in messages:
            for entry_id, fields in entries:
                print(f"Consumer received: {entry_id.decode()} - {fields}")
                last_id = entry_id.decode()
# Start producer thread
producer_thread = threading.Thread(target=producer)
producer_thread.start()
# Start consuming (will block)
consumer()
producer_thread.join()
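Two xread conventions worth knowing: a start ID of '$' means "only entries added after this call", and block=0 blocks indefinitely. A non-blocking sketch, assuming an empty result when nothing new has arrived:
import fakeredis
client = fakeredis.FakeRedis()
client.xadd('feed', {'n': '1'})
# '$' skips all existing entries; with no block this should return an empty result
print(client.xread({'feed': '$'}))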
import fakeredis
client = fakeredis.FakeRedis()
# Setup test data
client.xadd('analytics', {'event': 'page_view', 'page': '/home'})
client.xadd('analytics', {'event': 'click', 'element': 'button'})
client.xgroup_create('analytics', 'processors', id='0')
client.xreadgroup('processors', 'worker1', {'analytics': '>'})
# Get stream information
stream_info = client.xinfo_stream('analytics')
print("Stream info:")
print(f" Length: {stream_info['length']}")
print(f" First entry: {stream_info['first-entry']}")
print(f" Last entry: {stream_info['last-entry']}")
# Get consumer group information
groups_info = client.xinfo_groups('analytics')
print("\nConsumer groups:")
for group in groups_info:
    print(f" Group: {group['name'].decode()}")
    print(f" Consumers: {group['consumers']}")
    print(f" Pending: {group['pending']}")
# Get consumer information
consumers_info = client.xinfo_consumers('analytics', 'processors')
print("\nConsumers in group:")
for consumer in consumers_info:
    print(f" Consumer: {consumer['name'].decode()}")
    print(f" Pending: {consumer['pending']}")
    print(f" Idle: {consumer['idle']}")
import fakeredis
import json
import time
from datetime import datetime
class EventStore:
    def __init__(self, client):
        self.client = client

    def append_event(self, aggregate_id, event_type, event_data):
        """Append an event to an aggregate's stream"""
        stream_name = f"aggregate:{aggregate_id}"
        event = {
            'event_type': event_type,
            'event_data': json.dumps(event_data),
            'timestamp': datetime.utcnow().isoformat(),
            'version': str(int(time.time() * 1000000))  # Microsecond precision
        }
        return self.client.xadd(stream_name, event)

    def get_events(self, aggregate_id, from_version=None):
        """Retrieve all events for an aggregate"""
        stream_name = f"aggregate:{aggregate_id}"
        min_id = from_version if from_version else '-'
        events = []
        entries = self.client.xrange(stream_name, min=min_id)
        for entry_id, fields in entries:
            event = {
                'id': entry_id.decode(),
                'event_type': fields[b'event_type'].decode(),
                'event_data': json.loads(fields[b'event_data'].decode()),
                'timestamp': fields[b'timestamp'].decode(),
                'version': fields[b'version'].decode()
            }
            events.append(event)
        return events
# Usage example
client = fakeredis.FakeRedis()
event_store = EventStore(client)
# Append events for a user aggregate
user_id = "user123"
event_store.append_event(user_id, "UserCreated", {"name": "Alice", "email": "alice@example.com"})
event_store.append_event(user_id, "EmailChanged", {"old_email": "alice@example.com", "new_email": "alice.smith@example.com"})
event_store.append_event(user_id, "ProfileUpdated", {"field": "age", "value": 30})
# Retrieve event history
events = event_store.get_events(user_id)
print(f"Events for {user_id}:")
for event in events:
    print(f" {event['event_type']}: {event['event_data']}")
import fakeredis
import redis  # needed below for redis.exceptions.ResponseError
import time
import json
class StreamMessageQueue:
    def __init__(self, client, queue_name, consumer_group):
        self.client = client
        self.queue_name = queue_name
        self.consumer_group = consumer_group
        self.dead_letter_queue = f"{queue_name}:dlq"
        # Create the consumer group if it doesn't exist
        try:
            self.client.xgroup_create(queue_name, consumer_group, id='0', mkstream=True)
        except redis.exceptions.ResponseError:
            pass  # BUSYGROUP: the group already exists
    def enqueue(self, message_data, priority=0):
        """Add a message to the queue"""
        message = {
            'data': json.dumps(message_data),
            'priority': str(priority),
            'enqueued_at': str(int(time.time())),
            'retry_count': '0'
        }
        return self.client.xadd(self.queue_name, message)
    def dequeue(self, consumer_name, count=1, block_ms=1000):
        """Dequeue messages for processing"""
        messages = self.client.xreadgroup(
            self.consumer_group,
            consumer_name,
            {self.queue_name: '>'},
            count=count,
            block=block_ms
        )
        processed_messages = []
        for stream_name, entries in messages:
            for entry_id, fields in entries:
                message = {
                    'id': entry_id.decode(),
                    'data': json.loads(fields[b'data'].decode()),
                    'priority': int(fields[b'priority'].decode()),
                    'enqueued_at': int(fields[b'enqueued_at'].decode()),
                    'retry_count': int(fields[b'retry_count'].decode())
                }
                processed_messages.append(message)
        return processed_messages
    def acknowledge(self, message_id):
        """Acknowledge successful processing"""
        return self.client.xack(self.queue_name, self.consumer_group, message_id)
    def retry_failed_messages(self, max_retries=3, idle_time_ms=60000):
        """Move failed messages to retry or the dead-letter queue"""
        # Get pending messages that are idle
        pending = self.client.xpending(
            self.queue_name,
            self.consumer_group,
            min='-',
            max='+',
            count=100
        )
        current_time = int(time.time() * 1000)
        for msg_info in pending:
            if isinstance(msg_info, dict):
                msg_id = msg_info['message_id'].decode()
                idle = msg_info['time_since_delivered']
                consumer = msg_info['consumer'].decode()
                if idle > idle_time_ms:  # Message has been idle too long
                    # Claim the message
                    claimed = self.client.xclaim(
                        self.queue_name,
                        self.consumer_group,
                        'retry_handler',
                        idle_time_ms,
                        [msg_id]
                    )
                    if claimed:
                        entry_id, fields = claimed[0]
                        retry_count = int(fields[b'retry_count'].decode())
                        if retry_count >= max_retries:
                            # Move to the dead-letter queue
                            self.client.xadd(self.dead_letter_queue, {
                                'original_id': msg_id,
                                'data': fields[b'data'].decode(),
                                'failed_at': str(current_time),
                                'retry_count': str(retry_count)
                            })
                            self.acknowledge(msg_id)
                        else:
                            # Increment the retry count and re-queue
                            fields[b'retry_count'] = str(retry_count + 1).encode()
                            self.client.xadd(self.queue_name, {
                                k.decode(): v.decode() for k, v in fields.items()
                            })
                            self.acknowledge(msg_id)
# Usage example
client = fakeredis.FakeRedis()
queue = StreamMessageQueue(client, 'work_queue', 'workers')
# Enqueue some work
queue.enqueue({'task': 'send_email', 'recipient': 'user@example.com'})
queue.enqueue({'task': 'process_payment', 'amount': 100}, priority=1)
# Process messages
messages = queue.dequeue('worker1')
for message in messages:
    try:
        # Simulate processing
        print(f"Processing: {message['data']}")
        # Acknowledge on success
        queue.acknowledge(message['id'])
    except Exception as e:
        print(f"Failed to process {message['id']}: {e}")
        # Don't acknowledge - will be retried later
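Continuing the example, a periodic sweep with retry_failed_messages claims idle, unacknowledged messages and either re-queues or dead-letters them. The short sleep and 1 ms threshold below are only to make the demo deterministic:
# Simulate a failure: dequeue a message and never acknowledge it
queue.enqueue({'task': 'flaky_job'})
queue.dequeue('worker1')
time.sleep(0.05)  # let the unacked entry accumulate some idle time
queue.retry_failed_messages(max_retries=3, idle_time_ms=1)
print(f"Queue length after sweep: {client.xlen('work_queue')}")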
Install with Tessl CLI
npx tessl i tessl/pypi-fakeredisdocs