# Stream Operations

Redis 5.0+ stream commands for append-only log data structures, with consumer group support for distributed processing. Streams provide a powerful abstraction for event sourcing, message queuing, and real-time data processing, with built-in persistence and support for horizontal scaling.

## Capabilities

### Stream Management

Core operations for creating, adding to, and querying Redis streams.

```python { .api }
def xadd(
    self,
    name: KeyT,
    fields: Dict[AnyKeyT, EncodableT],
    id: str = "*",
    maxlen: Optional[int] = None,
    approximate: bool = True,
    nomkstream: bool = False,
    minid: Optional[str] = None,
    limit: Optional[int] = None
) -> str: ...

def xlen(self, name: KeyT) -> int: ...

def xrange(
    self,
    name: KeyT,
    min: str = "-",
    max: str = "+",
    count: Optional[int] = None
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...

def xrevrange(
    self,
    name: KeyT,
    max: str = "+",
    min: str = "-",
    count: Optional[int] = None
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...

def xdel(self, name: KeyT, *ids: str) -> int: ...

def xtrim(
    self,
    name: KeyT,
    maxlen: Optional[int] = None,
    approximate: bool = True,
    minid: Optional[str] = None,
    limit: Optional[int] = None
) -> int: ...
```
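
The trimming-related arguments are easiest to see side by side. A minimal sketch (the `metrics` key name is illustrative, not part of the API above):

```python
import fakeredis

client = fakeredis.FakeRedis()

# Cap the stream at roughly 1000 entries while appending (approximate trimming).
client.xadd('metrics', {'cpu': '0.42'}, maxlen=1000, approximate=True)

# nomkstream=True appends only if the stream already exists; it never creates one.
client.xadd('metrics', {'cpu': '0.38'}, nomkstream=True)

# Trim by ID instead of length: evict entries whose IDs are lower than the threshold.
client.xtrim('metrics', minid='1-0')
```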

### Stream Reading

Reading operations for consuming stream entries with blocking and non-blocking modes.

```python { .api }
def xread(
    self,
    streams: Dict[KeyT, Union[str, int]],
    count: Optional[int] = None,
    block: Optional[int] = None
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...

def xreadgroup(
    self,
    groupname: str,
    consumername: str,
    streams: Dict[KeyT, Union[str, int]],
    count: Optional[int] = None,
    block: Optional[int] = None,
    noack: bool = False
) -> List[Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]: ...
```
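
A single non-blocking call can poll several streams at once. A small sketch (stream names are illustrative); passing `'0'` requests the full history, and omitting `block` makes the call return immediately:

```python
import fakeredis

client = fakeredis.FakeRedis()
client.xadd('clicks', {'page': '/home'})
client.xadd('views', {'page': '/docs'})

# Read everything currently in both streams without blocking.
results = client.xread({'clicks': '0', 'views': '0'}, count=10)
for stream_name, entries in results:
    print(f"{stream_name.decode()}: {len(entries)} entries")
```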

### Consumer Groups

Operations for managing consumer groups and tracking message processing.

```python { .api }
def xgroup_create(
    self,
    name: KeyT,
    groupname: str,
    id: str = "$",
    mkstream: bool = False,
    entries_read: Optional[int] = None
) -> bool: ...

def xgroup_destroy(self, name: KeyT, groupname: str) -> bool: ...

def xgroup_createconsumer(
    self,
    name: KeyT,
    groupname: str,
    consumername: str
) -> bool: ...

def xgroup_delconsumer(
    self,
    name: KeyT,
    groupname: str,
    consumername: str
) -> int: ...

def xgroup_setid(
    self,
    name: KeyT,
    groupname: str,
    id: str,
    entries_read: Optional[int] = None
) -> bool: ...
```
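
A short sketch of the less common group-management calls; the stream `jobs`, group `handlers`, and consumer names are placeholders:

```python
import fakeredis

client = fakeredis.FakeRedis()
client.xadd('jobs', {'step': 'one'})
client.xadd('jobs', {'step': 'two'})

# Create the group at the start of the stream (mkstream=True would also create
# the stream itself if it did not exist yet).
client.xgroup_create('jobs', 'handlers', id='0')

# Rewind the group's last-delivered ID so the next XREADGROUP starts over.
client.xgroup_setid('jobs', 'handlers', id='0')

# Explicitly register a consumer, then remove it again; the delete returns the
# number of pending entries that consumer still owned.
client.xgroup_createconsumer('jobs', 'handlers', 'temp-worker')
still_pending = client.xgroup_delconsumer('jobs', 'handlers', 'temp-worker')
print(f"Pending entries dropped with consumer: {still_pending}")
```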

### Message Acknowledgment

Functions for acknowledging processed messages and managing pending entries.

```python { .api }
def xack(self, name: KeyT, groupname: str, *ids: str) -> int: ...

def xpending(
    self,
    name: KeyT,
    groupname: str,
    min: Optional[str] = None,
    max: Optional[str] = None,
    count: Optional[int] = None,
    consumername: Optional[str] = None
) -> Union[Dict[str, Any], List[Dict[str, Any]]]: ...

def xclaim(
    self,
    name: KeyT,
    groupname: str,
    consumername: str,
    min_idle_time: int,
    message_ids: List[str],
    idle: Optional[int] = None,
    time: Optional[int] = None,
    retrycount: Optional[int] = None,
    force: bool = False,
    justid: bool = False
) -> List[Tuple[bytes, Dict[bytes, bytes]]]: ...

def xautoclaim(
    self,
    name: KeyT,
    groupname: str,
    consumername: str,
    min_idle_time: int,
    start_id: str = "0-0",
    count: Optional[int] = None,
    justid: bool = False
) -> Tuple[bytes, List[Tuple[bytes, Dict[bytes, bytes]]], List[bytes]]: ...
```
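
A minimal sketch of reclaiming stalled deliveries with `xautoclaim` (the key and consumer names are illustrative; the three-part return value follows the signature above):

```python
import fakeredis

client = fakeredis.FakeRedis()
client.xadd('emails', {'to': 'user@example.com'})
client.xgroup_create('emails', 'senders', id='0')

# Deliver the entry to a consumer that never acknowledges it.
client.xreadgroup('senders', 'crashed_worker', {'emails': '>'})

# A recovery consumer sweeps up anything pending for at least 0 ms.
next_id, claimed, deleted = client.xautoclaim('emails', 'senders', 'recovery_worker', min_idle_time=0)
for entry_id, fields in claimed:
    print(f"Reclaimed {entry_id.decode()}: {fields}")
    client.xack('emails', 'senders', entry_id.decode())
```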

### Stream Information

Inspection commands for stream metadata, consumer group status, and consumer information.

```python { .api }
def xinfo_consumers(self, name: KeyT, groupname: str) -> List[Dict[str, Any]]: ...

def xinfo_groups(self, name: KeyT) -> List[Dict[str, Any]]: ...

def xinfo_stream(self, name: KeyT, full: bool = False, count: Optional[int] = None) -> Dict[str, Any]: ...
```
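
`xinfo_stream` also has an extended form. A brief sketch (the `audit` key is illustrative, and the exact keys of each reply are not listed here):

```python
import fakeredis

client = fakeredis.FakeRedis()
client.xadd('audit', {'who': 'alice', 'what': 'login'})

# The default reply summarizes the stream; full=True includes entries and
# per-group details inline.
summary = client.xinfo_stream('audit')
detailed = client.xinfo_stream('audit', full=True)
print(sorted(summary.keys()))
print(sorted(detailed.keys()))
```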

## Usage Examples

### Basic Stream Operations

```python
import fakeredis

client = fakeredis.FakeRedis()

# Add entries to a stream
entry_id1 = client.xadd('events', {'user': 'alice', 'action': 'login'})
print(f"Added entry: {entry_id1}")

entry_id2 = client.xadd('events', {'user': 'bob', 'action': 'purchase', 'amount': '29.99'})
print(f"Added entry: {entry_id2}")

# Add with a custom ID (it must be greater than the last ID in the stream)
custom_id = client.xadd('events', {'user': 'charlie', 'action': 'logout'}, id='9999999999999-0')

# Get stream length
length = client.xlen('events')
print(f"Stream length: {length}")
```

### Reading Stream Data

```python
import fakeredis

client = fakeredis.FakeRedis()

# Add some test data
client.xadd('sensor_data', {'temperature': '23.5', 'humidity': '65'})
client.xadd('sensor_data', {'temperature': '24.1', 'humidity': '62'})
client.xadd('sensor_data', {'temperature': '23.8', 'humidity': '68'})

# Read all entries
entries = client.xrange('sensor_data')
for entry_id, fields in entries:
    print(f"ID: {entry_id.decode()}")
    for key, value in fields.items():
        print(f"  {key.decode()}: {value.decode()}")

# Read entries in reverse order
recent_entries = client.xrevrange('sensor_data', count=2)
print(f"Last 2 entries: {len(recent_entries)}")

# Read from a specific ID
from_id = entries[0][0].decode()  # First entry ID
new_entries = client.xrange('sensor_data', min=from_id)
```

### Stream Trimming and Cleanup

```python
import fakeredis

client = fakeredis.FakeRedis()

# Add many entries
for i in range(100):
    client.xadd('logs', {'level': 'info', 'message': f'Log entry {i}'})

print(f"Stream length before trim: {client.xlen('logs')}")

# Keep only the latest 50 entries (approximate)
trimmed = client.xtrim('logs', maxlen=50, approximate=True)
print(f"Trimmed {trimmed} entries")
print(f"Stream length after trim: {client.xlen('logs')}")

# Delete specific entries
entries = client.xrange('logs', count=5)
entry_ids = [entry[0].decode() for entry in entries]
deleted = client.xdel('logs', *entry_ids)
print(f"Deleted {deleted} specific entries")
```

### Consumer Groups

```python
import fakeredis

client = fakeredis.FakeRedis()

# Create a stream and add some data
for i in range(10):
    client.xadd('orders', {'order_id': f'order_{i}', 'status': 'pending'})

# Create consumer group
client.xgroup_create('orders', 'processors', id='0')

# Create consumers in the group
client.xgroup_createconsumer('orders', 'processors', 'worker1')
client.xgroup_createconsumer('orders', 'processors', 'worker2')

# Consumer 1 reads messages
messages = client.xreadgroup('processors', 'worker1', {'orders': '>'}, count=3)
print("Worker1 received:")
for stream_name, entries in messages:
    for entry_id, fields in entries:
        print(f"  {entry_id.decode()}: {fields}")

# Consumer 2 reads different messages
messages = client.xreadgroup('processors', 'worker2', {'orders': '>'}, count=2)
print("Worker2 received:")
for stream_name, entries in messages:
    for entry_id, fields in entries:
        print(f"  {entry_id.decode()}: {fields}")
```

### Message Acknowledgment and Pending Entries

```python
import fakeredis

client = fakeredis.FakeRedis()

# Setup stream and consumer group
client.xadd('tasks', {'task': 'send_email', 'recipient': 'user@example.com'})
client.xadd('tasks', {'task': 'process_payment', 'amount': '100.00'})
client.xadd('tasks', {'task': 'update_inventory', 'item_id': '12345'})

client.xgroup_create('tasks', 'workers', id='0')

# Read messages without acknowledging
messages = client.xreadgroup('workers', 'consumer1', {'tasks': '>'})
entry_ids = []
for stream_name, entries in messages:
    for entry_id, fields in entries:
        entry_ids.append(entry_id.decode())
        print(f"Processing: {fields}")

# Check pending messages
pending_info = client.xpending('tasks', 'workers')
print(f"Pending messages: {pending_info}")

# Acknowledge processed messages
acked = client.xack('tasks', 'workers', *entry_ids[:2])  # Ack first 2 messages
print(f"Acknowledged {acked} messages")

# Check pending again
pending_info = client.xpending('tasks', 'workers')
print(f"Remaining pending: {pending_info}")
```

315

316

### Blocking Stream Reads

317

318

```python
import fakeredis
import threading
import time

client = fakeredis.FakeRedis()

def producer():
    """Producer thread that adds entries every 2 seconds"""
    for i in range(5):
        time.sleep(2)
        entry_id = client.xadd('notifications', {
            'type': 'alert',
            'message': f'Alert {i}',
            'timestamp': str(int(time.time()))
        })
        print(f"Producer added: {entry_id}")

def consumer():
    """Consumer that blocks waiting for new entries"""
    last_id = '0-0'
    while True:
        # Block for up to 5 seconds waiting for new messages
        messages = client.xread({'notifications': last_id}, block=5000)
        if not messages:
            print("No new messages, stopping...")
            break

        for stream_name, entries in messages:
            for entry_id, fields in entries:
                print(f"Consumer received: {entry_id.decode()} - {fields}")
                last_id = entry_id.decode()

# Start producer thread
producer_thread = threading.Thread(target=producer)
producer_thread.start()

# Start consuming (will block)
consumer()

producer_thread.join()
```

360

361

### Stream Information and Monitoring

362

363

```python
import fakeredis

client = fakeredis.FakeRedis()

# Setup test data
client.xadd('analytics', {'event': 'page_view', 'page': '/home'})
client.xadd('analytics', {'event': 'click', 'element': 'button'})
client.xgroup_create('analytics', 'processors', id='0')
client.xreadgroup('processors', 'worker1', {'analytics': '>'})

# Get stream information
stream_info = client.xinfo_stream('analytics')
print("Stream info:")
print(f"  Length: {stream_info['length']}")
print(f"  First entry: {stream_info['first-entry']}")
print(f"  Last entry: {stream_info['last-entry']}")

# Get consumer group information
groups_info = client.xinfo_groups('analytics')
print("\nConsumer groups:")
for group in groups_info:
    print(f"  Group: {group['name'].decode()}")
    print(f"  Consumers: {group['consumers']}")
    print(f"  Pending: {group['pending']}")

# Get consumer information
consumers_info = client.xinfo_consumers('analytics', 'processors')
print("\nConsumers in group:")
for consumer in consumers_info:
    print(f"  Consumer: {consumer['name'].decode()}")
    print(f"  Pending: {consumer['pending']}")
    print(f"  Idle: {consumer['idle']}")
```

### Pattern: Event Sourcing

```python
import fakeredis
import json
import time
from datetime import datetime

class EventStore:
    def __init__(self, client):
        self.client = client

    def append_event(self, aggregate_id, event_type, event_data):
        """Append an event to an aggregate's stream"""
        stream_name = f"aggregate:{aggregate_id}"
        event = {
            'event_type': event_type,
            'event_data': json.dumps(event_data),
            'timestamp': datetime.utcnow().isoformat(),
            'version': str(int(time.time() * 1000000))  # Microsecond precision
        }
        return self.client.xadd(stream_name, event)

    def get_events(self, aggregate_id, from_version=None):
        """Retrieve all events for an aggregate"""
        stream_name = f"aggregate:{aggregate_id}"
        min_id = from_version if from_version else '-'

        events = []
        entries = self.client.xrange(stream_name, min=min_id)

        for entry_id, fields in entries:
            event = {
                'id': entry_id.decode(),
                'event_type': fields[b'event_type'].decode(),
                'event_data': json.loads(fields[b'event_data'].decode()),
                'timestamp': fields[b'timestamp'].decode(),
                'version': fields[b'version'].decode()
            }
            events.append(event)

        return events

# Usage example
client = fakeredis.FakeRedis()
event_store = EventStore(client)

# Append events for a user aggregate
user_id = "user123"
event_store.append_event(user_id, "UserCreated", {"name": "Alice", "email": "alice@example.com"})
event_store.append_event(user_id, "EmailChanged", {"old_email": "alice@example.com", "new_email": "alice.smith@example.com"})
event_store.append_event(user_id, "ProfileUpdated", {"field": "age", "value": 30})

# Retrieve event history
events = event_store.get_events(user_id)
print(f"Events for {user_id}:")
for event in events:
    print(f"  {event['event_type']}: {event['event_data']}")
```

### Pattern: Message Queue with Dead Letter

```python
import fakeredis
import time
import json

class StreamMessageQueue:
    def __init__(self, client, queue_name, consumer_group):
        self.client = client
        self.queue_name = queue_name
        self.consumer_group = consumer_group
        self.dead_letter_queue = f"{queue_name}:dlq"

        # Create consumer group if it doesn't exist
        try:
            self.client.xgroup_create(queue_name, consumer_group, id='0', mkstream=True)
        except Exception:
            pass  # Group already exists

    def enqueue(self, message_data, priority=0):
        """Add a message to the queue"""
        message = {
            'data': json.dumps(message_data),
            'priority': str(priority),
            'enqueued_at': str(int(time.time())),
            'retry_count': '0'
        }
        return self.client.xadd(self.queue_name, message)

    def dequeue(self, consumer_name, count=1, block_ms=1000):
        """Dequeue messages for processing"""
        messages = self.client.xreadgroup(
            self.consumer_group,
            consumer_name,
            {self.queue_name: '>'},
            count=count,
            block=block_ms
        )

        processed_messages = []
        for stream_name, entries in messages:
            for entry_id, fields in entries:
                message = {
                    'id': entry_id.decode(),
                    'data': json.loads(fields[b'data'].decode()),
                    'priority': int(fields[b'priority'].decode()),
                    'enqueued_at': int(fields[b'enqueued_at'].decode()),
                    'retry_count': int(fields[b'retry_count'].decode())
                }
                processed_messages.append(message)

        return processed_messages

    def acknowledge(self, message_id):
        """Acknowledge successful processing"""
        return self.client.xack(self.queue_name, self.consumer_group, message_id)

    def retry_failed_messages(self, max_retries=3, idle_time_ms=60000):
        """Move failed messages to retry or dead letter queue"""
        # Get pending messages that are idle
        pending = self.client.xpending(
            self.queue_name,
            self.consumer_group,
            min='-',
            max='+',
            count=100
        )

        current_time = int(time.time() * 1000)

        for msg_info in pending:
            if isinstance(msg_info, dict):
                msg_id = msg_info['message_id'].decode()
                idle = msg_info['time_since_delivered']
                consumer = msg_info['consumer'].decode()

                if idle > idle_time_ms:  # Message has been idle too long
                    # Claim the message
                    claimed = self.client.xclaim(
                        self.queue_name,
                        self.consumer_group,
                        'retry_handler',
                        idle_time_ms,
                        [msg_id]
                    )

                    if claimed:
                        entry_id, fields = claimed[0]
                        retry_count = int(fields[b'retry_count'].decode())

                        if retry_count >= max_retries:
                            # Move to dead letter queue
                            self.client.xadd(self.dead_letter_queue, {
                                'original_id': msg_id,
                                'data': fields[b'data'].decode(),
                                'failed_at': str(current_time),
                                'retry_count': str(retry_count)
                            })
                            self.acknowledge(msg_id)
                        else:
                            # Increment retry count and re-queue
                            fields[b'retry_count'] = str(retry_count + 1).encode()
                            self.client.xadd(self.queue_name, {
                                k.decode(): v.decode() for k, v in fields.items()
                            })
                            self.acknowledge(msg_id)

# Usage example
client = fakeredis.FakeRedis()
queue = StreamMessageQueue(client, 'work_queue', 'workers')

# Enqueue some work
queue.enqueue({'task': 'send_email', 'recipient': 'user@example.com'})
queue.enqueue({'task': 'process_payment', 'amount': 100}, priority=1)

# Process messages
messages = queue.dequeue('worker1')
for message in messages:
    try:
        # Simulate processing
        print(f"Processing: {message['data']}")
        # Acknowledge on success
        queue.acknowledge(message['id'])
    except Exception as e:
        print(f"Failed to process {message['id']}: {e}")
        # Don't acknowledge - will be retried later
```