CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fakeredis

Python implementation of redis API, can be used for testing purposes

Pending
Overview
Eval results
Files

stream-operations.mddocs/

Stream Operations

Redis 5.0+ streams for append-only log data structures with consumer group support for distributed processing. Streams provide a powerful abstraction for event sourcing, message queuing, and real-time data processing with built-in persistence and horizontal scaling capabilities.

Capabilities

Stream Management

Core operations for creating, adding to, and querying Redis streams.

def xadd(
    self,
    name: KeyT,
    fields: Dict[AnyKeyT, EncodableT],
    id: str = "*",
    maxlen: Optional[int] = None,
    approximate: bool = True,
    nomkstream: bool = False,
    minid: Optional[str] = None,
    limit: Optional[int] = None
) -> str: ...

def xlen(self, name: KeyT) -> int: ...

def xrange(
    self,
    name: KeyT,
    min: str = "-",
    max: str = "+",
    count: Optional[int] = None
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...

def xrevrange(
    self,
    name: KeyT,
    max: str = "+",
    min: str = "-",
    count: Optional[int] = None
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...

def xdel(self, name: KeyT, *ids: str) -> int: ...

def xtrim(
    self,
    name: KeyT,
    maxlen: Optional[int] = None,
    approximate: bool = True,
    minid: Optional[str] = None,
    limit: Optional[int] = None
) -> int: ...

Stream Reading

Reading operations for consuming stream entries with blocking and non-blocking modes.

def xread(
    self,
    streams: Dict[KeyT, Union[str, int]],
    count: Optional[int] = None,
    block: Optional[int] = None
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...

def xreadgroup(
    self,
    groupname: str,
    consumername: str,
    streams: Dict[KeyT, Union[str, int]],
    count: Optional[int] = None,
    block: Optional[int] = None,
    noack: bool = False
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...

Consumer Groups

Operations for managing consumer groups and tracking message processing.

def xgroup_create(
    self,
    name: KeyT,
    groupname: str,
    id: str = "$",
    mkstream: bool = False,
    entries_read: Optional[int] = None
) -> bool: ...

def xgroup_destroy(self, name: KeyT, groupname: str) -> bool: ...

def xgroup_createconsumer(
    self,
    name: KeyT,
    groupname: str,
    consumername: str
) -> bool: ...

def xgroup_delconsumer(
    self,
    name: KeyT,
    groupname: str,
    consumername: str
) -> int: ...

def xgroup_setid(
    self,
    name: KeyT,
    groupname: str,
    id: str,
    entries_read: Optional[int] = None
) -> bool: ...

Message Acknowledgment

Functions for acknowledging processed messages and managing pending entries.

def xack(self, name: KeyT, groupname: str, *ids: str) -> int: ...

def xpending(
    self,
    name: KeyT,
    groupname: str,
    min: Optional[str] = None,
    max: Optional[str] = None,
    count: Optional[int] = None,
    consumername: Optional[str] = None
) -> Union[Dict[str, Any], List[Dict[str, Any]]]: ...

def xclaim(
    self,
    name: KeyT,
    groupname: str,
    consumername: str,
    min_idle_time: int,
    message_ids: List[str],
    idle: Optional[int] = None,
    time: Optional[int] = None,
    retrycount: Optional[int] = None,
    force: bool = False,
    justid: bool = False
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...

def xautoclaim(
    self,
    name: KeyT,
    groupname: str,
    consumername: str,
    min_idle_time: int,
    start_id: str = "0-0",
    count: Optional[int] = None,
    justid: bool = False
) -> Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]], List[bytes]]: ...

Stream Information

Inspection commands for stream metadata, consumer group status, and consumer information.

def xinfo_consumers(self, name: KeyT, groupname: str) -> List[Dict[str, Any]]: ...

def xinfo_groups(self, name: KeyT) -> List[Dict[str, Any]]: ...

def xinfo_stream(self, name: KeyT, full: bool = False, count: Optional[int] = None) -> Dict[str, Any]: ...

Usage Examples

Basic Stream Operations

import fakeredis

client = fakeredis.FakeRedis()

# Add entries to a stream
entry_id1 = client.xadd('events', {'user': 'alice', 'action': 'login'})
print(f"Added entry: {entry_id1}")

entry_id2 = client.xadd('events', {'user': 'bob', 'action': 'purchase', 'amount': '29.99'})
print(f"Added entry: {entry_id2}")

# Add with custom ID
custom_id = client.xadd('events', {'user': 'charlie', 'action': 'logout'}, id='1234567890123-0')

# Get stream length
length = client.xlen('events')
print(f"Stream length: {length}")

Reading Stream Data

import fakeredis

client = fakeredis.FakeRedis()

# Add some test data
client.xadd('sensor_data', {'temperature': '23.5', 'humidity': '65'})
client.xadd('sensor_data', {'temperature': '24.1', 'humidity': '62'})
client.xadd('sensor_data', {'temperature': '23.8', 'humidity': '68'})

# Read all entries
entries = client.xrange('sensor_data')
for entry_id, fields in entries:
    print(f"ID: {entry_id.decode()}")
    for key, value in fields.items():
        print(f"  {key.decode()}: {value.decode()}")

# Read entries in reverse order
recent_entries = client.xrevrange('sensor_data', count=2)
print(f"Last 2 entries: {len(recent_entries)}")

# Read from a specific ID
from_id = entries[0][0].decode()  # First entry ID
new_entries = client.xrange('sensor_data', min=from_id)

Stream Trimming and Cleanup

import fakeredis

client = fakeredis.FakeRedis()

# Add many entries
for i in range(100):
    client.xadd('logs', {'level': 'info', 'message': f'Log entry {i}'})

print(f"Stream length before trim: {client.xlen('logs')}")

# Keep only the latest 50 entries (approximate)
trimmed = client.xtrim('logs', maxlen=50, approximate=True)
print(f"Trimmed {trimmed} entries")
print(f"Stream length after trim: {client.xlen('logs')}")

# Delete specific entries
entries = client.xrange('logs', count=5)
entry_ids = [entry[0].decode() for entry in entries]
deleted = client.xdel('logs', *entry_ids)
print(f"Deleted {deleted} specific entries")

Consumer Groups

import fakeredis
import time

client = fakeredis.FakeRedis()

# Create a stream and add some data
for i in range(10):
    client.xadd('orders', {'order_id': f'order_{i}', 'status': 'pending'})

# Create consumer group
client.xgroup_create('orders', 'processors', id='0')

# Create consumers in the group
client.xgroup_createconsumer('orders', 'processors', 'worker1')
client.xgroup_createconsumer('orders', 'processors', 'worker2')

# Consumer 1 reads messages
messages = client.xreadgroup('processors', 'worker1', {'orders': '>'}, count=3)
print("Worker1 received:")
for stream_name, entries in messages:
    for entry_id, fields in entries:
        print(f"  {entry_id.decode()}: {fields}")

# Consumer 2 reads different messages
messages = client.xreadgroup('processors', 'worker2', {'orders': '>'}, count=2)
print("Worker2 received:")
for stream_name, entries in messages:
    for entry_id, fields in entries:
        print(f"  {entry_id.decode()}: {fields}")

Message Acknowledgment and Pending Entries

import fakeredis

client = fakeredis.FakeRedis()

# Setup stream and consumer group
client.xadd('tasks', {'task': 'send_email', 'recipient': 'user@example.com'})
client.xadd('tasks', {'task': 'process_payment', 'amount': '100.00'})
client.xadd('tasks', {'task': 'update_inventory', 'item_id': '12345'})

client.xgroup_create('tasks', 'workers', id='0')

# Read messages without acknowledging
messages = client.xreadgroup('workers', 'consumer1', {'tasks': '>'})
entry_ids = []
for stream_name, entries in messages:
    for entry_id, fields in entries:
        entry_ids.append(entry_id.decode())
        print(f"Processing: {fields}")

# Check pending messages
pending_info = client.xpending('tasks', 'workers')
print(f"Pending messages: {pending_info}")

# Acknowledge processed messages
acked = client.xack('tasks', 'workers', *entry_ids[:2])  # Ack first 2 messages
print(f"Acknowledged {acked} messages")

# Check pending again
pending_info = client.xpending('tasks', 'workers')
print(f"Remaining pending: {pending_info}")

Blocking Stream Reads

import fakeredis
import threading
import time

client = fakeredis.FakeRedis()

def producer():
    """Producer thread that adds entries every 2 seconds"""
    for i in range(5):
        time.sleep(2)
        entry_id = client.xadd('notifications', {
            'type': 'alert',
            'message': f'Alert {i}',
            'timestamp': str(int(time.time()))
        })
        print(f"Producer added: {entry_id}")

def consumer():
    """Consumer that blocks waiting for new entries"""
    last_id = '0-0'
    while True:
        # Block for up to 5 seconds waiting for new messages
        messages = client.xread({'notifications': last_id}, block=5000)
        if not messages:
            print("No new messages, continuing...")
            break
            
        for stream_name, entries in messages:
            for entry_id, fields in entries:
                print(f"Consumer received: {entry_id.decode()} - {fields}")
                last_id = entry_id.decode()

# Start producer thread
producer_thread = threading.Thread(target=producer)
producer_thread.start()

# Start consuming (will block)
consumer()

producer_thread.join()

Stream Information and Monitoring

import fakeredis

client = fakeredis.FakeRedis()

# Setup test data
client.xadd('analytics', {'event': 'page_view', 'page': '/home'})
client.xadd('analytics', {'event': 'click', 'element': 'button'})
client.xgroup_create('analytics', 'processors', id='0')
client.xreadgroup('processors', 'worker1', {'analytics': '>'})

# Get stream information
stream_info = client.xinfo_stream('analytics')
print("Stream info:")
print(f"  Length: {stream_info['length']}")
print(f"  First entry: {stream_info['first-entry']}")
print(f"  Last entry: {stream_info['last-entry']}")

# Get consumer group information
groups_info = client.xinfo_groups('analytics')
print("\nConsumer groups:")
for group in groups_info:
    print(f"  Group: {group['name'].decode()}")
    print(f"  Consumers: {group['consumers']}")
    print(f"  Pending: {group['pending']}")

# Get consumer information
consumers_info = client.xinfo_consumers('analytics', 'processors')
print("\nConsumers in group:")
for consumer in consumers_info:
    print(f"  Consumer: {consumer['name'].decode()}")
    print(f"  Pending: {consumer['pending']}")
    print(f"  Idle: {consumer['idle']}")

Pattern: Event Sourcing

import fakeredis
import json
import time
from datetime import datetime

class EventStore:
    def __init__(self, client):
        self.client = client
        
    def append_event(self, aggregate_id, event_type, event_data):
        """Append an event to an aggregate's stream"""
        stream_name = f"aggregate:{aggregate_id}"
        event = {
            'event_type': event_type,
            'event_data': json.dumps(event_data),
            'timestamp': datetime.utcnow().isoformat(),
            'version': str(int(time.time() * 1000000))  # Microsecond precision
        }
        return self.client.xadd(stream_name, event)
    
    def get_events(self, aggregate_id, from_version=None):
        """Retrieve all events for an aggregate"""
        stream_name = f"aggregate:{aggregate_id}"
        min_id = from_version if from_version else '-'
        
        events = []
        entries = self.client.xrange(stream_name, min=min_id)
        
        for entry_id, fields in entries:
            event = {
                'id': entry_id.decode(),
                'event_type': fields[b'event_type'].decode(),
                'event_data': json.loads(fields[b'event_data'].decode()),
                'timestamp': fields[b'timestamp'].decode(),
                'version': fields[b'version'].decode()
            }
            events.append(event)
        
        return events

# Usage example
client = fakeredis.FakeRedis()
event_store = EventStore(client)

# Append events for a user aggregate
user_id = "user123"
event_store.append_event(user_id, "UserCreated", {"name": "Alice", "email": "alice@example.com"})
event_store.append_event(user_id, "EmailChanged", {"old_email": "alice@example.com", "new_email": "alice.smith@example.com"})
event_store.append_event(user_id, "ProfileUpdated", {"field": "age", "value": 30})

# Retrieve event history
events = event_store.get_events(user_id)
print(f"Events for {user_id}:")
for event in events:
    print(f"  {event['event_type']}: {event['event_data']}")

Pattern: Message Queue with Dead Letter

import fakeredis
import time
import json

class StreamMessageQueue:
    def __init__(self, client, queue_name, consumer_group):
        self.client = client
        self.queue_name = queue_name
        self.consumer_group = consumer_group
        self.dead_letter_queue = f"{queue_name}:dlq"
        
        # Create consumer group if it doesn't exist
        try:
            self.client.xgroup_create(queue_name, consumer_group, id='0', mkstream=True)
        except:
            pass  # Group already exists
            
    def enqueue(self, message_data, priority=0):
        """Add a message to the queue"""
        message = {
            'data': json.dumps(message_data),
            'priority': str(priority),
            'enqueued_at': str(int(time.time())),
            'retry_count': '0'
        }
        return self.client.xadd(self.queue_name, message)
    
    def dequeue(self, consumer_name, count=1, block_ms=1000):
        """Dequeue messages for processing"""
        messages = self.client.xreadgroup(
            self.consumer_group, 
            consumer_name,
            {self.queue_name: '>'},
            count=count,
            block=block_ms
        )
        
        processed_messages = []
        for stream_name, entries in messages:
            for entry_id, fields in entries:
                message = {
                    'id': entry_id.decode(),
                    'data': json.loads(fields[b'data'].decode()),
                    'priority': int(fields[b'priority'].decode()),
                    'enqueued_at': int(fields[b'enqueued_at'].decode()),
                    'retry_count': int(fields[b'retry_count'].decode())
                }
                processed_messages.append(message)
                
        return processed_messages
    
    def acknowledge(self, message_id):
        """Acknowledge successful processing"""
        return self.client.xack(self.queue_name, self.consumer_group, message_id)
    
    def retry_failed_messages(self, max_retries=3, idle_time_ms=60000):
        """Move failed messages to retry or dead letter queue"""
        # Get pending messages that are idle
        pending = self.client.xpending(
            self.queue_name, 
            self.consumer_group,
            min='-',
            max='+',
            count=100
        )
        
        current_time = int(time.time() * 1000)
        
        for msg_info in pending:
            if isinstance(msg_info, dict):
                msg_id = msg_info['message_id'].decode()
                idle = msg_info['time_since_delivered']
                consumer = msg_info['consumer'].decode()
                
                if idle > idle_time_ms:  # Message is idle too long
                    # Claim the message
                    claimed = self.client.xclaim(
                        self.queue_name,
                        self.consumer_group,
                        'retry_handler',
                        idle_time_ms,
                        [msg_id]
                    )
                    
                    if claimed:
                        entry_id, fields = claimed[0]
                        retry_count = int(fields[b'retry_count'].decode())
                        
                        if retry_count >= max_retries:
                            # Move to dead letter queue
                            self.client.xadd(self.dead_letter_queue, {
                                'original_id': msg_id,
                                'data': fields[b'data'].decode(),
                                'failed_at': str(current_time),
                                'retry_count': str(retry_count)
                            })
                            self.acknowledge(msg_id)
                        else:
                            # Increment retry count and re-queue
                            fields[b'retry_count'] = str(retry_count + 1).encode()
                            self.client.xadd(self.queue_name, {
                                k.decode(): v.decode() for k, v in fields.items()
                            })
                            self.acknowledge(msg_id)

# Usage example
client = fakeredis.FakeRedis()
queue = StreamMessageQueue(client, 'work_queue', 'workers')

# Enqueue some work
queue.enqueue({'task': 'send_email', 'recipient': 'user@example.com'})
queue.enqueue({'task': 'process_payment', 'amount': 100}, priority=1)

# Process messages
messages = queue.dequeue('worker1')
for message in messages:
    try:
        # Simulate processing
        print(f"Processing: {message['data']}")
        # Acknowledge on success
        queue.acknowledge(message['id'])
    except Exception as e:
        print(f"Failed to process {message['id']}: {e}")
        # Don't acknowledge - will be retried later

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json