or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md · client-side-caching.md · clustering.md · commands-operations.md · connection-management.md · core-clients.md · exceptions.md · index.md · modules.md · parameters.md · pubsub.md · transactions-pipelining.md

transactions-pipelining.mddocs/

0

# Transactions and Pipelining

1

2

This document covers Redis transactions and command pipelining in Jedis, including MULTI/EXEC transactions, optimistic locking with WATCH, command batching for performance optimization, and reliable transaction patterns.

3

4

## Redis Transactions

5

6

### Transaction

7

8

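Redis MULTI/EXEC transaction implementation. Queued commands are executed sequentially as a single isolated unit, with no other client's commands interleaved. Note that Redis does not roll back commands that have already run if a later command fails, so this is atomic, isolated execution rather than a full ACID guarantee.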

9

10

```java { .api }

11

public class Transaction implements AutoCloseable {

12

/**

13

* Queue SET command in transaction

14

* @param key Redis key

15

* @param value String value

16

* @return Response object for deferred result

17

*/

18

public Response<String> set(String key, String value);

19

20

/**

21

* Queue SET command with parameters

22

* @param key Redis key

23

* @param value String value

24

* @param params SET parameters (EX, PX, NX, XX)

25

* @return Response object for deferred result

26

*/

27

public Response<String> set(String key, String value, SetParams params);

28

29

/**

30

* Queue GET command in transaction

31

* @param key Redis key

32

* @return Response object for deferred result

33

*/

34

public Response<String> get(String key);

35

36

/**

37

* Queue DELETE command in transaction

38

* @param keys Redis keys to delete

39

* @return Response object for deferred result

40

*/

41

public Response<Long> del(String... keys);

42

43

/**

44

* Queue INCREMENT command in transaction

45

* @param key Redis key

46

* @return Response object for deferred result

47

*/

48

public Response<Long> incr(String key);

49

50

/**

51

* Queue HASH SET command in transaction

52

* @param key Hash key

53

* @param field Hash field

54

* @param value Field value

55

* @return Response object for deferred result

56

*/

57

public Response<Long> hset(String key, String field, String value);

58

59

/**

60

* Queue HASH GET command in transaction

61

* @param key Hash key

62

* @param field Hash field

63

* @return Response object for deferred result

64

*/

65

public Response<String> hget(String key, String field);

66

67

/**

68

* Queue LIST PUSH command in transaction

69

* @param key List key

70

* @param strings Elements to push

71

* @return Response object for deferred result

72

*/

73

public Response<Long> lpush(String key, String... strings);

74

75

/**

76

* Queue LIST POP command in transaction

77

* @param key List key

78

* @return Response object for deferred result

79

*/

80

public Response<String> lpop(String key);

81

82

/**

83

* Queue SORTED SET ADD command in transaction

84

* @param key Sorted set key

85

* @param score Element score

86

* @param member Element member

87

* @return Response object for deferred result

88

*/

89

public Response<Long> zadd(String key, double score, String member);

90

91

/**

92

* Execute all queued commands atomically

93

* @return List of command results in order, or null if the transaction was aborted because a WATCHed key was modified

94

*/

95

public List<Object> exec();

96

97

/**

98

* Discard all queued commands and exit transaction mode

99

* @return Status code reply

100

*/

101

public String discard();

102

103

/**

104

* Get number of queued commands

105

* @return Number of commands in transaction queue

106

*/

107

public int getQueuedCommandCount();

108

109

/**

110

* Check whether the transaction is still open

111

* @return true while the transaction is in MULTI state (not yet executed or discarded)

112

*/

113

public boolean isInMulti();

114

115

/**

116

* Close transaction (calls discard if not executed)

117

*/

118

public void close();

119

}

120

```

121

122

### Basic Transaction Usage

123

124

```java

125

Jedis jedis = new Jedis("localhost", 6379);

126

127

try (Transaction transaction = jedis.multi()) {

128

// Queue commands in transaction

129

Response<String> setResult = transaction.set("user:1:name", "John");

130

Response<Long> incrResult = transaction.incr("user:1:visits");

131

Response<String> getResult = transaction.get("user:1:name");

132

Response<Long> hsetResult = transaction.hset("user:1:profile", "age", "30");

133

134

// Execute all commands atomically

135

List<Object> results = transaction.exec();

136

137

if (results != null) {

138

// Transaction succeeded

139

System.out.println("SET result: " + setResult.get()); // "OK"

140

System.out.println("INCR result: " + incrResult.get()); // 1

141

System.out.println("GET result: " + getResult.get()); // "John"

142

System.out.println("HSET result: " + hsetResult.get()); // 1

143

} else {

144

// Transaction was discarded (due to WATCH)

145

System.out.println("Transaction was discarded");

146

}

147

}

148

149

jedis.close();

150

```

151

152

### Optimistic Locking with WATCH

153

154

Monitor keys for changes and abort transaction if watched keys are modified.

155

156

```java { .api }

157

public interface JedisCommands {

158

/**

159

* Watch keys for changes during transaction

160

* @param keys Keys to watch

161

* @return Status code reply

162

*/

163

String watch(String... keys);

164

165

/**

166

* Unwatch all previously watched keys

167

* @return Status code reply

168

*/

169

String unwatch();

170

171

/**

172

* Begin transaction (MULTI command)

173

* @return Transaction object

174

*/

175

Transaction multi();

176

}

177

```

178

179

#### Optimistic Locking Example

180

181

```java

182

public boolean transferFunds(String fromAccount, String toAccount, double amount) {

183

Jedis jedis = new Jedis("localhost", 6379);

184

185

try {

186

String fromBalanceKey = "account:" + fromAccount + ":balance";

187

String toBalanceKey = "account:" + toAccount + ":balance";

188

189

// Watch keys that will be modified

190

jedis.watch(fromBalanceKey, toBalanceKey);

191

192

// Get current balances

193

String fromBalanceStr = jedis.get(fromBalanceKey);

194

String toBalanceStr = jedis.get(toBalanceKey);

195

196

if (fromBalanceStr == null || toBalanceStr == null) {

197

jedis.unwatch();

198

return false; // Account doesn't exist

199

}

200

201

double fromBalance = Double.parseDouble(fromBalanceStr);

202

double toBalance = Double.parseDouble(toBalanceStr);

203

204

if (fromBalance < amount) {

205

jedis.unwatch();

206

return false; // Insufficient funds

207

}

208

209

// Start transaction

210

try (Transaction transaction = jedis.multi()) {

211

// Update balances

212

transaction.set(fromBalanceKey, String.valueOf(fromBalance - amount));

213

transaction.set(toBalanceKey, String.valueOf(toBalance + amount));

214

215

// Log transaction

216

transaction.lpush("transactions",

217

String.format("Transfer: %s -> %s, Amount: %.2f",

218

fromAccount, toAccount, amount));

219

220

List<Object> results = transaction.exec();

221

222

if (results != null) {

223

System.out.println("Transfer completed successfully");

224

return true;

225

} else {

226

System.out.println("Transfer failed - accounts were modified by another client");

227

return false;

228

}

229

}

230

} finally {

231

jedis.close();

232

}

233

}

234

235

// Usage with retry logic

236

public boolean transferWithRetry(String from, String to, double amount, int maxRetries) {

237

for (int i = 0; i < maxRetries; i++) {

238

if (transferFunds(from, to, amount)) {

239

return true;

240

}

241

242

// Brief delay before retry

243

try {

244

Thread.sleep(10 + (int)(Math.random() * 20)); // 10-30ms random delay

245

} catch (InterruptedException e) {

246

Thread.currentThread().interrupt();

247

break;

248

}

249

}

250

return false;

251

}

252

```

253

254

### ReliableTransaction

255

256

Enhanced transaction with built-in retry logic and error handling.

257

258

```java { .api }

259

public class ReliableTransaction extends Transaction {

260

/**

261

* Creates reliable transaction with retry capability

262

* @param jedis Jedis connection

263

* @param maxRetries Maximum retry attempts

264

* @param retryDelayMs Delay between retries in milliseconds

265

*/

266

public ReliableTransaction(Jedis jedis, int maxRetries, long retryDelayMs);

267

268

/**

269

* Execute transaction with automatic retry on conflicts

270

* @return Transaction results, or null if all retries failed

271

*/

272

public List<Object> execWithRetry();

273

274

/**

275

* Set keys to watch for this reliable transaction

276

* @param keys Keys to watch

277

*/

278

public void setWatchKeys(String... keys);

279

280

/**

281

* Add pre-execution validation

282

* @param validator Validation function that returns true if transaction should proceed

283

*/

284

public void addValidator(Supplier<Boolean> validator);

285

286

/**

287

* Get number of retry attempts made

288

* @return Retry count

289

*/

290

public int getRetryCount();

291

}

292

```

293

294

## Command Pipelining

295

296

### Pipeline

297

298

Batch multiple commands for improved network performance.

299

300

```java { .api }

301

public class Pipeline implements AutoCloseable {

302

/**

303

* Queue SET command

304

* @param key Redis key

305

* @param value String value

306

* @return Response object for deferred result

307

*/

308

public Response<String> set(String key, String value);

309

310

/**

311

* Queue SET command with parameters

312

* @param key Redis key

313

* @param value String value

314

* @param params SET parameters

315

* @return Response object for deferred result

316

*/

317

public Response<String> set(String key, String value, SetParams params);

318

319

/**

320

* Queue GET command

321

* @param key Redis key

322

* @return Response object for deferred result

323

*/

324

public Response<String> get(String key);

325

326

/**

327

* Queue HASH operations

328

* @param key Hash key

329

* @param field Hash field

330

* @param value Field value

331

* @return Response object for deferred result

332

*/

333

public Response<Long> hset(String key, String field, String value);

334

335

public Response<String> hget(String key, String field);

336

337

/**

338

* Queue LIST operations

339

* @param key List key

340

* @param strings Elements to push

341

* @return Response object for deferred result

342

*/

343

public Response<Long> lpush(String key, String... strings);

344

345

public Response<String> lpop(String key);

346

347

/**

348

* Queue SET operations

349

* @param key Set key

350

* @param members Members to add

351

* @return Response object for deferred result

352

*/

353

public Response<Long> sadd(String key, String... members);

354

355

public Response<Set<String>> smembers(String key);

356

357

/**

358

* Queue SORTED SET operations

359

* @param key Sorted set key

360

* @param score Element score

361

* @param member Element member

362

* @return Response object for deferred result

363

*/

364

public Response<Long> zadd(String key, double score, String member);

365

366

public Response<List<String>> zrange(String key, long start, long stop);

367

368

/**

369

* Queue EXPIRY operations

370

* @param key Redis key

371

* @param seconds Expiration in seconds

372

* @return Response object for deferred result

373

*/

374

public Response<Long> expire(String key, long seconds);

375

376

public Response<Long> ttl(String key);

377

378

/**

379

* Send all queued commands to Redis server

380

* Responses are read and made available through the Response objects returned when each command was queued, rather than returned as a list

381

*/

382

public void sync();

383

384

/**

385

* Send all queued commands and return all responses

386

* @return List of command responses in order

387

*/

388

public List<Object> syncAndReturnAll();

389

390

/**

391

* Get number of queued commands

392

* @return Number of commands in pipeline queue

393

*/

394

public int getQueuedCommandCount();

395

396

/**

397

* Clear all queued commands without sending them

398

*/

399

public void clear();

400

401

/**

402

* Close pipeline and send any remaining commands

403

*/

404

public void close();

405

}

406

```

407

408

### Basic Pipeline Usage

409

410

```java

411

Jedis jedis = new Jedis("localhost", 6379);

412

413

try (Pipeline pipeline = jedis.pipelined()) {

414

// Queue multiple commands

415

Response<String> set1 = pipeline.set("user:1", "John");

416

Response<String> set2 = pipeline.set("user:2", "Jane");

417

Response<String> set3 = pipeline.set("user:3", "Bob");

418

419

Response<Long> incr1 = pipeline.incr("counter:views");

420

Response<Long> incr2 = pipeline.incr("counter:users");

421

422

Response<String> get1 = pipeline.get("user:1");

423

Response<String> get2 = pipeline.get("user:2");

424

425

// Send all commands at once

426

pipeline.sync();

427

428

// Access results (available after sync)

429

System.out.println("SET results: " + set1.get() + ", " + set2.get() + ", " + set3.get());

430

System.out.println("INCR results: " + incr1.get() + ", " + incr2.get());

431

System.out.println("GET results: " + get1.get() + ", " + get2.get());

432

}

433

434

jedis.close();

435

```

436

437

### Bulk Data Loading with Pipeline

438

439

```java

440

public void loadBulkData(List<User> users) {

441

Jedis jedis = new Jedis("localhost", 6379);

442

443

try (Pipeline pipeline = jedis.pipelined()) {

444

List<Response<String>> responses = new ArrayList<>();

445

446

for (User user : users) {

447

// User data

448

responses.add(pipeline.set("user:" + user.getId(), user.toJson()));

449

450

// User profile hash

451

pipeline.hset("profile:" + user.getId(), "name", user.getName());

452

pipeline.hset("profile:" + user.getId(), "email", user.getEmail());

453

pipeline.hset("profile:" + user.getId(), "age", String.valueOf(user.getAge()));

454

455

// Add to user sets

456

pipeline.sadd("users:active", user.getId());

457

if (user.isPremium()) {

458

pipeline.sadd("users:premium", user.getId());

459

}

460

461

// User score (for leaderboards)

462

pipeline.zadd("users:score", user.getScore(), user.getId());

463

464

// Set expiration for session data

465

pipeline.expire("user:" + user.getId() + ":session", 3600);

466

}

467

468

// Execute all commands

469

pipeline.sync();

470

471

// Check results if needed

472

for (Response<String> response : responses) {

473

if (!"OK".equals(response.get())) {

474

System.err.println("Failed to set user data");

475

}

476

}

477

478

System.out.println("Loaded " + users.size() + " users in bulk");

479

}

480

481

jedis.close();

482

}

483

```

484

485

### Performance Comparison

486

487

```java

488

public void performanceComparison() {

489

Jedis jedis = new Jedis("localhost", 6379);

490

int operations = 10000;

491

492

// Without pipelining

493

long startTime = System.currentTimeMillis();

494

for (int i = 0; i < operations; i++) {

495

jedis.set("key:" + i, "value:" + i);

496

}

497

long normalTime = System.currentTimeMillis() - startTime;

498

499

// With pipelining

500

startTime = System.currentTimeMillis();

501

try (Pipeline pipeline = jedis.pipelined()) {

502

for (int i = 0; i < operations; i++) {

503

pipeline.set("pkey:" + i, "pvalue:" + i);

504

}

505

pipeline.sync();

506

}

507

long pipelineTime = System.currentTimeMillis() - startTime;

508

509

System.out.println("Normal execution: " + normalTime + "ms");

510

System.out.println("Pipeline execution: " + pipelineTime + "ms");

511

System.out.println("Pipeline is " + (normalTime / (double) pipelineTime) + "x faster");

512

513

jedis.close();

514

}

515

```

516

517

## Response Handling

518

519

### Response

520

521

Wrapper for deferred command results in pipelines and transactions.

522

523

```java { .api }

524

public class Response<T> {

525

/**

526

* Get the result of the command

527

* Only available after pipeline sync() or transaction exec()

528

* @return Command result

529

* @throws IllegalStateException if called before sync/exec

530

*/

531

public T get();

532

533

/**

534

* Check if response is available

535

* @return true if response has been set

536

*/

537

public boolean isSet();

538

539

/**

540

* Get response as string representation

541

* @return String representation of response

542

*/

543

@Override

544

public String toString();

545

}

546

```

547

548

### Safe Response Handling

549

550

```java

551

public void safeResponseHandling() {

552

Jedis jedis = new Jedis("localhost", 6379);

553

554

try (Pipeline pipeline = jedis.pipelined()) {

555

Response<String> response1 = pipeline.get("key1");

556

Response<String> response2 = pipeline.get("key2");

557

Response<Long> response3 = pipeline.incr("counter");

558

559

// Don't access responses before sync!

560

// System.out.println(response1.get()); // This would throw IllegalStateException

561

562

pipeline.sync();

563

564

// Now safe to access responses

565

if (response1.isSet()) {

566

String value1 = response1.get();

567

System.out.println("Key1: " + (value1 != null ? value1 : "null"));

568

}

569

570

if (response2.isSet()) {

571

String value2 = response2.get();

572

System.out.println("Key2: " + (value2 != null ? value2 : "null"));

573

}

574

575

if (response3.isSet()) {

576

Long counter = response3.get();

577

System.out.println("Counter: " + counter);

578

}

579

}

580

581

jedis.close();

582

}

583

```

584

585

## Advanced Patterns

586

587

### Conditional Pipeline Execution

588

589

```java

590

public class ConditionalPipeline {

591

public void conditionalExecution(List<String> userIds) {

592

Jedis jedis = new Jedis("localhost", 6379);

593

594

try (Pipeline pipeline = jedis.pipelined()) {

595

List<Response<String>> existChecks = new ArrayList<>();

596

597

// First, check which users exist

598

for (String userId : userIds) {

599

existChecks.add(pipeline.get("user:" + userId));

600

}

601

602

pipeline.sync(); // Execute existence checks

603

604

// Now conditionally add more commands based on results

605

try (Pipeline secondPipeline = jedis.pipelined()) {

606

for (int i = 0; i < userIds.size(); i++) {

607

String userId = userIds.get(i);

608

String userData = existChecks.get(i).get();

609

610

if (userData != null) {

611

// User exists, update last seen

612

secondPipeline.hset("user:" + userId + ":meta",

613

"last_seen", String.valueOf(System.currentTimeMillis()));

614

secondPipeline.incr("user:" + userId + ":visits");

615

} else {

616

// User doesn't exist, create default data

617

secondPipeline.set("user:" + userId, "{}");

618

secondPipeline.hset("user:" + userId + ":meta",

619

"created", String.valueOf(System.currentTimeMillis()));

620

}

621

}

622

623

secondPipeline.sync();

624

}

625

}

626

627

jedis.close();

628

}

629

}

630

```

631

632

### Pipeline with Error Handling

633

634

```java

635

public class RobustPipelineProcessor {

636

public void processWithErrorHandling(List<String> keys) {

637

Jedis jedis = new Jedis("localhost", 6379);

638

639

try (Pipeline pipeline = jedis.pipelined()) {

640

List<Response<String>> responses = new ArrayList<>();

641

642

// Queue all operations

643

for (String key : keys) {

644

responses.add(pipeline.get(key));

645

}

646

647

try {

648

pipeline.sync();

649

650

// Process results

651

for (int i = 0; i < keys.size(); i++) {

652

String key = keys.get(i);

653

Response<String> response = responses.get(i);

654

655

try {

656

String value = response.get();

657

if (value != null) {

658

processValue(key, value);

659

} else {

660

handleMissingKey(key);

661

}

662

} catch (Exception e) {

663

handleKeyError(key, e);

664

}

665

}

666

667

} catch (Exception e) {

668

System.err.println("Pipeline execution failed: " + e.getMessage());

669

// Could implement retry logic here

670

}

671

}

672

673

jedis.close();

674

}

675

676

private void processValue(String key, String value) {

677

System.out.println("Processing " + key + ": " + value);

678

}

679

680

private void handleMissingKey(String key) {

681

System.out.println("Key not found: " + key);

682

}

683

684

private void handleKeyError(String key, Exception e) {

685

System.err.println("Error processing key " + key + ": " + e.getMessage());

686

}

687

}

688

```

689

690

### Transaction vs Pipeline Decision Matrix

691

692

```java

693

public class TransactionVsPipeline {

694

695

// Use Transaction when:

696

public void atomicOperations() {

697

// 1. ACID properties required

698

// 2. Operations must all succeed or all fail

699

// 3. Using WATCH for optimistic locking

700

// 4. Data consistency is critical

701

702

Jedis jedis = new Jedis("localhost", 6379);

703

jedis.watch("critical_counter");

704

705

try (Transaction tx = jedis.multi()) {

706

tx.incr("critical_counter");

707

tx.set("last_updated", String.valueOf(System.currentTimeMillis()));

708

709

List<Object> results = tx.exec();

710

if (results == null) {

711

// Handle conflict

712

}

713

}

714

jedis.close();

715

}

716

717

// Use Pipeline when:

718

public void performanceOptimization() {

719

// 1. Bulk operations

720

// 2. Network round-trip reduction

721

// 3. High throughput required

722

// 4. Individual command failures acceptable

723

724

Jedis jedis = new Jedis("localhost", 6379);

725

726

try (Pipeline pipeline = jedis.pipelined()) {

727

for (int i = 0; i < 1000; i++) {

728

pipeline.set("bulk:" + i, "data:" + i);

729

}

730

pipeline.sync();

731

}

732

jedis.close();

733

}

734

}

735

```

736

737

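Redis transactions and pipelining are essential features for building high-performance applications with Jedis. Transactions execute a group of commands as one isolated unit and support optimistic locking through WATCH, although Redis does not roll back commands that have already run. Pipelining significantly improves throughput for bulk operations by reducing network round trips.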