or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.mdbasic-operations.mdconnection-management.mddata-structures.mdindex.mdserver-admin.md

data-structures.mddocs/

0

# Data Structures

1

2

Redis data structure operations for lists, sets, hashes, sorted sets, streams, and specialized structures like HyperLogLog and geospatial indexes. These powerful collection types enable efficient modeling of complex data relationships and patterns.

3

4

## Capabilities

5

6

### Lists

7

8

Ordered collections supporting push/pop operations from both ends, indexing, and blocking operations for queue-like behavior.

9

10

```python { .api }

11

async def lpush(name: str, *values: Any) -> int:

12

"""

13

Push values to the left (head) of list.

14

15

Args:

16

name: List key name

17

values: Values to push

18

19

Returns:

20

New list length

21

"""

22

23

async def rpush(name: str, *values: Any) -> int:

24

"""

25

Push values to the right (tail) of list.

26

27

Args:

28

name: List key name

29

values: Values to push

30

31

Returns:

32

New list length

33

"""

34

35

async def lpop(name: str) -> Optional[str]:

36

"""

37

Pop value from left (head) of list.

38

39

Args:

40

name: List key name

41

42

Returns:

43

Popped value or None if list is empty

44

"""

45

46

async def rpop(name: str) -> Optional[str]:

47

"""

48

Pop value from right (tail) of list.

49

50

Args:

51

name: List key name

52

53

Returns:

54

Popped value or None if list is empty

55

"""

56

57

async def lrange(name: str, start: int, end: int) -> List[str]:

58

"""

59

Get list elements by range.

60

61

Args:

62

name: List key name

63

start: Start index (0-based)

64

end: End index (-1 for end of list)

65

66

Returns:

67

List of elements in range

68

"""

69

70

async def llen(name: str) -> int:

71

"""

72

Get list length.

73

74

Args:

75

name: List key name

76

77

Returns:

78

Number of elements in list

79

"""

80

81

async def lindex(name: str, index: int) -> Optional[str]:

82

"""

83

Get list element by index.

84

85

Args:

86

name: List key name

87

index: Element index

88

89

Returns:

90

Element value or None if index out of bounds

91

"""

92

93

async def lset(name: str, index: int, value: Any) -> bool:

94

"""

95

Set list element at index.

96

97

Args:

98

name: List key name

99

index: Element index

100

value: New value

101

102

Returns:

103

True if successful

104

"""

105

106

async def lrem(name: str, count: int, value: Any) -> int:

107

"""

108

Remove elements from list.

109

110

Args:

111

name: List key name

112

count: Number to remove (0=all, >0=from head, <0=from tail)

113

value: Value to remove

114

115

Returns:

116

Number of elements removed

117

"""

118

119

async def ltrim(name: str, start: int, end: int) -> bool:

120

"""

121

Trim list to specified range.

122

123

Args:

124

name: List key name

125

start: Start index

126

end: End index

127

128

Returns:

129

True if successful

130

"""

131

132

async def linsert(name: str, where: str, refvalue: Any, value: Any) -> int:

133

"""

134

Insert element before or after reference value.

135

136

Args:

137

name: List key name

138

where: 'BEFORE' or 'AFTER'

139

refvalue: Reference value to find

140

value: Value to insert

141

142

Returns:

143

New list length or -1 if reference not found

144

"""

145

146

async def blpop(keys: List[str], timeout: int = 0) -> Optional[Tuple[str, str]]:

147

"""

148

Blocking pop from left of lists.

149

150

Args:

151

keys: List of list key names

152

timeout: Timeout in seconds (0=forever)

153

154

Returns:

155

Tuple of (key, value) or None on timeout

156

"""

157

158

async def brpop(keys: List[str], timeout: int = 0) -> Optional[Tuple[str, str]]:

159

"""

160

Blocking pop from right of lists.

161

162

Args:

163

keys: List of list key names

164

timeout: Timeout in seconds (0=forever)

165

166

Returns:

167

Tuple of (key, value) or None on timeout

168

"""

169

```

170

171

### Sets

172

173

Unordered collections of unique elements with set operations like union, intersection, and difference.

174

175

```python { .api }

176

async def sadd(name: str, *values: Any) -> int:

177

"""

178

Add members to set.

179

180

Args:

181

name: Set key name

182

values: Values to add

183

184

Returns:

185

Number of new members added

186

"""

187

188

async def srem(name: str, *values: Any) -> int:

189

"""

190

Remove members from set.

191

192

Args:

193

name: Set key name

194

values: Values to remove

195

196

Returns:

197

Number of members removed

198

"""

199

200

async def smembers(name: str) -> Set[str]:

201

"""

202

Get all set members.

203

204

Args:

205

name: Set key name

206

207

Returns:

208

Set of all members

209

"""

210

211

async def scard(name: str) -> int:

212

"""

213

Get set cardinality (size).

214

215

Args:

216

name: Set key name

217

218

Returns:

219

Number of elements in set

220

"""

221

222

async def sismember(name: str, value: Any) -> bool:

223

"""

224

Test if value is set member.

225

226

Args:

227

name: Set key name

228

value: Value to test

229

230

Returns:

231

True if value is in set

232

"""

233

234

async def spop(name: str, count: Optional[int] = None) -> Union[Optional[str], List[str]]:

235

"""

236

Remove and return random set members.

237

238

Args:

239

name: Set key name

240

count: Number of members to pop

241

242

Returns:

243

Single member if count is None, list of members otherwise

244

"""

245

246

async def srandmember(name: str, number: Optional[int] = None) -> Union[Optional[str], List[str]]:

247

"""

248

Get random set members without removing.

249

250

Args:

251

name: Set key name

252

number: Number of members to return

253

254

Returns:

255

Single member if number is None, list of members otherwise

256

"""

257

258

async def sinter(keys: List[str]) -> Set[str]:

259

"""

260

Intersect multiple sets.

261

262

Args:

263

keys: List of set key names

264

265

Returns:

266

Set intersection

267

"""

268

269

async def sunion(keys: List[str]) -> Set[str]:

270

"""

271

Union multiple sets.

272

273

Args:

274

keys: List of set key names

275

276

Returns:

277

Set union

278

"""

279

280

async def sdiff(keys: List[str]) -> Set[str]:

281

"""

282

Difference of multiple sets.

283

284

Args:

285

keys: List of set key names

286

287

Returns:

288

Set difference

289

"""

290

291

async def sinterstore(dest: str, keys: List[str]) -> int:

292

"""

293

Store intersection in destination key.

294

295

Args:

296

dest: Destination key name

297

keys: Source set key names

298

299

Returns:

300

Number of elements in result set

301

"""

302

303

async def sunionstore(dest: str, keys: List[str]) -> int:

304

"""

305

Store union in destination key.

306

307

Args:

308

dest: Destination key name

309

keys: Source set key names

310

311

Returns:

312

Number of elements in result set

313

"""

314

315

async def sdiffstore(dest: str, keys: List[str]) -> int:

316

"""

317

Store difference in destination key.

318

319

Args:

320

dest: Destination key name

321

keys: Source set key names

322

323

Returns:

324

Number of elements in result set

325

"""

326

```

327

328

### Hashes

329

330

Field-value mappings similar to dictionaries, ideal for representing objects and structured data.

331

332

```python { .api }

333

async def hset(name: str, key: str = None, value: Any = None, mapping: Dict[str, Any] = None) -> int:

334

"""

335

Set hash fields.

336

337

Args:

338

name: Hash key name

339

key: Field name (if setting single field)

340

value: Field value (if setting single field)

341

mapping: Dictionary of field-value pairs

342

343

Returns:

344

Number of fields added (not updated)

345

"""

346

347

async def hget(name: str, key: str) -> Optional[str]:

348

"""

349

Get hash field value.

350

351

Args:

352

name: Hash key name

353

key: Field name

354

355

Returns:

356

Field value or None if field doesn't exist

357

"""

358

359

async def hgetall(name: str) -> Dict[str, str]:

360

"""

361

Get all hash fields and values.

362

363

Args:

364

name: Hash key name

365

366

Returns:

367

Dictionary of all fields and values

368

"""

369

370

async def hmget(name: str, keys: List[str]) -> List[Optional[str]]:

371

"""

372

Get multiple hash fields.

373

374

Args:

375

name: Hash key name

376

keys: List of field names

377

378

Returns:

379

List of field values in same order

380

"""

381

382

async def hmset(name: str, mapping: Dict[str, Any]) -> bool:

383

"""

384

Set multiple hash fields.

385

386

Args:

387

name: Hash key name

388

mapping: Dictionary of field-value pairs

389

390

Returns:

391

True if successful

392

"""

393

394

async def hdel(name: str, *keys: str) -> int:

395

"""

396

Delete hash fields.

397

398

Args:

399

name: Hash key name

400

keys: Field names to delete

401

402

Returns:

403

Number of fields deleted

404

"""

405

406

async def hexists(name: str, key: str) -> bool:

407

"""

408

Check if hash field exists.

409

410

Args:

411

name: Hash key name

412

key: Field name

413

414

Returns:

415

True if field exists

416

"""

417

418

async def hlen(name: str) -> int:

419

"""

420

Get number of hash fields.

421

422

Args:

423

name: Hash key name

424

425

Returns:

426

Number of fields in hash

427

"""

428

429

async def hkeys(name: str) -> List[str]:

430

"""

431

Get all hash field names.

432

433

Args:

434

name: Hash key name

435

436

Returns:

437

List of field names

438

"""

439

440

async def hvals(name: str) -> List[str]:

441

"""

442

Get all hash field values.

443

444

Args:

445

name: Hash key name

446

447

Returns:

448

List of field values

449

"""

450

451

async def hincrby(name: str, key: str, amount: int = 1) -> int:

452

"""

453

Increment hash field by integer.

454

455

Args:

456

name: Hash key name

457

key: Field name

458

amount: Amount to increment

459

460

Returns:

461

New field value

462

"""

463

464

async def hincrbyfloat(name: str, key: str, amount: float = 1.0) -> float:

465

"""

466

Increment hash field by float.

467

468

Args:

469

name: Hash key name

470

key: Field name

471

amount: Amount to increment

472

473

Returns:

474

New field value

475

"""

476

477

async def hsetnx(name: str, key: str, value: Any) -> bool:

478

"""

479

Set hash field only if it doesn't exist.

480

481

Args:

482

name: Hash key name

483

key: Field name

484

value: Field value

485

486

Returns:

487

True if field was set, False if field exists

488

"""

489

```

490

491

### Sorted Sets

492

493

Ordered sets where each member has an associated score, enabling range queries and ranking operations.

494

495

```python { .api }

496

async def zadd(

497

name: str,

498

mapping: Dict[str, float],

499

nx: bool = False,

500

xx: bool = False,

501

ch: bool = False,

502

incr: bool = False

503

) -> int:

504

"""

505

Add members to sorted set with scores.

506

507

Args:

508

name: Sorted set key name

509

mapping: Dictionary of member-score pairs

510

nx: Only add new members

511

xx: Only update existing members

512

ch: Return number of changed members

513

incr: Increment score of single member

514

515

Returns:

516

Number of members added or changed

517

"""

518

519

async def zrem(name: str, *values: Any) -> int:

520

"""

521

Remove members from sorted set.

522

523

Args:

524

name: Sorted set key name

525

values: Members to remove

526

527

Returns:

528

Number of members removed

529

"""

530

531

async def zrange(

532

name: str,

533

start: int,

534

end: int,

535

desc: bool = False,

536

withscores: bool = False

537

) -> List[Union[str, Tuple[str, float]]]:

538

"""

539

Get sorted set members by rank range.

540

541

Args:

542

name: Sorted set key name

543

start: Start rank (0-based)

544

end: End rank (-1 for highest)

545

desc: Return in descending order

546

withscores: Include scores in result

547

548

Returns:

549

List of members or (member, score) tuples

550

"""

551

552

async def zrevrange(

553

name: str,

554

start: int,

555

end: int,

556

withscores: bool = False

557

) -> List[Union[str, Tuple[str, float]]]:

558

"""

559

Get sorted set members by rank range in reverse order.

560

561

Args:

562

name: Sorted set key name

563

start: Start rank (0-based)

564

end: End rank (-1 for lowest)

565

withscores: Include scores in result

566

567

Returns:

568

List of members or (member, score) tuples

569

"""

570

571

async def zrangebyscore(

572

name: str,

573

min_score: float,

574

max_score: float,

575

start: Optional[int] = None,

576

num: Optional[int] = None,

577

withscores: bool = False

578

) -> List[Union[str, Tuple[str, float]]]:

579

"""

580

Get sorted set members by score range.

581

582

Args:

583

name: Sorted set key name

584

min_score: Minimum score

585

max_score: Maximum score

586

start: Offset for pagination

587

num: Count for pagination

588

withscores: Include scores in result

589

590

Returns:

591

List of members or (member, score) tuples

592

"""

593

594

async def zcard(name: str) -> int:

595

"""

596

Get sorted set cardinality (size).

597

598

Args:

599

name: Sorted set key name

600

601

Returns:

602

Number of members in sorted set

603

"""

604

605

async def zscore(name: str, value: Any) -> Optional[float]:

606

"""

607

Get member score.

608

609

Args:

610

name: Sorted set key name

611

value: Member value

612

613

Returns:

614

Member score or None if member doesn't exist

615

"""

616

617

async def zrank(name: str, value: Any) -> Optional[int]:

618

"""

619

Get member rank (0-based, lowest score first).

620

621

Args:

622

name: Sorted set key name

623

value: Member value

624

625

Returns:

626

Member rank or None if member doesn't exist

627

"""

628

629

async def zrevrank(name: str, value: Any) -> Optional[int]:

630

"""

631

Get member reverse rank (0-based, highest score first).

632

633

Args:

634

name: Sorted set key name

635

value: Member value

636

637

Returns:

638

Member reverse rank or None if member doesn't exist

639

"""

640

641

async def zincrby(name: str, amount: float, value: Any) -> float:

642

"""

643

Increment member score.

644

645

Args:

646

name: Sorted set key name

647

amount: Amount to increment

648

value: Member value

649

650

Returns:

651

New member score

652

"""

653

654

async def zcount(name: str, min_score: float, max_score: float) -> int:

655

"""

656

Count members in score range.

657

658

Args:

659

name: Sorted set key name

660

min_score: Minimum score

661

max_score: Maximum score

662

663

Returns:

664

Number of members in range

665

"""

666

```

667

668

### Streams

669

670

Log-like data structures for storing sequences of field-value pairs with automatic ID generation and consumer group support.

671

672

```python { .api }

673

async def xadd(

674

name: str,

675

fields: Dict[str, Any],

676

id: str = "*",

677

maxlen: Optional[int] = None,

678

approximate: bool = True

679

) -> str:

680

"""

681

Add entry to stream.

682

683

Args:

684

name: Stream key name

685

fields: Dictionary of field-value pairs

686

id: Entry ID (* for auto-generation)

687

maxlen: Maximum stream length

688

approximate: Use approximate trimming

689

690

Returns:

691

Entry ID

692

"""

693

694

async def xread(

695

streams: Dict[str, str],

696

count: Optional[int] = None,

697

block: Optional[int] = None

698

) -> Dict[str, List[Tuple[str, Dict[str, str]]]]:

699

"""

700

Read from streams.

701

702

Args:

703

streams: Dictionary of stream names to start IDs

704

count: Maximum entries per stream

705

block: Block for milliseconds if no data

706

707

Returns:

708

Dictionary mapping stream names to entries

709

"""

710

711

async def xrange(

712

name: str,

713

min: str = "-",

714

max: str = "+",

715

count: Optional[int] = None

716

) -> List[Tuple[str, Dict[str, str]]]:

717

"""

718

Get stream entries by ID range.

719

720

Args:

721

name: Stream key name

722

min: Minimum ID ("-" for start)

723

max: Maximum ID ("+" for end)

724

count: Maximum entries to return

725

726

Returns:

727

List of (entry_id, fields) tuples

728

"""

729

730

async def xlen(name: str) -> int:

731

"""

732

Get stream length.

733

734

Args:

735

name: Stream key name

736

737

Returns:

738

Number of entries in stream

739

"""

740

741

async def xdel(name: str, *ids: str) -> int:

742

"""

743

Delete stream entries.

744

745

Args:

746

name: Stream key name

747

ids: Entry IDs to delete

748

749

Returns:

750

Number of entries deleted

751

"""

752

753

async def xtrim(name: str, maxlen: int, approximate: bool = True) -> int:

754

"""

755

Trim stream to maximum length.

756

757

Args:

758

name: Stream key name

759

maxlen: Maximum length

760

approximate: Use approximate trimming

761

762

Returns:

763

Number of entries removed

764

"""

765

766

async def xgroup_create(

767

name: str,

768

groupname: str,

769

id: str = "$",

770

mkstream: bool = False

771

) -> bool:

772

"""

773

Create consumer group.

774

775

Args:

776

name: Stream key name

777

groupname: Consumer group name

778

id: Starting ID ("$" for latest)

779

mkstream: Create stream if it doesn't exist

780

781

Returns:

782

True if successful

783

"""

784

785

async def xreadgroup(

786

groupname: str,

787

consumername: str,

788

streams: Dict[str, str],

789

count: Optional[int] = None,

790

block: Optional[int] = None,

791

noack: bool = False

792

) -> Dict[str, List[Tuple[str, Dict[str, str]]]]:

793

"""

794

Read from stream as consumer group member.

795

796

Args:

797

groupname: Consumer group name

798

consumername: Consumer name

799

streams: Dictionary of stream names to start IDs

800

count: Maximum entries per stream

801

block: Block for milliseconds if no data

802

noack: Don't automatically acknowledge

803

804

Returns:

805

Dictionary mapping stream names to entries

806

"""

807

808

async def xack(name: str, groupname: str, *ids: str) -> int:

809

"""

810

Acknowledge stream messages.

811

812

Args:

813

name: Stream key name

814

groupname: Consumer group name

815

ids: Entry IDs to acknowledge

816

817

Returns:

818

Number of messages acknowledged

819

"""

820

```

821

822

### HyperLogLog

823

824

Probabilistic data structure for approximate cardinality counting of large sets with minimal memory usage.

825

826

```python { .api }

827

async def pfadd(name: str, *values: Any) -> bool:

828

"""

829

Add elements to HyperLogLog.

830

831

Args:

832

name: HyperLogLog key name

833

values: Elements to add

834

835

Returns:

836

True if cardinality changed

837

"""

838

839

async def pfcount(*sources: str) -> int:

840

"""

841

Get HyperLogLog cardinality estimate.

842

843

Args:

844

sources: HyperLogLog key names

845

846

Returns:

847

Estimated cardinality

848

"""

849

850

async def pfmerge(dest: str, *sources: str) -> bool:

851

"""

852

Merge HyperLogLogs into destination.

853

854

Args:

855

dest: Destination key name

856

sources: Source HyperLogLog key names

857

858

Returns:

859

True if successful

860

"""

861

```

862

863

### Geospatial

864

865

Geographic coordinate storage and querying with radius-based searches and distance calculations.

866

867

```python { .api }

868

async def geoadd(name: str, *values: Any) -> int:

869

"""

870

Add geospatial members.

871

872

Args:

873

name: Geospatial key name

874

values: Longitude, latitude, member tuples

875

876

Returns:

877

Number of members added

878

"""

879

880

async def geodist(name: str, place1: str, place2: str, unit: str = "m") -> Optional[float]:

881

"""

882

Get distance between geospatial members.

883

884

Args:

885

name: Geospatial key name

886

place1: First member name

887

place2: Second member name

888

unit: Distance unit ('m', 'km', 'mi', 'ft')

889

890

Returns:

891

Distance in specified unit

892

"""

893

894

async def georadius(

895

name: str,

896

longitude: float,

897

latitude: float,

898

radius: float,

899

unit: str = "m",

900

**kwargs

901

) -> List[Any]:

902

"""

903

Find members within radius of coordinates.

904

905

Args:

906

name: Geospatial key name

907

longitude: Center longitude

908

latitude: Center latitude

909

radius: Search radius

910

unit: Distance unit

911

912

Returns:

913

List of members within radius

914

"""

915

916

async def geopos(name: str, *values: str) -> List[Optional[Tuple[float, float]]]:

917

"""

918

Get coordinates of geospatial members.

919

920

Args:

921

name: Geospatial key name

922

values: Member names

923

924

Returns:

925

List of (longitude, latitude) tuples

926

"""

927

```

928

929

## Usage Examples

930

931

### Working with Lists

932

933

```python

934

async def list_examples():

935

redis = aioredis.Redis(decode_responses=True)

936

937

# Queue operations (FIFO)

938

await redis.lpush('queue', 'task1', 'task2', 'task3')

939

task = await redis.rpop('queue') # Gets 'task1'

940

941

# Stack operations (LIFO)

942

await redis.lpush('stack', 'item1', 'item2', 'item3')

943

item = await redis.lpop('stack') # Gets 'item3'

944

945

# Get range of items

946

items = await redis.lrange('mylist', 0, 4) # First 5 items

947

all_items = await redis.lrange('mylist', 0, -1) # All items

948

949

# Blocking operations for producer/consumer

950

result = await redis.blpop(['queue1', 'queue2'], timeout=10)

951

if result:

952

queue_name, value = result

953

print(f"Got {value} from {queue_name}")

954

```

955

956

### Working with Sets

957

958

```python

959

async def set_examples():

960

redis = aioredis.Redis(decode_responses=True)

961

962

# Add members to sets

963

await redis.sadd('tags:python', 'web', 'async', 'redis')

964

await redis.sadd('tags:javascript', 'web', 'frontend', 'react')

965

966

# Set operations

967

common = await redis.sinter(['tags:python', 'tags:javascript'])

968

print(f"Common tags: {common}") # {'web'}

969

970

all_tags = await redis.sunion(['tags:python', 'tags:javascript'])

971

python_only = await redis.sdiff(['tags:python', 'tags:javascript'])

972

973

# Check membership

974

is_member = await redis.sismember('tags:python', 'async')

975

print(f"'async' in Python tags: {is_member}") # True

976

```

977

978

### Working with Hashes

979

980

```python

981

async def hash_examples():

982

redis = aioredis.Redis(decode_responses=True)

983

984

# Store user data

985

user_data = {

986

'name': 'John Doe',

987

'email': 'john@example.com',

988

'age': '30',

989

'city': 'New York'

990

}

991

await redis.hset('user:1', mapping=user_data)

992

993

# Get specific fields

994

name = await redis.hget('user:1', 'name')

995

email = await redis.hget('user:1', 'email')

996

997

# Get multiple fields

998

info = await redis.hmget('user:1', ['name', 'email', 'city'])

999

1000

# Get all fields

1001

all_data = await redis.hgetall('user:1')

1002

1003

# Increment numeric fields

1004

await redis.hincrby('user:1', 'login_count', 1)

1005

await redis.hincrbyfloat('user:1', 'balance', 25.50)

1006

```

1007

1008

### Working with Sorted Sets

1009

1010

```python

1011

async def sorted_set_examples():

1012

redis = aioredis.Redis(decode_responses=True)

1013

1014

# Add players with scores

1015

await redis.zadd('leaderboard', {

1016

'player1': 100,

1017

'player2': 150,

1018

'player3': 75,

1019

'player4': 200

1020

})

1021

1022

# Get top players

1023

top_3 = await redis.zrevrange('leaderboard', 0, 2, withscores=True)

1024

print(f"Top 3: {top_3}") # [('player4', 200), ('player2', 150), ('player1', 100)]

1025

1026

# Get player rank

1027

rank = await redis.zrevrank('leaderboard', 'player1') # 2 (0-based)

1028

1029

# Get players in score range

1030

middle_players = await redis.zrangebyscore('leaderboard', 80, 160)

1031

1032

# Increment score

1033

new_score = await redis.zincrby('leaderboard', 25, 'player3') # 100

1034

```

1035

1036

### Working with Streams

1037

1038

```python

1039

async def stream_examples():

1040

redis = aioredis.Redis(decode_responses=True)

1041

1042

# Add entries to stream

1043

entry_id1 = await redis.xadd('events', {

1044

'type': 'user_login',

1045

'user_id': '123',

1046

'timestamp': '2023-01-01T10:00:00Z'

1047

})

1048

1049

entry_id2 = await redis.xadd('events', {

1050

'type': 'purchase',

1051

'user_id': '123',

1052

'amount': '29.99'

1053

})

1054

1055

# Read from stream

1056

entries = await redis.xrange('events', min='-', max='+')

1057

for entry_id, fields in entries:

1058

print(f"{entry_id}: {fields}")

1059

1060

# Create consumer group

1061

await redis.xgroup_create('events', 'processors', id='0')

1062

1063

# Read as consumer group

1064

messages = await redis.xreadgroup(

1065

'processors', 'worker1',

1066

{'events': '>'},

1067

count=5, block=1000

1068

)

1069

1070

# Acknowledge processed messages

1071

if 'events' in messages:

1072

message_ids = [msg[0] for msg in messages['events']]

1073

await redis.xack('events', 'processors', *message_ids)

1074

```