# Transaction Operations

Redis transaction support with MULTI/EXEC commands and optimistic locking via WATCH. Transactions execute groups of commands atomically, with conditional execution based on watched-key modifications, enabling safe concurrent updates and complex multi-step procedures. In the redis-py-compatible API that fakeredis emulates, MULTI/EXEC and WATCH are used through `Pipeline` objects obtained from `pipeline()`.

## Capabilities

### Transaction Control

Core transaction commands for grouping operations and atomic execution.

```python { .api }
def multi(self) -> None: ...

def exec_(self) -> List[Any]: ...

def discard(self) -> None: ...
```
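
These methods are exposed on `Pipeline` objects in the redis-py-compatible interface. A minimal sketch of the flow, assuming `execute()` as the EXEC entry point (as in redis-py; `exec_` above corresponds to it):

```python
# Sketch only: queue commands on a transactional pipeline, then run them atomically.
import fakeredis

client = fakeredis.FakeRedis()
client.set('visits', '0')

pipe = client.pipeline(transaction=True)
pipe.multi()                 # optional: transaction=True already queues commands
pipe.incr('visits')
pipe.set('status', 'active')
print(pipe.execute())        # [1, True] - applied as one atomic unit
```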

### Optimistic Locking

Watch mechanisms for conditional transaction execution based on key modifications.

```python { .api }
def watch(self, *names: KeyT) -> bool: ...

def unwatch(self) -> bool: ...
```
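
A minimal check-and-set sketch, assuming the redis-py convention (which fakeredis follows) that `execute()` raises `WatchError` when a watched key was modified:

```python
# Sketch: optimistic locking - read under WATCH, write under MULTI/EXEC.
import fakeredis
from redis.exceptions import WatchError

client = fakeredis.FakeRedis()
client.set('stock', '5')

with client.pipeline() as pipe:
    try:
        pipe.watch('stock')                      # start watching before the read
        stock = int(pipe.get('stock').decode())  # reads run immediately while watching
        pipe.multi()                             # switch the pipeline to queued mode
        pipe.set('stock', str(stock - 1))
        pipe.execute()                           # raises WatchError if 'stock' changed
    except WatchError:
        print('stock changed concurrently; retry the read-modify-write')
```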

### Pipeline Operations

Pipelined command execution for performance optimization with optional transaction semantics.

```python { .api }
class Pipeline:
    def __init__(self, connection_pool, response_callbacks, transaction: bool, shard_hint: Optional[str]): ...

    def __enter__(self) -> Pipeline: ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

    def execute(self, raise_on_error: bool = True) -> List[Any]: ...

    def multi(self) -> Pipeline: ...

    def exec_(self) -> List[Any]: ...

    def discard(self) -> None: ...

    def watch(self, *names: KeyT) -> bool: ...

    def unwatch(self) -> bool: ...

    def reset(self) -> None: ...

# Client method for creating a Pipeline
def pipeline(self, transaction: bool = True, shard_hint: Optional[str] = None) -> Pipeline: ...
```
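
The two pipeline modes differ only in atomicity; the Usage Examples below cover both in depth. As a quick orientation (a sketch, assuming redis-py-compatible behavior):

```python
# transaction=True wraps the batch in MULTI/EXEC; transaction=False just batches.
import fakeredis

client = fakeredis.FakeRedis()

with client.pipeline(transaction=True) as pipe:    # atomic batch
    pipe.set('a', '1')
    pipe.incr('b')
    print(pipe.execute())  # [True, 1]

with client.pipeline(transaction=False) as pipe:   # non-atomic batch, lower overhead
    pipe.set('c', '2')
    pipe.incr('b')
    print(pipe.execute())  # [True, 2]
```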

### Transaction Helpers

Utility functions for transaction management and error handling.

```python { .api }
# Transaction execution with retry logic
def transaction(
    func: Callable,
    *watches: KeyT,
    shard_hint: Optional[str] = None,
    value_from_callable: bool = False,
    watch_delay: Optional[float] = None
) -> Any: ...
```
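
A usage sketch, assuming this helper follows redis-py's `Redis.transaction()` semantics (which fakeredis inherits): `func` receives a pipeline that is already watching the given keys, the helper retries automatically on `WatchError`, and `value_from_callable=True` returns `func`'s return value instead of the EXEC results:

```python
import fakeredis

client = fakeredis.FakeRedis()
client.set('price', '100')

def apply_discount(pipe):
    # Read while watching, then queue the conditional write
    current = int(pipe.get('price').decode())
    pipe.multi()
    pipe.set('price', str(current - 10))
    return current - 10

# Retries the whole callable if 'price' changes between the read and EXEC
new_price = client.transaction(apply_discount, 'price', value_from_callable=True)
print(new_price)  # 90
```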

## Usage Examples

### Basic Transactions

```python
import fakeredis

client = fakeredis.FakeRedis()

# Set up initial data
client.set('counter', '10')
client.set('balance', '100')

# Basic transaction - commands are queued on a pipeline and executed atomically
pipe = client.pipeline(transaction=True)
pipe.incr('counter')
pipe.decr('balance', 10)
pipe.set('last_operation', 'purchase')

# Execute all queued commands atomically
results = pipe.execute()
print(f"Transaction results: {results}")  # [11, 90, True]

# Verify final state
print(f"Counter: {client.get('counter').decode()}")         # '11'
print(f"Balance: {client.get('balance').decode()}")         # '90'
print(f"Last op: {client.get('last_operation').decode()}")  # 'purchase'
```

### Transaction Rollback

```python
import fakeredis

client = fakeredis.FakeRedis()

# Set up initial data
client.set('account_a', '1000')
client.set('account_b', '500')

# Queue a transaction on a pipeline
pipe = client.pipeline(transaction=True)
pipe.decrby('account_a', 100)  # Deduct from account A
pipe.incrby('account_b', 100)  # Add to account B
pipe.set('transfer_log', 'transfer_123')

# Simulate a decision to cancel the transaction
pipe.discard()  # Drop all queued commands without executing them

# Verify no changes were made
print(f"Account A: {client.get('account_a').decode()}")  # Still '1000'
print(f"Account B: {client.get('account_b').decode()}")  # Still '500'
print(f"Transfer log: {client.get('transfer_log')}")     # None
```

### Optimistic Locking with WATCH

```python
import fakeredis
import threading
import time
from redis.exceptions import WatchError

client = fakeredis.FakeRedis()

# Set up shared counter
client.set('shared_counter', '0')

def increment_counter_safely(worker_id):
    """Safely increment the counter using optimistic locking"""
    max_retries = 5

    for attempt in range(max_retries):
        # The context manager resets (and unwatches) the pipeline on exit
        with client.pipeline() as pipe:
            try:
                # Watch the counter for changes
                pipe.watch('shared_counter')

                # Get current value (reads execute immediately while watching)
                current_value = int(pipe.get('shared_counter').decode())

                # Simulate some processing time
                time.sleep(0.01)

                # Start the transaction
                pipe.multi()
                pipe.set('shared_counter', str(current_value + 1))
                pipe.set(f'worker_{worker_id}_last_update', str(int(time.time())))

                # Execute; raises WatchError if the watched key changed
                pipe.execute()

                print(f"Worker {worker_id}: Successfully incremented to {current_value + 1}")
                break

            except WatchError:
                # Transaction was aborted due to a watched key change
                print(f"Worker {worker_id}: Retry {attempt + 1} - counter was modified by another worker")
    else:
        print(f"Worker {worker_id}: Failed to increment after {max_retries} attempts")

# Test concurrent increments
print("Starting concurrent counter increment test...")

# Create multiple worker threads
workers = []
for i in range(5):
    worker = threading.Thread(target=increment_counter_safely, args=(i,))
    workers.append(worker)

# Start all workers
for worker in workers:
    worker.start()

# Wait for all workers to complete
for worker in workers:
    worker.join()

# Check final counter value
final_value = client.get('shared_counter').decode()
print(f"Final counter value: {final_value}")

# Check which workers succeeded
for i in range(5):
    last_update = client.get(f'worker_{i}_last_update')
    if last_update:
        print(f"Worker {i} last updated at: {last_update.decode()}")
```

### Pipeline Transactions

```python
import fakeredis
import time

client = fakeredis.FakeRedis()

# Set up test data
for i in range(5):
    client.set(f'item:{i}', f'value_{i}')
    client.set(f'counter:{i}', str(i * 10))

# Using a pipeline with transactions for better performance
with client.pipeline(transaction=True) as pipe:
    # All commands are queued (multi() is optional here; transaction=True already queues)
    pipe.multi()

    # Batch operations
    for i in range(5):
        pipe.get(f'item:{i}')
        pipe.incr(f'counter:{i}')
        pipe.set(f'timestamp:{i}', str(int(time.time())))

    # Execute all commands atomically
    results = pipe.execute()

print(f"Pipeline transaction executed {len(results)} commands")

# Results are returned in order of execution
items = []
counters = []
timestamps = []

for i in range(0, len(results), 3):  # Each iteration queued 3 commands (get, incr, set)
    items.append(results[i].decode() if results[i] else None)
    counters.append(results[i + 1])
    timestamps.append(results[i + 2])

print("Items:", items)
print("Counters:", counters)
print("All operations completed atomically")
```

### Pipeline without Transactions

```python
import fakeredis

client = fakeredis.FakeRedis()

# Pipeline without transaction - better performance, no atomicity
with client.pipeline(transaction=False) as pipe:
    # Queue multiple commands
    pipe.set('key1', 'value1')
    pipe.set('key2', 'value2')
    pipe.get('key1')
    pipe.get('key2')
    pipe.incr('counter')
    pipe.incr('counter')

    # Execute all commands (not atomic, but faster)
    results = pipe.execute()

print(f"Non-transactional pipeline results: {results}")
# [True, True, b'value1', b'value2', 1, 2]
```

### Complex Transaction with Conditional Logic

```python
import fakeredis
import json
import time
from redis.exceptions import WatchError

client = fakeredis.FakeRedis()

# Set up e-commerce inventory
products = {
    'product_1': {'price': 29.99, 'stock': 10},
    'product_2': {'price': 49.99, 'stock': 5},
    'product_3': {'price': 19.99, 'stock': 0}
}

for product_id, data in products.items():
    client.hset(f'product:{product_id}', mapping={
        'price': str(data['price']),
        'stock': str(data['stock'])
    })

client.set('user:123:balance', '150.00')

def process_order(user_id, orders):
    """Process an order with inventory and balance checks"""

    # Watch all relevant keys
    watch_keys = [f'user:{user_id}:balance']
    for product_id in orders:
        watch_keys.append(f'product:{product_id}')

    # The context manager resets (and unwatches) the pipeline on exit
    with client.pipeline() as pipe:
        try:
            pipe.watch(*watch_keys)

            # Check current balance (reads execute immediately while watching)
            current_balance = float(pipe.get(f'user:{user_id}:balance').decode())

            # Calculate total cost and check availability
            total_cost = 0
            order_details = []

            for product_id, quantity in orders.items():
                product_data = pipe.hgetall(f'product:{product_id}')

                if not product_data:
                    print(f"Product {product_id} not found")
                    return False

                price = float(product_data[b'price'].decode())
                stock = int(product_data[b'stock'].decode())

                if stock < quantity:
                    print(f"Insufficient stock for {product_id}: need {quantity}, have {stock}")
                    return False

                item_cost = price * quantity
                total_cost += item_cost

                order_details.append({
                    'product_id': product_id,
                    'quantity': quantity,
                    'price': price,
                    'total': item_cost,
                    'remaining_stock': stock - quantity
                })

            # Check that the user has sufficient balance
            if current_balance < total_cost:
                print(f"Insufficient balance: need ${total_cost:.2f}, have ${current_balance:.2f}")
                return False

            # All checks passed; queue the transaction
            pipe.multi()

            # Deduct from the user's balance
            new_balance = current_balance - total_cost
            pipe.set(f'user:{user_id}:balance', f'{new_balance:.2f}')

            # Update inventory
            for order in order_details:
                pipe.hset(
                    f'product:{order["product_id"]}',
                    'stock',
                    str(order['remaining_stock'])
                )

            # Create an order record
            order_id = f'order_{int(time.time() * 1000)}'
            pipe.set(f'order:{order_id}', json.dumps({
                'user_id': user_id,
                'items': order_details,
                'total_cost': total_cost,
                'timestamp': int(time.time())
            }))

            # Execute; raises WatchError if any watched key changed
            pipe.execute()

            print(f"✅ Order {order_id} processed successfully!")
            print(f"   Total: ${total_cost:.2f}")
            print(f"   New balance: ${new_balance:.2f}")
            return True

        except WatchError:
            print("❌ Transaction aborted - data was modified during processing")
            return False

# Test order processing
print("=== Order Processing Test ===")

# Successful order
print("\n1. Processing valid order...")
success1 = process_order('123', {
    'product_1': 2,  # 2 × $29.99 = $59.98
    'product_2': 1   # 1 × $49.99 = $49.99
})  # Total: $109.97

print(f"Order 1 result: {'Success' if success1 else 'Failed'}")

# Check remaining balance and inventory
print(f"Remaining balance: ${client.get('user:123:balance').decode()}")
print(f"Product 1 stock: {client.hget('product:product_1', 'stock').decode()}")
print(f"Product 2 stock: {client.hget('product:product_2', 'stock').decode()}")

# Order that exceeds the balance
print("\n2. Processing order that exceeds balance...")
success2 = process_order('123', {
    'product_2': 3  # 3 × $49.99 = $149.97 (exceeds remaining balance)
})

# Order with insufficient stock
print("\n3. Processing order with insufficient stock...")
success3 = process_order('123', {
    'product_3': 1  # Out of stock
})
```

### Pattern: Atomic Counters

```python
import fakeredis
import threading
import time
from typing import Dict
from redis.exceptions import WatchError

class AtomicCounters:
    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client

    def increment(self, counter_name: str, amount: int = 1) -> int:
        """Atomically increment a counter"""
        return self.client.incrby(counter_name, amount)

    def decrement(self, counter_name: str, amount: int = 1) -> int:
        """Atomically decrement a counter"""
        return self.client.decrby(counter_name, amount)

    def increment_multiple(self, counters: Dict[str, int]) -> Dict[str, int]:
        """Atomically increment multiple counters"""
        with self.client.pipeline(transaction=True) as pipe:
            for counter_name, amount in counters.items():
                pipe.incrby(counter_name, amount)

            results = pipe.execute()

        # Return new values
        return dict(zip(counters.keys(), results))

    def conditional_increment(self, counter_name: str, condition_key: str, expected_value: str, amount: int = 1) -> bool:
        """Increment counter only if condition key has expected value"""
        with self.client.pipeline() as pipe:
            try:
                pipe.watch(condition_key, counter_name)

                # Check the condition (reads execute immediately while watching)
                current_value = pipe.get(condition_key)
                if current_value is None or current_value.decode() != expected_value:
                    return False

                # Condition met, increment counter
                pipe.multi()
                pipe.incrby(counter_name, amount)
                pipe.execute()
                return True

            except WatchError:
                # A watched key changed during the check
                return False

    def get_counters(self, *counter_names: str) -> Dict[str, int]:
        """Get current values of multiple counters"""
        if not counter_names:
            return {}

        values = self.client.mget(counter_names)
        return {
            name: int(value.decode()) if value else 0
            for name, value in zip(counter_names, values)
        }

    def reset_counter(self, counter_name: str) -> bool:
        """Reset counter to 0"""
        return self.client.set(counter_name, '0')

# Usage example
client = fakeredis.FakeRedis()
counters = AtomicCounters(client)

# Initialize counters
counters.reset_counter('page_views')
counters.reset_counter('user_signups')
counters.reset_counter('orders_processed')

def simulate_web_traffic(worker_id: int, duration: int):
    """Simulate web traffic that updates various counters"""
    start_time = time.time()

    while time.time() - start_time < duration:
        # Simulate page views
        counters.increment('page_views')

        # Occasionally simulate a user signup
        if time.time() % 3 < 0.1:  # Roughly every 3 seconds
            counters.increment('user_signups')

        # Simulate order processing
        if time.time() % 5 < 0.1:  # Roughly every 5 seconds
            counters.increment('orders_processed')

        time.sleep(0.1)  # 100ms between actions

    print(f"Worker {worker_id} finished")

print("=== Atomic Counter Test ===")

# Start multiple workers to simulate concurrent traffic
workers = []
for i in range(3):
    worker = threading.Thread(target=simulate_web_traffic, args=(i, 2))  # 2 seconds each
    workers.append(worker)
    worker.start()

# Wait for workers to complete
for worker in workers:
    worker.join()

# Get final counter values
final_counts = counters.get_counters('page_views', 'user_signups', 'orders_processed')
print(f"Final counts: {final_counts}")

# Test bulk increment
print("\n=== Bulk Counter Update ===")
bulk_updates = {
    'daily_logins': 100,
    'api_calls': 500,
    'database_queries': 250
}

results = counters.increment_multiple(bulk_updates)
print(f"Bulk update results: {results}")

# Test conditional increment
print("\n=== Conditional Counter Test ===")
client.set('feature_flag', 'enabled')

# This should succeed
success1 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
print(f"Conditional increment (enabled): {success1}")

# Change the flag and try again
client.set('feature_flag', 'disabled')
success2 = counters.conditional_increment('feature_usage', 'feature_flag', 'enabled', 1)
print(f"Conditional increment (disabled): {success2}")

final_feature_usage = counters.get_counters('feature_usage')
print(f"Feature usage count: {final_feature_usage}")
```

### Pattern: Distributed Lock

```python
import fakeredis
import time
import threading
import uuid
from typing import Optional

class DistributedLock:
    def __init__(self, client: fakeredis.FakeRedis, key: str, timeout: int = 10):
        self.client = client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self.acquired = False

    def acquire(self, blocking: bool = True, timeout: Optional[float] = None) -> bool:
        """Acquire the distributed lock"""
        end_time = time.time() + (timeout or self.timeout)

        while True:
            # Try to acquire the lock using SET with NX and EX
            acquired = self.client.set(
                self.key,
                self.identifier,
                nx=True,         # Only set if the key doesn't exist
                ex=self.timeout  # Set expiration
            )

            if acquired:
                self.acquired = True
                return True

            if not blocking or time.time() >= end_time:
                return False

            time.sleep(0.01)  # Brief pause before retry

    def release(self) -> bool:
        """Release the distributed lock"""
        if not self.acquired:
            return False

        # Use a Lua script to atomically check ownership and delete
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        result = self.client.eval(lua_script, 1, self.key, self.identifier)
        if result:
            self.acquired = False
            return True
        return False

    def extend(self, additional_time: int) -> bool:
        """Extend the lock timeout"""
        if not self.acquired:
            return False

        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
        """

        result = self.client.eval(lua_script, 1, self.key, self.identifier, additional_time)
        return bool(result)

    def __enter__(self):
        if not self.acquire():
            raise Exception(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

def critical_section_work(worker_id: int, shared_resource: str, client: fakeredis.FakeRedis):
    """Simulate work that requires exclusive access to a shared resource"""

    # Try to acquire the lock for the shared resource
    lock = DistributedLock(client, shared_resource, timeout=5)

    try:
        print(f"Worker {worker_id}: Attempting to acquire lock for {shared_resource}")

        if lock.acquire(blocking=True, timeout=3.0):
            print(f"Worker {worker_id}: ✅ Acquired lock for {shared_resource}")

            # Get the current value
            current_value = client.get(f"resource:{shared_resource}")
            current_count = int(current_value.decode()) if current_value else 0

            # Simulate some processing time
            time.sleep(0.5)

            # Update the resource
            new_count = current_count + 1
            client.set(f"resource:{shared_resource}", str(new_count))

            print(f"Worker {worker_id}: Updated {shared_resource} from {current_count} to {new_count}")

        else:
            print(f"Worker {worker_id}: ❌ Could not acquire lock for {shared_resource}")

    finally:
        if lock.acquired:
            lock.release()
            print(f"Worker {worker_id}: Released lock for {shared_resource}")

# Test distributed locking
client = fakeredis.FakeRedis()

# Initialize the shared resource
client.set("resource:shared_counter", "0")

print("=== Distributed Lock Test ===")

# Create workers that compete for the same resource
workers = []
for i in range(5):
    worker = threading.Thread(
        target=critical_section_work,
        args=(i, "shared_counter", client)
    )
    workers.append(worker)

# Start all workers simultaneously
for worker in workers:
    worker.start()

# Wait for all workers
for worker in workers:
    worker.join()

# Check the final value
final_value = client.get("resource:shared_counter").decode()
print(f"\nFinal shared counter value: {final_value}")

# Test the lock context manager
print("\n=== Lock Context Manager Test ===")

def update_with_context_manager(resource_name: str):
    try:
        with DistributedLock(client, resource_name) as lock:
            print(f"Inside critical section for {resource_name}")

            # Get and increment the counter
            current = client.get(f"resource:{resource_name}")
            count = int(current.decode()) if current else 0
            client.set(f"resource:{resource_name}", str(count + 10))

            print(f"Updated {resource_name} to {count + 10}")

    except Exception as e:
        print(f"Failed to acquire lock: {e}")

# Initialize a new resource
client.set("resource:context_test", "0")

# Test the context manager
update_with_context_manager("context_test")

final_context_value = client.get("resource:context_test").decode()
print(f"Final context test value: {final_context_value}")
```