# Pipelines and Transactions

Redis pipelines and transactions provide efficient batching of multiple commands and atomic execution guarantees. Pipelines reduce network round-trips by sending multiple commands at once, while transactions ensure atomicity with WATCH-based optimistic locking.

## Capabilities

### Pipeline Operations

Pipeline class for batching multiple Redis commands and executing them efficiently.

```python { .api }
def pipeline(
    self,
    transaction: bool = True,
    shard_hint: Optional[str] = None
) -> "Pipeline": ...

class Pipeline:
    def execute(self, raise_on_error: bool = True) -> List[Any]: ...

    def reset(self) -> None: ...

    def watch(self, *names: KeyT) -> bool: ...

    def multi(self) -> "Pipeline": ...

    def discard(self) -> None: ...

    def __enter__(self) -> "Pipeline": ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
```

### Transaction Operations

Redis transaction support with MULTI/EXEC and optimistic locking via WATCH.

```python { .api }
def transaction(
    self,
    func: Callable[["Pipeline"], Any],
    *watches: KeyT,
    **kwargs
) -> Any: ...

def watch(self, *names: KeyT) -> bool: ...

def unwatch(self) -> bool: ...

def multi(self) -> bool: ...

def exec(self) -> Optional[List[Any]]: ...

def discard(self) -> bool: ...
```

### Pipeline Command Methods

All Redis commands are available in pipeline mode for batching.

```python { .api }
# String operations in pipeline
def set(self, name: KeyT, value: EncodableT, **kwargs) -> "Pipeline": ...
def get(self, name: KeyT) -> "Pipeline": ...
def mget(self, keys: List[KeyT], *args: KeyT) -> "Pipeline": ...
def mset(self, mapping: Dict[KeyT, EncodableT]) -> "Pipeline": ...

# Hash operations in pipeline
def hset(self, name: KeyT, key: Optional[FieldT] = None, value: Optional[EncodableT] = None, mapping: Optional[Dict[FieldT, EncodableT]] = None) -> "Pipeline": ...
def hget(self, name: KeyT, key: FieldT) -> "Pipeline": ...
def hgetall(self, name: KeyT) -> "Pipeline": ...

# List operations in pipeline
def lpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def rpush(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def lpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...
def rpop(self, name: KeyT, count: Optional[int] = None) -> "Pipeline": ...

# Set operations in pipeline
def sadd(self, name: KeyT, *values: EncodableT) -> "Pipeline": ...
def smembers(self, name: KeyT) -> "Pipeline": ...

# Sorted set operations in pipeline
def zadd(self, name: KeyT, mapping: Dict[EncodableT, float], **kwargs) -> "Pipeline": ...
def zrange(self, name: KeyT, start: int, end: int, **kwargs) -> "Pipeline": ...

# Key operations in pipeline
def delete(self, *names: KeyT) -> "Pipeline": ...
def exists(self, *names: KeyT) -> "Pipeline": ...
def expire(self, name: KeyT, time: ExpiryT) -> "Pipeline": ...
```
## Usage Examples

### Basic Pipeline Usage

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Create pipeline
pipe = r.pipeline()

# Queue multiple commands
pipe.set('user:1001', 'John')
pipe.set('user:1002', 'Jane')
pipe.get('user:1001')
pipe.get('user:1002')
pipe.incr('page_views')

# Execute all commands at once
results = pipe.execute()
print(f"Results: {results}")  # [True, True, b'John', b'Jane', 1]
```

### Pipeline with Context Manager

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline with automatic execution
with r.pipeline() as pipe:
    pipe.set('temp:key1', 'value1')
    pipe.set('temp:key2', 'value2')
    pipe.mget(['temp:key1', 'temp:key2'])
    results = pipe.execute()

print(f"Pipeline results: {results}")
```
### Transaction with MULTI/EXEC

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Initialize counter
r.set('counter', 0)

# Transaction pipeline (default behavior)
pipe = r.pipeline(transaction=True)

try:
    # Queue commands in transaction
    pipe.multi()
    pipe.incr('counter')
    pipe.incr('counter')
    pipe.get('counter')

    # Execute transaction atomically
    results = pipe.execute()
    print(f"Transaction results: {results}")  # [1, 2, b'2']

except redis.WatchError:
    print("Transaction aborted due to watched key modification")
```
### Optimistic Locking with WATCH

```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

# Initialize balance
r.set('account:balance', 1000)

def transfer_money(amount):
    """Transfer money using optimistic locking"""
    pipe = r.pipeline()

    while True:
        try:
            # Watch the balance key
            pipe.watch('account:balance')

            # Get current balance
            current_balance = int(r.get('account:balance') or 0)

            # Check if sufficient funds
            if current_balance < amount:
                pipe.unwatch()
                raise ValueError("Insufficient funds")

            # Calculate new balance
            new_balance = current_balance - amount

            # Start transaction
            pipe.multi()
            pipe.set('account:balance', new_balance)

            # Execute transaction
            pipe.execute()
            print(f"Transfer successful. New balance: {new_balance}")
            break

        except redis.WatchError:
            # Key was modified, retry
            print("Balance modified by another client, retrying...")
            continue

# Demonstrate concurrent transfers
transfer_money(100)
transfer_money(200)
```
### High-Level Transaction Helper

```python
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def update_user_profile(user_id, name, email):
    """Update user profile atomically"""
    def transaction_func(pipe):
        # Buffer the writes so they execute atomically via MULTI/EXEC;
        # without multi(), commands issued while WATCHing run immediately
        pipe.multi()
        pipe.hset(f'user:{user_id}', 'name', name)
        pipe.hset(f'user:{user_id}', 'email', email)
        pipe.hset(f'user:{user_id}', 'updated_at', int(time.time()))
        pipe.sadd('updated_users', user_id)

    # Execute with automatic WATCH/MULTI/EXEC handling
    result = r.transaction(transaction_func, f'user:{user_id}')
    return result

# Update user profile
result = update_user_profile(1001, 'John Doe', 'john@example.com')
print(f"Profile update result: {result}")
```
### Bulk Data Operations

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def bulk_insert_users(users):
    """Insert multiple users efficiently using pipeline"""
    pipe = r.pipeline()

    for user_id, user_data in users.items():
        # Hash for user profile
        pipe.hset(f'user:{user_id}', mapping=user_data)

        # Add to user index
        pipe.sadd('all_users', user_id)

        # Add to age-based index
        if 'age' in user_data:
            pipe.zadd('users_by_age', {user_id: int(user_data['age'])})

    # Execute all operations
    results = pipe.execute()
    return len([res for res in results if res])

# Bulk insert example
users_data = {
    1001: {'name': 'John', 'email': 'john@example.com', 'age': '30'},
    1002: {'name': 'Jane', 'email': 'jane@example.com', 'age': '25'},
    1003: {'name': 'Bob', 'email': 'bob@example.com', 'age': '35'},
}

successful_ops = bulk_insert_users(users_data)
print(f"Completed {successful_ops} operations")
```
### Pipeline Error Handling

```python
import redis
from redis.exceptions import ResponseError

r = redis.Redis(host='localhost', port=6379, db=0)

# Pipeline with error handling
pipe = r.pipeline()

# Mix of valid and invalid operations
pipe.set('valid_key', 'value')
pipe.lpush('valid_key', 'item')  # This will fail - wrong type
pipe.get('valid_key')
pipe.set('another_key', 'another_value')

try:
    results = pipe.execute(raise_on_error=True)
except ResponseError as e:
    print(f"Pipeline error: {e}")

# Handle errors without raising exceptions.
# Note: execute() resets the pipeline, so the commands must be queued again.
pipe.set('valid_key', 'value')
pipe.lpush('valid_key', 'item')  # This will fail - wrong type
pipe.get('valid_key')
pipe.set('another_key', 'another_value')

results = pipe.execute(raise_on_error=False)
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Command {i} failed: {result}")
    else:
        print(f"Command {i} result: {result}")
```
### Complex Transaction Example

```python
import redis
import json
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def create_order(user_id, product_id, quantity):
    """Create order with inventory check and update"""

    def order_transaction(pipe):
        # While WATCHing (before MULTI), commands execute immediately,
        # so these reads return real values
        current_stock = pipe.get(f'inventory:{product_id}')
        current_stock = int(current_stock) if current_stock else 0

        # Check stock availability
        if current_stock < quantity:
            raise ValueError(f"Insufficient stock. Available: {current_stock}")

        # Generate order ID
        order_id = pipe.incr('order_counter')

        # Create order
        order_data = {
            'order_id': order_id,
            'user_id': user_id,
            'product_id': product_id,
            'quantity': quantity,
            'created_at': datetime.now().isoformat(),
            'status': 'pending'
        }

        # Switch to buffered mode: the writes below are queued and
        # executed atomically by MULTI/EXEC
        pipe.multi()

        # Update inventory
        new_stock = current_stock - quantity
        pipe.set(f'inventory:{product_id}', new_stock)

        # Store order
        pipe.hset(f'order:{order_id}', mapping=order_data)

        # Add to user's orders
        pipe.sadd(f'user:{user_id}:orders', order_id)

        # Add to pending orders
        pipe.sadd('pending_orders', order_id)

        # Update product sales count
        pipe.incr(f'product:{product_id}:sales')

        return order_id

    # Watch inventory for consistency
    try:
        # value_from_callable=True makes transaction() return order_id
        return r.transaction(
            order_transaction,
            f'inventory:{product_id}',
            value_from_callable=True
        )

    except redis.WatchError:
        raise Exception("Order failed due to concurrent inventory update")

# Initialize test data
r.set('inventory:123', 10)
r.set('order_counter', 1000)

# Create orders
try:
    order_id = create_order(user_id=1001, product_id=123, quantity=2)
    print(f"Order created successfully: {order_id}")

    # Check updated inventory
    remaining_stock = r.get('inventory:123')
    print(f"Remaining stock: {remaining_stock}")

except Exception as e:
    print(f"Order creation failed: {e}")
```
### Non-Transactional Pipeline

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Non-transactional pipeline for better performance
pipe = r.pipeline(transaction=False)

# Batch read operations
keys_to_check = ['user:1001', 'user:1002', 'user:1003', 'user:1004']

for key in keys_to_check:
    pipe.exists(key)
    pipe.hgetall(key)

# Execute all at once
results = pipe.execute()

# Process results (exists, hgetall pairs)
for i in range(0, len(results), 2):
    key = keys_to_check[i // 2]
    exists = results[i]
    data = results[i + 1]

    if exists:
        print(f"{key}: {data}")
    else:
        print(f"{key}: Not found")
```
### Pipeline Reset and Reuse

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

# Create reusable pipeline
pipe = r.pipeline()

# First batch of operations
pipe.set('batch1_key1', 'value1')
pipe.set('batch1_key2', 'value2')
batch1_results = pipe.execute()
print(f"Batch 1: {batch1_results}")

# Reset pipeline for reuse
pipe.reset()

# Second batch of operations
pipe.get('batch1_key1')
pipe.get('batch1_key2')
pipe.set('batch2_key', 'value')
batch2_results = pipe.execute()
print(f"Batch 2: {batch2_results}")
```