or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.mdcluster-support.mdconnection-management.mdcore-client.mddistributed-locking.mderror-handling.mdhigh-availability.mdindex.mdpipelines-transactions.mdpubsub-messaging.md

pipelines-transactions.mddocs/

0

# Pipelines and Transactions

1

2

Redis pipelines and transactions provide efficient batching of multiple commands and atomic execution guarantees. Pipelines reduce network round-trips by sending multiple commands at once, while transactions ensure atomicity with WATCH-based optimistic locking.

3

4

## Capabilities

5

6

### Pipeline Operations

7

8

Pipeline class for batching multiple Redis commands and executing them efficiently.

9

10

```python { .api }

11

def pipeline(

12

self,

13

transaction: bool = True,

14

shard_hint: Optional[str] = None

15

) -> "Pipeline": ...

16

17

class Pipeline:

18

def execute(self, raise_on_error: bool = True) -> List[Any]: ...

19

20

def reset(self) -> None: ...

21

22

def watch(self, *names: KeyT) -> bool: ...

23

24

def multi(self) -> "Pipeline": ...

25

26

def discard(self) -> None: ...

27

28

def __enter__(self) -> "Pipeline": ...

29

30

def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

31

```

32

33

### Transaction Operations

34

35

Redis transaction support with MULTI/EXEC and optimistic locking via WATCH.

36

37

```python { .api }

38

def transaction(

39

self,

40

func: Callable[["Pipeline"], Any],

41

*watches: KeyT,

42

**kwargs

43

) -> Any: ...

44

45

def watch(self, *names: KeyT) -> bool: ...

46

47

def unwatch(self) -> bool: ...

48

49

def multi(self) -> bool: ...

50

51

def exec(self) -> Optional[List[Any]]: ...

52

53

def discard(self) -> bool: ...

54

```

55

56

### Pipeline Command Methods

57

58

All Redis commands are available in pipeline mode for batching.

59

60

```python { .api }

61

# String operations in pipeline

62

def set(self, name: KeyT, value: EncodableT, **kwargs) -> "Pipeline": ...

63

def get(self, name: KeyT) -> "Pipeline": ...

64

def mget(self, keys: List[KeyT], *args: KeyT) -> "Pipeline": ...

65

def mset(self, mapping: Dict[KeyT, EncodableT]) -> "Pipeline": ...

66

67

# Hash operations in pipeline

68

def hset(self, name: KeyT, key: Optional[FieldT] = None, value: Optional[EncodableT] = None, mapping: Optional[Dict[FieldT, EncodableT]] = None) -> "Pipeline": ...

69

def hget(self, name: KeyT, key: FieldT) -> "Pipeline": ...

70

def hgetall(self, name: KeyT) -> "Pipeline": ...

71

72

# List operations in pipeline

73

def lpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...

74

def rpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...

75

def lpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...

76

def rpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...

77

78

# Set operations in pipeline

79

def sadd(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...

80

def smembers(self, name: KeyT) -> "Pipeline": ...

81

82

# Sorted set operations in pipeline

83

def zadd(self, name: KeyT, mapping: Dict[EncodableT, float], **kwargs) -> "Pipeline": ...

84

def zrange(self, name: KeyT, start: int, end: int, **kwargs) -> "Pipeline": ...

85

86

# Key operations in pipeline

87

def delete(self, *names: KeyT) -> "Pipeline": ...

88

def exists(self, *names: KeyT) -> "Pipeline": ...

89

def expire(self, name: KeyT, time: ExpiryT) -> "Pipeline": ...

90

```

91

92

## Usage Examples

93

94

### Basic Pipeline Usage

95

96

```python

97

import redis

98

99

r = redis.Redis(host='localhost', port=6379, db=0)

100

101

# Create pipeline

102

pipe = r.pipeline()

103

104

# Queue multiple commands

105

pipe.set('user:1001', 'John')

106

pipe.set('user:1002', 'Jane')

107

pipe.get('user:1001')

108

pipe.get('user:1002')

109

pipe.incr('page_views')

110

111

# Execute all commands at once

112

results = pipe.execute()

113

print(f"Results: {results}") # [True, True, b'John', b'Jane', 1]

114

```

115

116

### Pipeline with Context Manager

117

118

```python

119

import redis

120

121

r = redis.Redis(host='localhost', port=6379, db=0)

122

123

# Pipeline with automatic execution

124

with r.pipeline() as pipe:

125

pipe.set('temp:key1', 'value1')

126

pipe.set('temp:key2', 'value2')

127

pipe.mget(['temp:key1', 'temp:key2'])

128

results = pipe.execute()

129

130

print(f"Pipeline results: {results}")

131

```

132

133

### Transaction with MULTI/EXEC

134

135

```python

136

import redis

137

138

r = redis.Redis(host='localhost', port=6379, db=0)

139

140

# Initialize counter

141

r.set('counter', 0)

142

143

# Transaction pipeline (default behavior)

144

pipe = r.pipeline(transaction=True)

145

146

try:

147

# Queue commands in transaction

148

pipe.multi()

149

pipe.incr('counter')

150

pipe.incr('counter')

151

pipe.get('counter')

152

153

# Execute transaction atomically

154

results = pipe.execute()

155

print(f"Transaction results: {results}") # [1, 2, b'2']

156

157

except redis.WatchError:

158

print("Transaction aborted due to watched key modification")

159

```

160

161

### Optimistic Locking with WATCH

162

163

```python

164

import redis

165

import time

166

167

r = redis.Redis(host='localhost', port=6379, db=0)

168

169

# Initialize balance

170

r.set('account:balance', 1000)

171

172

def transfer_money(amount):

173

"""Transfer money using optimistic locking"""

174

pipe = r.pipeline()

175

176

while True:

177

try:

178

# Watch the balance key

179

pipe.watch('account:balance')

180

181

# Get current balance

182

current_balance = int(r.get('account:balance') or 0)

183

184

# Check if sufficient funds

185

if current_balance < amount:

186

pipe.unwatch()

187

raise ValueError("Insufficient funds")

188

189

# Calculate new balance

190

new_balance = current_balance - amount

191

192

# Start transaction

193

pipe.multi()

194

pipe.set('account:balance', new_balance)

195

196

# Execute transaction

197

pipe.execute()

198

print(f"Transfer successful. New balance: {new_balance}")

199

break

200

201

except redis.WatchError:

202

# Key was modified, retry

203

print("Balance modified by another client, retrying...")

204

continue

205

206

# Demonstrate concurrent transfers

207

transfer_money(100)

208

transfer_money(200)

209

```

210

211

### High-Level Transaction Helper

212

213

```python

214

import redis

215

216

r = redis.Redis(host='localhost', port=6379, db=0)

217

218

def update_user_profile(user_id, name, email):

219

"""Update user profile atomically"""

220

def transaction_func(pipe):

221

# Commands executed within transaction

222

pipe.hset(f'user:{user_id}', 'name', name)

223

pipe.hset(f'user:{user_id}', 'email', email)

224

pipe.hset(f'user:{user_id}', 'updated_at', int(time.time()))

225

pipe.sadd('updated_users', user_id)

226

227

# Execute with automatic WATCH/MULTI/EXEC handling

228

result = r.transaction(transaction_func, f'user:{user_id}')

229

return result

230

231

# Update user profile

232

result = update_user_profile(1001, 'John Doe', 'john@example.com')

233

print(f"Profile update result: {result}")

234

```

235

236

### Bulk Data Operations

237

238

```python

239

import redis

240

241

r = redis.Redis(host='localhost', port=6379, db=0)

242

243

def bulk_insert_users(users):

244

"""Insert multiple users efficiently using pipeline"""

245

pipe = r.pipeline()

246

247

for user_id, user_data in users.items():

248

# Hash for user profile

249

pipe.hset(f'user:{user_id}', mapping=user_data)

250

251

# Add to user index

252

pipe.sadd('all_users', user_id)

253

254

# Add to age-based index

255

if 'age' in user_data:

256

pipe.zadd('users_by_age', {user_id: int(user_data['age'])})

257

258

# Execute all operations

259

results = pipe.execute()

260

return len([r for r in results if r])

261

262

# Bulk insert example

263

users_data = {

264

1001: {'name': 'John', 'email': 'john@example.com', 'age': '30'},

265

1002: {'name': 'Jane', 'email': 'jane@example.com', 'age': '25'},

266

1003: {'name': 'Bob', 'email': 'bob@example.com', 'age': '35'},

267

}

268

269

successful_ops = bulk_insert_users(users_data)

270

print(f"Completed {successful_ops} operations")

271

```

272

273

### Pipeline Error Handling

274

275

```python

276

import redis

277

from redis.exceptions import ResponseError

278

279

r = redis.Redis(host='localhost', port=6379, db=0)

280

281

# Pipeline with error handling

282

pipe = r.pipeline()

283

284

# Mix of valid and invalid operations

285

pipe.set('valid_key', 'value')

286

pipe.lpush('valid_key', 'item') # This will fail - wrong type

287

pipe.get('valid_key')

288

pipe.set('another_key', 'another_value')

289

290

try:

291

results = pipe.execute(raise_on_error=True)

292

except ResponseError as e:

293

print(f"Pipeline error: {e}")

294

295

# Handle errors without raising exceptions

296

results = pipe.execute(raise_on_error=False)

297

for i, result in enumerate(results):

298

if isinstance(result, Exception):

299

print(f"Command {i} failed: {result}")

300

else:

301

print(f"Command {i} result: {result}")

302

```

303

304

### Complex Transaction Example

305

306

```python

307

import redis

308

import json

309

from datetime import datetime

310

311

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

312

313

def create_order(user_id, product_id, quantity):

314

"""Create order with inventory check and update"""

315

316

def order_transaction(pipe):

317

# Get current inventory

318

current_stock = pipe.get(f'inventory:{product_id}')

319

current_stock = int(current_stock) if current_stock else 0

320

321

# Check stock availability

322

if current_stock < quantity:

323

raise ValueError(f"Insufficient stock. Available: {current_stock}")

324

325

# Generate order ID

326

order_id = pipe.incr('order_counter')

327

328

# Create order

329

order_data = {

330

'order_id': order_id,

331

'user_id': user_id,

332

'product_id': product_id,

333

'quantity': quantity,

334

'created_at': datetime.now().isoformat(),

335

'status': 'pending'

336

}

337

338

# Update inventory

339

new_stock = current_stock - quantity

340

pipe.set(f'inventory:{product_id}', new_stock)

341

342

# Store order

343

pipe.hset(f'order:{order_id}', mapping=order_data)

344

345

# Add to user's orders

346

pipe.sadd(f'user:{user_id}:orders', order_id)

347

348

# Add to pending orders

349

pipe.sadd('pending_orders', order_id)

350

351

# Update product sales count

352

pipe.incr(f'product:{product_id}:sales')

353

354

return order_id

355

356

# Watch inventory for consistency

357

try:

358

result = r.transaction(

359

order_transaction,

360

f'inventory:{product_id}'

361

)

362

return result[0] # Return order_id

363

364

except redis.WatchError:

365

raise Exception("Order failed due to concurrent inventory update")

366

367

# Initialize test data

368

r.set('inventory:123', 10)

369

r.set('order_counter', 1000)

370

371

# Create orders

372

try:

373

order_id = create_order(user_id=1001, product_id=123, quantity=2)

374

print(f"Order created successfully: {order_id}")

375

376

# Check updated inventory

377

remaining_stock = r.get('inventory:123')

378

print(f"Remaining stock: {remaining_stock}")

379

380

except Exception as e:

381

print(f"Order creation failed: {e}")

382

```

383

384

### Non-Transactional Pipeline

385

386

```python

387

import redis

388

389

r = redis.Redis(host='localhost', port=6379, db=0)

390

391

# Non-transactional pipeline for better performance

392

pipe = r.pipeline(transaction=False)

393

394

# Batch read operations

395

keys_to_check = ['user:1001', 'user:1002', 'user:1003', 'user:1004']

396

397

for key in keys_to_check:

398

pipe.exists(key)

399

pipe.hgetall(key)

400

401

# Execute all at once

402

results = pipe.execute()

403

404

# Process results (exists, hgetall pairs)

405

for i in range(0, len(results), 2):

406

key = keys_to_check[i // 2]

407

exists = results[i]

408

data = results[i + 1]

409

410

if exists:

411

print(f"{key}: {data}")

412

else:

413

print(f"{key}: Not found")

414

```

415

416

### Pipeline Reset and Reuse

417

418

```python

419

import redis

420

421

r = redis.Redis(host='localhost', port=6379, db=0)

422

423

# Create reusable pipeline

424

pipe = r.pipeline()

425

426

# First batch of operations

427

pipe.set('batch1_key1', 'value1')

428

pipe.set('batch1_key2', 'value2')

429

batch1_results = pipe.execute()

430

print(f"Batch 1: {batch1_results}")

431

432

# Reset pipeline for reuse

433

pipe.reset()

434

435

# Second batch of operations

436

pipe.get('batch1_key1')

437

pipe.get('batch1_key2')

438

pipe.set('batch2_key', 'value')

439

batch2_results = pipe.execute()

440

print(f"Batch 2: {batch2_results}")

441

```