docs
0
# Stream Operations
1
2
Redis 5.0+ streams for append-only log data structures with consumer group support for distributed processing. Streams provide a powerful abstraction for event sourcing, message queuing, and real-time data processing with built-in persistence and horizontal scaling capabilities.
3
4
## Capabilities
5
6
### Stream Management
7
8
Core operations for creating, adding to, and querying Redis streams.
9
10
```python { .api }
11
def xadd(
12
self,
13
name: KeyT,
14
fields: Dict[AnyKeyT, EncodableT],
15
id: str = "*",
16
maxlen: Optional[int] = None,
17
approximate: bool = True,
18
nomkstream: bool = False,
19
minid: Optional[str] = None,
20
limit: Optional[int] = None
21
) -> str: ...
22
23
def xlen(self, name: KeyT) -> int: ...
24
25
def xrange(
26
self,
27
name: KeyT,
28
min: str = "-",
29
max: str = "+",
30
count: Optional[int] = None
31
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...
32
33
def xrevrange(
34
self,
35
name: KeyT,
36
max: str = "+",
37
min: str = "-",
38
count: Optional[int] = None
39
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...
40
41
def xdel(self, name: KeyT, *ids: str) -> int: ...
42
43
def xtrim(
44
self,
45
name: KeyT,
46
maxlen: Optional[int] = None,
47
approximate: bool = True,
48
minid: Optional[str] = None,
49
limit: Optional[int] = None
50
) -> int: ...
51
```
52
53
### Stream Reading
54
55
Reading operations for consuming stream entries with blocking and non-blocking modes.
56
57
```python { .api }
58
def xread(
59
self,
60
streams: Dict[KeyT, Union[str, int]],
61
count: Optional[int] = None,
62
block: Optional[int] = None
63
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...
64
65
def xreadgroup(
66
self,
67
groupname: str,
68
consumername: str,
69
streams: Dict[KeyT, Union[str, int]],
70
count: Optional[int] = None,
71
block: Optional[int] = None,
72
noack: bool = False
73
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...
74
```
75
76
### Consumer Groups
77
78
Operations for managing consumer groups and tracking message processing.
79
80
```python { .api }
81
def xgroup_create(
82
self,
83
name: KeyT,
84
groupname: str,
85
id: str = "$",
86
mkstream: bool = False,
87
entries_read: Optional[int] = None
88
) -> bool: ...
89
90
def xgroup_destroy(self, name: KeyT, groupname: str) -> bool: ...
91
92
def xgroup_createconsumer(
93
self,
94
name: KeyT,
95
groupname: str,
96
consumername: str
97
) -> bool: ...
98
99
def xgroup_delconsumer(
100
self,
101
name: KeyT,
102
groupname: str,
103
consumername: str
104
) -> int: ...
105
106
def xgroup_setid(
107
self,
108
name: KeyT,
109
groupname: str,
110
id: str,
111
entries_read: Optional[int] = None
112
) -> bool: ...
113
```
114
115
### Message Acknowledgment
116
117
Functions for acknowledging processed messages and managing pending entries.
118
119
```python { .api }
120
def xack(self, name: KeyT, groupname: str, *ids: str) -> int: ...
121
122
def xpending(
123
self,
124
name: KeyT,
125
groupname: str,
126
min: Optional[str] = None,
127
max: Optional[str] = None,
128
count: Optional[int] = None,
129
consumername: Optional[str] = None
130
) -> Union[Dict[str, Any], List[Dict[str, Any]]]: ...
131
132
def xclaim(
133
self,
134
name: KeyT,
135
groupname: str,
136
consumername: str,
137
min_idle_time: int,
138
message_ids: List[str],
139
idle: Optional[int] = None,
140
time: Optional[int] = None,
141
retrycount: Optional[int] = None,
142
force: bool = False,
143
justid: bool = False
144
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...
145
146
def xautoclaim(
147
self,
148
name: KeyT,
149
groupname: str,
150
consumername: str,
151
min_idle_time: int,
152
start_id: str = "0-0",
153
count: Optional[int] = None,
154
justid: bool = False
155
) -> Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]], List[bytes]]: ...
156
```
157
158
### Stream Information
159
160
Inspection commands for stream metadata, consumer group status, and consumer information.
161
162
```python { .api }
163
def xinfo_consumers(self, name: KeyT, groupname: str) -> List[Dict[str, Any]]: ...
164
165
def xinfo_groups(self, name: KeyT) -> List[Dict[str, Any]]: ...
166
167
def xinfo_stream(self, name: KeyT, full: bool = False, count: Optional[int] = None) -> Dict[str, Any]: ...
168
```
169
170
## Usage Examples
171
172
### Basic Stream Operations
173
174
```python
175
import fakeredis
176
177
client = fakeredis.FakeRedis()
178
179
# Add entries to a stream
180
entry_id1 = client.xadd('events', {'user': 'alice', 'action': 'login'})
181
print(f"Added entry: {entry_id1}")
182
183
entry_id2 = client.xadd('events', {'user': 'bob', 'action': 'purchase', 'amount': '29.99'})
184
print(f"Added entry: {entry_id2}")
185
186
# Add with custom ID
187
custom_id = client.xadd('events', {'user': 'charlie', 'action': 'logout'}, id='1234567890123-0')
188
189
# Get stream length
190
length = client.xlen('events')
191
print(f"Stream length: {length}")
192
```
193
194
### Reading Stream Data
195
196
```python
197
import fakeredis
198
199
client = fakeredis.FakeRedis()
200
201
# Add some test data
202
client.xadd('sensor_data', {'temperature': '23.5', 'humidity': '65'})
203
client.xadd('sensor_data', {'temperature': '24.1', 'humidity': '62'})
204
client.xadd('sensor_data', {'temperature': '23.8', 'humidity': '68'})
205
206
# Read all entries
207
entries = client.xrange('sensor_data')
208
for entry_id, fields in entries:
209
print(f"ID: {entry_id.decode()}")
210
for key, value in fields.items():
211
print(f" {key.decode()}: {value.decode()}")
212
213
# Read entries in reverse order
214
recent_entries = client.xrevrange('sensor_data', count=2)
215
print(f"Last 2 entries: {len(recent_entries)}")
216
217
# Read from a specific ID
218
from_id = entries[0][0].decode() # First entry ID
219
new_entries = client.xrange('sensor_data', min=from_id)
220
```
221
222
### Stream Trimming and Cleanup
223
224
```python
225
import fakeredis
226
227
client = fakeredis.FakeRedis()
228
229
# Add many entries
230
for i in range(100):
231
client.xadd('logs', {'level': 'info', 'message': f'Log entry {i}'})
232
233
print(f"Stream length before trim: {client.xlen('logs')}")
234
235
# Keep only the latest 50 entries (approximate)
236
trimmed = client.xtrim('logs', maxlen=50, approximate=True)
237
print(f"Trimmed {trimmed} entries")
238
print(f"Stream length after trim: {client.xlen('logs')}")
239
240
# Delete specific entries
241
entries = client.xrange('logs', count=5)
242
entry_ids = [entry[0].decode() for entry in entries]
243
deleted = client.xdel('logs', *entry_ids)
244
print(f"Deleted {deleted} specific entries")
245
```
246
247
### Consumer Groups
248
249
```python
250
import fakeredis
251
import time
252
253
client = fakeredis.FakeRedis()
254
255
# Create a stream and add some data
256
for i in range(10):
257
client.xadd('orders', {'order_id': f'order_{i}', 'status': 'pending'})
258
259
# Create consumer group
260
client.xgroup_create('orders', 'processors', id='0')
261
262
# Create consumers in the group
263
client.xgroup_createconsumer('orders', 'processors', 'worker1')
264
client.xgroup_createconsumer('orders', 'processors', 'worker2')
265
266
# Consumer 1 reads messages
267
messages = client.xreadgroup('processors', 'worker1', {'orders': '>'}, count=3)
268
print("Worker1 received:")
269
for stream_name, entries in messages:
270
for entry_id, fields in entries:
271
print(f" {entry_id.decode()}: {fields}")
272
273
# Consumer 2 reads different messages
274
messages = client.xreadgroup('processors', 'worker2', {'orders': '>'}, count=2)
275
print("Worker2 received:")
276
for stream_name, entries in messages:
277
for entry_id, fields in entries:
278
print(f" {entry_id.decode()}: {fields}")
279
```
280
281
### Message Acknowledgment and Pending Entries
282
283
```python
284
import fakeredis
285
286
client = fakeredis.FakeRedis()
287
288
# Setup stream and consumer group
289
client.xadd('tasks', {'task': 'send_email', 'recipient': 'user@example.com'})
290
client.xadd('tasks', {'task': 'process_payment', 'amount': '100.00'})
291
client.xadd('tasks', {'task': 'update_inventory', 'item_id': '12345'})
292
293
client.xgroup_create('tasks', 'workers', id='0')
294
295
# Read messages without acknowledging
296
messages = client.xreadgroup('workers', 'consumer1', {'tasks': '>'})
297
entry_ids = []
298
for stream_name, entries in messages:
299
for entry_id, fields in entries:
300
entry_ids.append(entry_id.decode())
301
print(f"Processing: {fields}")
302
303
# Check pending messages
304
pending_info = client.xpending('tasks', 'workers')
305
print(f"Pending messages: {pending_info}")
306
307
# Acknowledge processed messages
308
acked = client.xack('tasks', 'workers', *entry_ids[:2]) # Ack first 2 messages
309
print(f"Acknowledged {acked} messages")
310
311
# Check pending again
312
pending_info = client.xpending('tasks', 'workers')
313
print(f"Remaining pending: {pending_info}")
314
```
315
316
### Blocking Stream Reads
317
318
```python
319
import fakeredis
320
import threading
321
import time
322
323
client = fakeredis.FakeRedis()
324
325
def producer():
326
"""Producer thread that adds entries every 2 seconds"""
327
for i in range(5):
328
time.sleep(2)
329
entry_id = client.xadd('notifications', {
330
'type': 'alert',
331
'message': f'Alert {i}',
332
'timestamp': str(int(time.time()))
333
})
334
print(f"Producer added: {entry_id}")
335
336
def consumer():
337
"""Consumer that blocks waiting for new entries"""
338
last_id = '0-0'
339
while True:
340
# Block for up to 5 seconds waiting for new messages
341
messages = client.xread({'notifications': last_id}, block=5000)
342
if not messages:
343
print("No new messages, continuing...")
344
break
345
346
for stream_name, entries in messages:
347
for entry_id, fields in entries:
348
print(f"Consumer received: {entry_id.decode()} - {fields}")
349
last_id = entry_id.decode()
350
351
# Start producer thread
352
producer_thread = threading.Thread(target=producer)
353
producer_thread.start()
354
355
# Start consuming (will block)
356
consumer()
357
358
producer_thread.join()
359
```
360
361
### Stream Information and Monitoring
362
363
```python
364
import fakeredis
365
366
client = fakeredis.FakeRedis()
367
368
# Setup test data
369
client.xadd('analytics', {'event': 'page_view', 'page': '/home'})
370
client.xadd('analytics', {'event': 'click', 'element': 'button'})
371
client.xgroup_create('analytics', 'processors', id='0')
372
client.xreadgroup('processors', 'worker1', {'analytics': '>'})
373
374
# Get stream information
375
stream_info = client.xinfo_stream('analytics')
376
print("Stream info:")
377
print(f" Length: {stream_info['length']}")
378
print(f" First entry: {stream_info['first-entry']}")
379
print(f" Last entry: {stream_info['last-entry']}")
380
381
# Get consumer group information
382
groups_info = client.xinfo_groups('analytics')
383
print("\nConsumer groups:")
384
for group in groups_info:
385
print(f" Group: {group['name'].decode()}")
386
print(f" Consumers: {group['consumers']}")
387
print(f" Pending: {group['pending']}")
388
389
# Get consumer information
390
consumers_info = client.xinfo_consumers('analytics', 'processors')
391
print("\nConsumers in group:")
392
for consumer in consumers_info:
393
print(f" Consumer: {consumer['name'].decode()}")
394
print(f" Pending: {consumer['pending']}")
395
print(f" Idle: {consumer['idle']}")
396
```
397
398
### Pattern: Event Sourcing
399
400
```python
401
import fakeredis
402
import json
403
import time
404
from datetime import datetime
405
406
class EventStore:
407
def __init__(self, client):
408
self.client = client
409
410
def append_event(self, aggregate_id, event_type, event_data):
411
"""Append an event to an aggregate's stream"""
412
stream_name = f"aggregate:{aggregate_id}"
413
event = {
414
'event_type': event_type,
415
'event_data': json.dumps(event_data),
416
'timestamp': datetime.utcnow().isoformat(),
417
'version': str(int(time.time() * 1000000)) # Microsecond precision
418
}
419
return self.client.xadd(stream_name, event)
420
421
def get_events(self, aggregate_id, from_version=None):
422
"""Retrieve all events for an aggregate"""
423
stream_name = f"aggregate:{aggregate_id}"
424
min_id = from_version if from_version else '-'
425
426
events = []
427
entries = self.client.xrange(stream_name, min=min_id)
428
429
for entry_id, fields in entries:
430
event = {
431
'id': entry_id.decode(),
432
'event_type': fields[b'event_type'].decode(),
433
'event_data': json.loads(fields[b'event_data'].decode()),
434
'timestamp': fields[b'timestamp'].decode(),
435
'version': fields[b'version'].decode()
436
}
437
events.append(event)
438
439
return events
440
441
# Usage example
442
client = fakeredis.FakeRedis()
443
event_store = EventStore(client)
444
445
# Append events for a user aggregate
446
user_id = "user123"
447
event_store.append_event(user_id, "UserCreated", {"name": "Alice", "email": "alice@example.com"})
448
event_store.append_event(user_id, "EmailChanged", {"old_email": "alice@example.com", "new_email": "alice.smith@example.com"})
449
event_store.append_event(user_id, "ProfileUpdated", {"field": "age", "value": 30})
450
451
# Retrieve event history
452
events = event_store.get_events(user_id)
453
print(f"Events for {user_id}:")
454
for event in events:
455
print(f" {event['event_type']}: {event['event_data']}")
456
```
457
458
### Pattern: Message Queue with Dead Letter
459
460
```python
461
import fakeredis
462
import time
463
import json
464
465
class StreamMessageQueue:
466
def __init__(self, client, queue_name, consumer_group):
467
self.client = client
468
self.queue_name = queue_name
469
self.consumer_group = consumer_group
470
self.dead_letter_queue = f"{queue_name}:dlq"
471
472
# Create consumer group if it doesn't exist
473
try:
474
self.client.xgroup_create(queue_name, consumer_group, id='0', mkstream=True)
475
except:
476
pass # Group already exists
477
478
def enqueue(self, message_data, priority=0):
479
"""Add a message to the queue"""
480
message = {
481
'data': json.dumps(message_data),
482
'priority': str(priority),
483
'enqueued_at': str(int(time.time())),
484
'retry_count': '0'
485
}
486
return self.client.xadd(self.queue_name, message)
487
488
def dequeue(self, consumer_name, count=1, block_ms=1000):
489
"""Dequeue messages for processing"""
490
messages = self.client.xreadgroup(
491
self.consumer_group,
492
consumer_name,
493
{self.queue_name: '>'},
494
count=count,
495
block=block_ms
496
)
497
498
processed_messages = []
499
for stream_name, entries in messages:
500
for entry_id, fields in entries:
501
message = {
502
'id': entry_id.decode(),
503
'data': json.loads(fields[b'data'].decode()),
504
'priority': int(fields[b'priority'].decode()),
505
'enqueued_at': int(fields[b'enqueued_at'].decode()),
506
'retry_count': int(fields[b'retry_count'].decode())
507
}
508
processed_messages.append(message)
509
510
return processed_messages
511
512
def acknowledge(self, message_id):
513
"""Acknowledge successful processing"""
514
return self.client.xack(self.queue_name, self.consumer_group, message_id)
515
516
def retry_failed_messages(self, max_retries=3, idle_time_ms=60000):
517
"""Move failed messages to retry or dead letter queue"""
518
# Get pending messages that are idle
519
pending = self.client.xpending(
520
self.queue_name,
521
self.consumer_group,
522
min='-',
523
max='+',
524
count=100
525
)
526
527
current_time = int(time.time() * 1000)
528
529
for msg_info in pending:
530
if isinstance(msg_info, dict):
531
msg_id = msg_info['message_id'].decode()
532
idle = msg_info['time_since_delivered']
533
consumer = msg_info['consumer'].decode()
534
535
if idle > idle_time_ms: # Message is idle too long
536
# Claim the message
537
claimed = self.client.xclaim(
538
self.queue_name,
539
self.consumer_group,
540
'retry_handler',
541
idle_time_ms,
542
[msg_id]
543
)
544
545
if claimed:
546
entry_id, fields = claimed[0]
547
retry_count = int(fields[b'retry_count'].decode())
548
549
if retry_count >= max_retries:
550
# Move to dead letter queue
551
self.client.xadd(self.dead_letter_queue, {
552
'original_id': msg_id,
553
'data': fields[b'data'].decode(),
554
'failed_at': str(current_time),
555
'retry_count': str(retry_count)
556
})
557
self.acknowledge(msg_id)
558
else:
559
# Increment retry count and re-queue
560
fields[b'retry_count'] = str(retry_count + 1).encode()
561
self.client.xadd(self.queue_name, {
562
k.decode(): v.decode() for k, v in fields.items()
563
})
564
self.acknowledge(msg_id)
565
566
# Usage example
567
client = fakeredis.FakeRedis()
568
queue = StreamMessageQueue(client, 'work_queue', 'workers')
569
570
# Enqueue some work
571
queue.enqueue({'task': 'send_email', 'recipient': 'user@example.com'})
572
queue.enqueue({'task': 'process_payment', 'amount': 100}, priority=1)
573
574
# Process messages
575
messages = queue.dequeue('worker1')
576
for message in messages:
577
try:
578
# Simulate processing
579
print(f"Processing: {message['data']}")
580
# Acknowledge on success
581
queue.acknowledge(message['id'])
582
except Exception as e:
583
print(f"Failed to process {message['id']}: {e}")
584
# Don't acknowledge - will be retried later
585
```