docs
0
# Transaction Operations
1
2
Redis transaction support with MULTI/EXEC commands and optimistic locking via WATCH. Transactions provide atomic execution of command groups with conditional execution based on watched key modifications, enabling safe concurrent operations and complex multi-step procedures.
3
4
## Capabilities
5
6
### Transaction Control
7
8
Core transaction commands for grouping operations and atomic execution.
9
10
```python { .api }
11
def multi(self) -> None: ...
12
13
def exec_(self) -> List[Any]: ...
14
15
def discard(self) -> None: ...
16
```
17
18
### Optimistic Locking
19
20
Watch mechanisms for conditional transaction execution based on key modifications.
21
22
```python { .api }
23
def watch(self, *names: KeyT) -> bool: ...
24
25
def unwatch(self) -> bool: ...
26
```
27
28
### Pipeline Operations
29
30
Pipelined command execution for performance optimization with optional transaction semantics.
31
32
```python { .api }
33
class Pipeline:
34
def __init__(self, connection_pool, response_callbacks, transaction: bool, shard_hint: Optional[str]): ...
35
36
def __enter__(self) -> Pipeline: ...
37
38
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
39
40
def execute(self, raise_on_error: bool = True) -> List[Any]: ...
41
42
def multi(self) -> Pipeline: ...
43
44
def exec_(self) -> List[Any]: ...
45
46
def discard(self) -> None: ...
47
48
def watch(self, *names: KeyT) -> bool: ...
49
50
def unwatch(self) -> bool: ...
51
52
def reset(self) -> None: ...
53
54
def pipeline(self, transaction: bool = True, shard_hint: Optional[str] = None) -> Pipeline: ...
55
```
56
57
### Transaction Helpers
58
59
Utility functions for transaction management and error handling.
60
61
```python { .api }
62
# Transaction execution with retry logic
63
def transaction(
64
func: Callable,
65
*watches: KeyT,
66
shard_hint: Optional[str] = None,
67
value_from_callable: bool = False,
68
watch_delay: Optional[float] = None
69
) -> Any: ...
70
```
71
72
## Usage Examples
73
74
### Basic Transactions
75
76
```python
77
import fakeredis
78
79
client = fakeredis.FakeRedis()
80
81
# Setup initial data
82
client.set('counter', '10')
83
client.set('balance', '100')
84
85
# Basic transaction - all commands executed atomically
86
client.multi()
87
client.incr('counter')
88
client.decr('balance', 10)
89
client.set('last_operation', 'purchase')
90
91
# Execute all commands atomically
92
results = client.execute()
93
print(f"Transaction results: {results}") # [11, 90, True]
94
95
# Verify final state
96
print(f"Counter: {client.get('counter').decode()}") # '11'
97
print(f"Balance: {client.get('balance').decode()}") # '90'
98
print(f"Last op: {client.get('last_operation').decode()}") # 'purchase'
99
```
100
101
### Transaction Rollback
102
103
```python
104
import fakeredis
105
106
client = fakeredis.FakeRedis()
107
108
# Setup initial data
109
client.set('account_a', '1000')
110
client.set('account_b', '500')
111
112
# Start transaction
113
client.multi()
114
client.decrby('account_a', 100) # Deduct from account A
115
client.incrby('account_b', 100) # Add to account B
116
client.set('transfer_log', 'transfer_123')
117
118
# Simulate decision to cancel transaction
119
client.discard() # Cancel all queued commands
120
121
# Verify no changes were made
122
print(f"Account A: {client.get('account_a').decode()}") # Still '1000'
123
print(f"Account B: {client.get('account_b').decode()}") # Still '500'
124
print(f"Transfer log: {client.get('transfer_log')}") # None
125
```
126
127
### Optimistic Locking with WATCH
128
129
```python
130
import fakeredis
131
import threading
132
import time
133
134
client = fakeredis.FakeRedis()
135
136
# Setup shared counter
137
client.set('shared_counter', '0')
138
139
def increment_counter_safely(worker_id):
140
"""Safely increment counter using optimistic locking"""
141
max_retries = 5
142
143
for attempt in range(max_retries):
144
try:
145
# Watch the counter for changes
146
client.watch('shared_counter')
147
148
# Get current value
149
current_value = int(client.get('shared_counter').decode())
150
151
# Simulate some processing time
152
time.sleep(0.01)
153
154
# Start transaction
155
client.multi()
156
client.set('shared_counter', str(current_value + 1))
157
client.set(f'worker_{worker_id}_last_update', str(int(time.time())))
158
159
# Execute transaction
160
result = client.execute()
161
162
if result is not None: # Transaction succeeded
163
print(f"Worker {worker_id}: Successfully incremented to {current_value + 1}")
164
break
165
else: # Transaction was aborted due to watched key change
166
print(f"Worker {worker_id}: Retry {attempt + 1} - counter was modified by another worker")
167
168
except Exception as e:
169
print(f"Worker {worker_id}: Error - {e}")
170
171
finally:
172
# Always unwatch to clean up
173
client.unwatch()
174
175
else:
176
print(f"Worker {worker_id}: Failed to increment after {max_retries} attempts")
177
178
# Test concurrent increments
179
print("Starting concurrent counter increment test...")
180
181
# Create multiple worker threads
182
workers = []
183
for i in range(5):
184
worker = threading.Thread(target=increment_counter_safely, args=(i,))
185
workers.append(worker)
186
187
# Start all workers
188
for worker in workers:
189
worker.start()
190
191
# Wait for all workers to complete
192
for worker in workers:
193
worker.join()
194
195
# Check final counter value
196
final_value = client.get('shared_counter').decode()
197
print(f"Final counter value: {final_value}")
198
199
# Check which workers succeeded
200
for i in range(5):
201
last_update = client.get(f'worker_{i}_last_update')
202
if last_update:
203
print(f"Worker {i} last updated at: {last_update.decode()}")
204
```
205
206
### Pipeline Transactions
207
208
```python
209
import fakeredis
210
import time
211
212
client = fakeredis.FakeRedis()
213
214
# Setup test data
215
for i in range(5):
216
client.set(f'item:{i}', f'value_{i}')
217
client.set(f'counter:{i}', str(i * 10))
218
219
# Using pipeline with transactions for better performance
220
with client.pipeline(transaction=True) as pipe:
221
# All commands are queued
222
pipe.multi()
223
224
# Batch operations
225
for i in range(5):
226
pipe.get(f'item:{i}')
227
pipe.incr(f'counter:{i}')
228
pipe.set(f'timestamp:{i}', str(int(time.time())))
229
230
# Execute all commands atomically
231
results = pipe.execute()
232
233
print(f"Pipeline transaction executed {len(results)} commands")
234
235
# Results are returned in order of execution
236
items = []
237
counters = []
238
timestamps = []
239
240
for i in range(0, len(results), 3): # Every 3 results (get, incr, set)
241
items.append(results[i].decode() if results[i] else None)
242
counters.append(results[i + 1])
243
timestamps.append(results[i + 2])
244
245
print("Items:", items)
246
print("Counters:", counters)
247
print("All operations completed atomically")
248
```
249
250
### Pipeline without Transactions
251
252
```python
253
import fakeredis
254
255
client = fakeredis.FakeRedis()
256
257
# Pipeline without transaction - better performance, no atomicity
258
with client.pipeline(transaction=False) as pipe:
259
# Queue multiple commands
260
pipe.set('key1', 'value1')
261
pipe.set('key2', 'value2')
262
pipe.get('key1')
263
pipe.get('key2')
264
pipe.incr('counter')
265
pipe.incr('counter')
266
267
# Execute all commands (not atomic, but faster)
268
results = pipe.execute()
269
270
print(f"Non-transactional pipeline results: {results}")
271
# [True, True, b'value1', b'value2', 1, 2]
272
```
273
274
### Complex Transaction with Conditional Logic
275
276
```python
277
import fakeredis
278
import json
279
280
client = fakeredis.FakeRedis()
281
282
# Setup e-commerce inventory
283
products = {
284
'product_1': {'price': 29.99, 'stock': 10},
285
'product_2': {'price': 49.99, 'stock': 5},
286
'product_3': {'price': 19.99, 'stock': 0}
287
}
288
289
for product_id, data in products.items():
290
client.hset(f'product:{product_id}', mapping={
291
'price': str(data['price']),
292
'stock': str(data['stock'])
293
})
294
295
client.set('user:123:balance', '150.00')
296
297
def process_order(user_id, orders):
298
"""Process an order with inventory and balance checks"""
299
300
# Watch all relevant keys
301
watch_keys = [f'user:{user_id}:balance']
302
for product_id, quantity in orders.items():
303
watch_keys.append(f'product:{product_id}')
304
305
client.watch(*watch_keys)
306
307
try:
308
# Check current balance
309
current_balance = float(client.get(f'user:{user_id}:balance').decode())
310
311
# Calculate total cost and check availability
312
total_cost = 0
313
order_details = []
314
315
for product_id, quantity in orders.items():
316
product_data = client.hgetall(f'product:{product_id}')
317
318
if not product_data:
319
print(f"Product {product_id} not found")
320
return False
321
322
price = float(product_data[b'price'].decode())
323
stock = int(product_data[b'stock'].decode())
324
325
if stock < quantity:
326
print(f"Insufficient stock for {product_id}: need {quantity}, have {stock}")
327
return False
328
329
item_cost = price * quantity
330
total_cost += item_cost
331
332
order_details.append({
333
'product_id': product_id,
334
'quantity': quantity,
335
'price': price,
336
'total': item_cost,
337
'remaining_stock': stock - quantity
338
})
339
340
# Check if user has sufficient balance
341
if current_balance < total_cost:
342
print(f"Insufficient balance: need ${total_cost:.2f}, have ${current_balance:.2f}")
343
return False
344
345
# All checks passed, execute transaction
346
client.multi()
347
348
# Deduct from user balance
349
new_balance = current_balance - total_cost
350
client.set(f'user:{user_id}:balance', f'{new_balance:.2f}')
351
352
# Update inventory
353
for order in order_details:
354
client.hset(
355
f'product:{order["product_id"]}',
356
'stock',
357
str(order['remaining_stock'])
358
)
359
360
# Create order record
361
order_id = f'order_{int(time.time() * 1000)}'
362
client.set(f'order:{order_id}', json.dumps({
363
'user_id': user_id,
364
'items': order_details,
365
'total_cost': total_cost,
366
'timestamp': int(time.time())
367
}))
368
369
# Execute transaction
370
result = client.execute()
371
372
if result is not None:
373
print(f"✅ Order {order_id} processed successfully!")
374
print(f" Total: ${total_cost:.2f}")
375
print(f" New balance: ${new_balance:.2f}")
376
return True
377
else:
378
print("❌ Transaction aborted - data was modified during processing")
379
return False
380
381
except Exception as e:
382
print(f"Error processing order: {e}")
383
return False
384
385
finally:
386
client.unwatch()
387
388
# Test order processing
389
print("=== Order Processing Test ===")
390
391
# Successful order
392
print("\n1. Processing valid order...")
393
success1 = process_order('123', {
394
'product_1': 2, # 2 × $29.99 = $59.98
395
'product_2': 1 # 1 × $49.99 = $49.99
396
}) # Total: $109.97
397
398
print(f"Order 1 result: {'Success' if success1 else 'Failed'}")
399
400
# Check remaining balance and inventory
401
print(f"Remaining balance: ${client.get('user:123:balance').decode()}")
402
print(f"Product 1 stock: {client.hget('product:product_1', 'stock').decode()}")
403
print(f"Product 2 stock: {client.hget('product:product_2', 'stock').decode()}")
404
405
# Order that exceeds balance
406
print("\n2. Processing order that exceeds balance...")
407
success2 = process_order('123', {
408
'product_2': 3 # 3 × $49.99 = $149.97 (exceeds remaining balance)
409
})
410
411
# Order with insufficient stock
412
print("\n3. Processing order with insufficient stock...")
413
success3 = process_order('123', {
414
'product_3': 1 # Out of stock
415
})
416
```
417
418
### Pattern: Atomic Counters
419
420
```python
421
import fakeredis
422
import threading
423
import time
424
from typing import Dict, Any
425
426
class AtomicCounters:
427
def __init__(self, client: fakeredis.FakeRedis):
428
self.client = client
429
430
def increment(self, counter_name: str, amount: int = 1) -> int:
431
"""Atomically increment a counter"""
432
return self.client.incrby(counter_name, amount)
433
434
def decrement(self, counter_name: str, amount: int = 1) -> int:
435
"""Atomically decrement a counter"""
436
return self.client.decrby(counter_name, amount)
437
438
def increment_multiple(self, counters: Dict[str, int]) -> Dict[str, int]:
439
"""Atomically increment multiple counters"""
440
with self.client.pipeline(transaction=True) as pipe:
441
pipe.multi()
442
443
for counter_name, amount in counters.items():
444
pipe.incrby(counter_name, amount)
445
446
results = pipe.execute()
447
448
# Return new values
449
return dict(zip(counters.keys(), results))
450
451
def conditional_increment(self, counter_name: str, condition_key: str, expected_value: str, amount: int = 1) -> bool:
452
"""Increment counter only if condition key has expected value"""
453
454
self.client.watch(condition_key, counter_name)
455
456
try:
457
# Check condition
458
current_value = self.client.get(condition_key)
459
if current_value is None or current_value.decode() != expected_value:
460
return False
461
462
# Condition met, increment counter
463
self.client.multi()
464
self.client.incrby(counter_name, amount)
465
result = self.client.execute()
466
467
return result is not None
468
469
finally:
470
self.client.unwatch()
471
472
def get_counters(self, *counter_names: str) -> Dict[str, int]:
473
"""Get current values of multiple counters"""
474
if not counter_names:
475
return {}
476
477
values = self.client.mget(counter_names)
478
return {
479
name: int(value.decode()) if value else 0
480
for name, value in zip(counter_names, values)
481
}
482
483
def reset_counter(self, counter_name: str) -> bool:
484
"""Reset counter to 0"""
485
return self.client.set(counter_name, '0')
486
487
# Usage example
488
client = fakeredis.FakeRedis()
489
counters = AtomicCounters(client)
490
491
# Initialize counters
492
counters.reset_counter('page_views')
493
counters.reset_counter('user_signups')
494
counters.reset_counter('orders_processed')
495
496
def simulate_web_traffic(worker_id: int, duration: int):
497
"""Simulate web traffic that updates various counters"""
498
start_time = time.time()
499
500
while time.time() - start_time < duration:
501
# Simulate page views
502
counters.increment('page_views')
503
504
# Occasionally simulate user signup
505
if time.time() % 3 < 0.1: # Roughly every 3 seconds
506
counters.increment('user_signups')
507
508
# Simulate order processing
509
if time.time() % 5 < 0.1: # Roughly every 5 seconds
510
counters.increment('orders_processed')
511
512
time.sleep(0.1) # 100ms between actions
513
514
print(f"Worker {worker_id} finished")
515
516
print("=== Atomic Counter Test ===")
517
518
# Start multiple workers to simulate concurrent traffic
519
workers = []
520
for i in range(3):
521
worker = threading.Thread(target=simulate_web_traffic, args=(i, 2)) # 2 seconds each
522
workers.append(worker)
523
worker.start()
524
525
# Wait for workers to complete
526
for worker in workers:
527
worker.join()
528
529
# Get final counter values
530
final_counts = counters.get_counters('page_views', 'user_signups', 'orders_processed')
531
print(f"Final counts: {final_counts}")
532
533
# Test bulk increment
534
print("\n=== Bulk Counter Update ===")
535
bulk_updates = {
536
'daily_logins': 100,
537
'api_calls': 500,
538
'database_queries': 250
539
}
540
541
results = counters.increment_multiple(bulk_updates)
542
print(f"Bulk update results: {results}")
543
544
# Test conditional increment
545
print("\n=== Conditional Counter Test ===")
546
client.set('feature_flag', 'enabled')
547
548
# This should succeed
549
success1 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
550
print(f"Conditional increment (enabled): {success1}")
551
552
# Change flag and try again
553
client.set('feature_flag', 'disabled')
554
success2 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
555
print(f"Conditional increment (disabled): {success2}")
556
557
final_feature_usage = counters.get_counters('feature_usage')
558
print(f"Feature usage count: {final_feature_usage}")
559
```
560
561
### Pattern: Distributed Lock
562
563
```python
564
import fakeredis
565
import time
566
import threading
567
import uuid
568
from typing import Optional
569
570
class DistributedLock:
571
def __init__(self, client: fakeredis.FakeRedis, key: str, timeout: int = 10):
572
self.client = client
573
self.key = f"lock:{key}"
574
self.timeout = timeout
575
self.identifier = str(uuid.uuid4())
576
self.acquired = False
577
578
def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
579
"""Acquire the distributed lock"""
580
end_time = time.time() + (timeout or self.timeout)
581
582
while True:
583
# Try to acquire lock using SET with NX and EX
584
acquired = self.client.set(
585
self.key,
586
self.identifier,
587
nx=True, # Only set if key doesn't exist
588
ex=self.timeout # Set expiration
589
)
590
591
if acquired:
592
self.acquired = True
593
return True
594
595
if not blocking or time.time() >= end_time:
596
return False
597
598
time.sleep(0.01) # Brief pause before retry
599
600
def release(self) -> bool:
601
"""Release the distributed lock"""
602
if not self.acquired:
603
return False
604
605
# Use Lua script to atomically check and delete
606
lua_script = """
607
if redis.call("get", KEYS[1]) == ARGV[1] then
608
return redis.call("del", KEYS[1])
609
else
610
return 0
611
end
612
"""
613
614
result = self.client.eval(lua_script, 1, self.key, self.identifier)
615
if result:
616
self.acquired = False
617
return True
618
return False
619
620
def extend(self, additional_time: int) -> bool:
621
"""Extend lock timeout"""
622
if not self.acquired:
623
return False
624
625
lua_script = """
626
if redis.call("get", KEYS[1]) == ARGV[1] then
627
return redis.call("expire", KEYS[1], ARGV[2])
628
else
629
return 0
630
end
631
"""
632
633
result = self.client.eval(lua_script, 1, self.key, self.identifier, additional_time)
634
return bool(result)
635
636
def __enter__(self):
637
if not self.acquire():
638
raise Exception(f"Could not acquire lock: {self.key}")
639
return self
640
641
def __exit__(self, exc_type, exc_val, exc_tb):
642
self.release()
643
644
def critical_section_work(worker_id: int, shared_resource: str, client: fakeredis.FakeRedis):
645
"""Simulate work that requires exclusive access to a shared resource"""
646
647
# Try to acquire lock for the shared resource
648
lock = DistributedLock(client, shared_resource, timeout=5)
649
650
try:
651
print(f"Worker {worker_id}: Attempting to acquire lock for {shared_resource}")
652
653
if lock.acquire(blocking=True, timeout=3.0):
654
print(f"Worker {worker_id}: ✅ Acquired lock for {shared_resource}")
655
656
# Get current value
657
current_value = client.get(f"resource:{shared_resource}")
658
current_count = int(current_value.decode()) if current_value else 0
659
660
# Simulate some processing time
661
time.sleep(0.5)
662
663
# Update the resource
664
new_count = current_count + 1
665
client.set(f"resource:{shared_resource}", str(new_count))
666
667
print(f"Worker {worker_id}: Updated {shared_resource} from {current_count} to {new_count}")
668
669
else:
670
print(f"Worker {worker_id}: ❌ Could not acquire lock for {shared_resource}")
671
672
finally:
673
if lock.acquired:
674
lock.release()
675
print(f"Worker {worker_id}: Released lock for {shared_resource}")
676
677
# Test distributed locking
678
client = fakeredis.FakeRedis()
679
680
# Initialize shared resource
681
client.set("resource:shared_counter", "0")
682
683
print("=== Distributed Lock Test ===")
684
685
# Create workers that compete for the same resource
686
workers = []
687
for i in range(5):
688
worker = threading.Thread(
689
target=critical_section_work,
690
args=(i, "shared_counter", client)
691
)
692
workers.append(worker)
693
694
# Start all workers simultaneously
695
for worker in workers:
696
worker.start()
697
698
# Wait for all workers
699
for worker in workers:
700
worker.join()
701
702
# Check final value
703
final_value = client.get("resource:shared_counter").decode()
704
print(f"\nFinal shared counter value: {final_value}")
705
706
# Test lock context manager
707
print("\n=== Lock Context Manager Test ===")
708
709
def update_with_context_manager(resource_name: str):
710
try:
711
with DistributedLock(client, resource_name) as lock:
712
print(f"Inside critical section for {resource_name}")
713
714
# Get and increment counter
715
current = client.get(f"resource:{resource_name}")
716
count = int(current.decode()) if current else 0
717
client.set(f"resource:{resource_name}", str(count + 10))
718
719
print(f"Updated {resource_name} to {count + 10}")
720
721
except Exception as e:
722
print(f"Failed to acquire lock: {e}")
723
724
# Initialize new resource
725
client.set("resource:context_test", "0")
726
727
# Test context manager
728
update_with_context_manager("context_test")
729
730
final_context_value = client.get("resource:context_test").decode()
731
print(f"Final context test value: {final_context_value}")
732
```