0
# Transactions and Pipelining
1
2
This document covers Redis transactions and command pipelining in Jedis, including MULTI/EXEC transactions, optimistic locking with WATCH, command batching for performance optimization, and reliable transaction patterns.
3
4
## Redis Transactions
5
6
### Transaction
7
8
Redis MULTI/EXEC transaction implementation providing ACID properties.
9
10
```java { .api }
11
public class Transaction implements AutoCloseable {
12
/**
13
* Queue SET command in transaction
14
* @param key Redis key
15
* @param value String value
16
* @return Response object for deferred result
17
*/
18
public Response<String> set(String key, String value);
19
20
/**
21
* Queue SET command with parameters
22
* @param key Redis key
23
* @param value String value
24
* @param params SET parameters (EX, PX, NX, XX)
25
* @return Response object for deferred result
26
*/
27
public Response<String> set(String key, String value, SetParams params);
28
29
/**
30
* Queue GET command in transaction
31
* @param key Redis key
32
* @return Response object for deferred result
33
*/
34
public Response<String> get(String key);
35
36
/**
37
* Queue DELETE command in transaction
38
* @param keys Redis keys to delete
39
* @return Response object for deferred result
40
*/
41
public Response<Long> del(String... keys);
42
43
/**
44
* Queue INCREMENT command in transaction
45
* @param key Redis key
46
* @return Response object for deferred result
47
*/
48
public Response<Long> incr(String key);
49
50
/**
51
* Queue HASH SET command in transaction
52
* @param key Hash key
53
* @param field Hash field
54
* @param value Field value
55
* @return Response object for deferred result
56
*/
57
public Response<Long> hset(String key, String field, String value);
58
59
/**
60
* Queue HASH GET command in transaction
61
* @param key Hash key
62
* @param field Hash field
63
* @return Response object for deferred result
64
*/
65
public Response<String> hget(String key, String field);
66
67
/**
68
* Queue LIST PUSH command in transaction
69
* @param key List key
70
* @param strings Elements to push
71
* @return Response object for deferred result
72
*/
73
public Response<Long> lpush(String key, String... strings);
74
75
/**
76
* Queue LIST POP command in transaction
77
* @param key List key
78
* @return Response object for deferred result
79
*/
80
public Response<String> lpop(String key);
81
82
/**
83
* Queue SORTED SET ADD command in transaction
84
* @param key Sorted set key
85
* @param score Element score
86
* @param member Element member
87
* @return Response object for deferred result
88
*/
89
public Response<Long> zadd(String key, double score, String member);
90
91
/**
92
* Execute all queued commands atomically
93
* @return List of command results in order, or null if transaction was discarded
94
*/
95
public List<Object> exec();
96
97
/**
98
* Discard all queued commands and exit transaction mode
99
* @return Status code reply
100
*/
101
public String discard();
102
103
/**
104
* Get number of queued commands
105
* @return Number of commands in transaction queue
106
*/
107
public int getQueuedCommandCount();
108
109
/**
110
* Check if transaction has been executed or discarded
111
* @return true if transaction is finished
112
*/
113
public boolean isInMulti();
114
115
/**
116
* Close transaction (calls discard if not executed)
117
*/
118
public void close();
119
}
120
```
121
122
### Basic Transaction Usage
123
124
```java
125
Jedis jedis = new Jedis("localhost", 6379);
126
127
try (Transaction transaction = jedis.multi()) {
128
// Queue commands in transaction
129
Response<String> setResult = transaction.set("user:1:name", "John");
130
Response<Long> incrResult = transaction.incr("user:1:visits");
131
Response<String> getResult = transaction.get("user:1:name");
132
Response<Long> hsetResult = transaction.hset("user:1:profile", "age", "30");
133
134
// Execute all commands atomically
135
List<Object> results = transaction.exec();
136
137
if (results != null) {
138
// Transaction succeeded
139
System.out.println("SET result: " + setResult.get()); // "OK"
140
System.out.println("INCR result: " + incrResult.get()); // 1
141
System.out.println("GET result: " + getResult.get()); // "John"
142
System.out.println("HSET result: " + hsetResult.get()); // 1
143
} else {
144
// Transaction was discarded (due to WATCH)
145
System.out.println("Transaction was discarded");
146
}
147
}
148
149
jedis.close();
150
```
151
152
### Optimistic Locking with WATCH
153
154
Monitor keys for changes and abort transaction if watched keys are modified.
155
156
```java { .api }
157
public interface JedisCommands {
158
/**
159
* Watch keys for changes during transaction
160
* @param keys Keys to watch
161
* @return Status code reply
162
*/
163
String watch(String... keys);
164
165
/**
166
* Unwatch all previously watched keys
167
* @return Status code reply
168
*/
169
String unwatch();
170
171
/**
172
* Begin transaction (MULTI command)
173
* @return Transaction object
174
*/
175
Transaction multi();
176
}
177
```
178
179
#### Optimistic Locking Example
180
181
```java
182
public boolean transferFunds(String fromAccount, String toAccount, double amount) {
183
Jedis jedis = new Jedis("localhost", 6379);
184
185
try {
186
String fromBalanceKey = "account:" + fromAccount + ":balance";
187
String toBalanceKey = "account:" + toAccount + ":balance";
188
189
// Watch keys that will be modified
190
jedis.watch(fromBalanceKey, toBalanceKey);
191
192
// Get current balances
193
String fromBalanceStr = jedis.get(fromBalanceKey);
194
String toBalanceStr = jedis.get(toBalanceKey);
195
196
if (fromBalanceStr == null || toBalanceStr == null) {
197
jedis.unwatch();
198
return false; // Account doesn't exist
199
}
200
201
double fromBalance = Double.parseDouble(fromBalanceStr);
202
double toBalance = Double.parseDouble(toBalanceStr);
203
204
if (fromBalance < amount) {
205
jedis.unwatch();
206
return false; // Insufficient funds
207
}
208
209
// Start transaction
210
try (Transaction transaction = jedis.multi()) {
211
// Update balances
212
transaction.set(fromBalanceKey, String.valueOf(fromBalance - amount));
213
transaction.set(toBalanceKey, String.valueOf(toBalance + amount));
214
215
// Log transaction
216
transaction.lpush("transactions",
217
String.format("Transfer: %s -> %s, Amount: %.2f",
218
fromAccount, toAccount, amount));
219
220
List<Object> results = transaction.exec();
221
222
if (results != null) {
223
System.out.println("Transfer completed successfully");
224
return true;
225
} else {
226
System.out.println("Transfer failed - accounts were modified by another client");
227
return false;
228
}
229
}
230
} finally {
231
jedis.close();
232
}
233
}
234
235
// Usage with retry logic
236
public boolean transferWithRetry(String from, String to, double amount, int maxRetries) {
237
for (int i = 0; i < maxRetries; i++) {
238
if (transferFunds(from, to, amount)) {
239
return true;
240
}
241
242
// Brief delay before retry
243
try {
244
Thread.sleep(10 + (int)(Math.random() * 20)); // 10-30ms random delay
245
} catch (InterruptedException e) {
246
Thread.currentThread().interrupt();
247
break;
248
}
249
}
250
return false;
251
}
252
```
253
254
### ReliableTransaction
255
256
Enhanced transaction with built-in retry logic and error handling.
257
258
```java { .api }
259
public class ReliableTransaction extends Transaction {
260
/**
261
* Creates reliable transaction with retry capability
262
* @param jedis Jedis connection
263
* @param maxRetries Maximum retry attempts
264
* @param retryDelayMs Delay between retries in milliseconds
265
*/
266
public ReliableTransaction(Jedis jedis, int maxRetries, long retryDelayMs);
267
268
/**
269
* Execute transaction with automatic retry on conflicts
270
* @return Transaction results, or null if all retries failed
271
*/
272
public List<Object> execWithRetry();
273
274
/**
275
* Set keys to watch for this reliable transaction
276
* @param keys Keys to watch
277
*/
278
public void setWatchKeys(String... keys);
279
280
/**
281
* Add pre-execution validation
282
* @param validator Validation function that returns true if transaction should proceed
283
*/
284
public void addValidator(Supplier<Boolean> validator);
285
286
/**
287
* Get number of retry attempts made
288
* @return Retry count
289
*/
290
public int getRetryCount();
291
}
292
```
293
294
## Command Pipelining
295
296
### Pipeline
297
298
Batch multiple commands for improved network performance.
299
300
```java { .api }
301
public class Pipeline implements AutoCloseable {
302
/**
303
* Queue SET command
304
* @param key Redis key
305
* @param value String value
306
* @return Response object for deferred result
307
*/
308
public Response<String> set(String key, String value);
309
310
/**
311
* Queue SET command with parameters
312
* @param key Redis key
313
* @param value String value
314
* @param params SET parameters
315
* @return Response object for deferred result
316
*/
317
public Response<String> set(String key, String value, SetParams params);
318
319
/**
320
* Queue GET command
321
* @param key Redis key
322
* @return Response object for deferred result
323
*/
324
public Response<String> get(String key);
325
326
/**
327
* Queue HASH operations
328
* @param key Hash key
329
* @param field Hash field
330
* @param value Field value
331
* @return Response object for deferred result
332
*/
333
public Response<Long> hset(String key, String field, String value);
334
335
public Response<String> hget(String key, String field);
336
337
/**
338
* Queue LIST operations
339
* @param key List key
340
* @param strings Elements to push
341
* @return Response object for deferred result
342
*/
343
public Response<Long> lpush(String key, String... strings);
344
345
public Response<String> lpop(String key);
346
347
/**
348
* Queue SET operations
349
* @param key Set key
350
* @param members Members to add
351
* @return Response object for deferred result
352
*/
353
public Response<Long> sadd(String key, String... members);
354
355
public Response<Set<String>> smembers(String key);
356
357
/**
358
* Queue SORTED SET operations
359
* @param key Sorted set key
360
* @param score Element score
361
* @param member Element member
362
* @return Response object for deferred result
363
*/
364
public Response<Long> zadd(String key, double score, String member);
365
366
public Response<List<String>> zrange(String key, long start, long stop);
367
368
/**
369
* Queue EXPIRY operations
370
* @param key Redis key
371
* @param seconds Expiration in seconds
372
* @return Response object for deferred result
373
*/
374
public Response<Long> expire(String key, long seconds);
375
376
public Response<Long> ttl(String key);
377
378
/**
379
* Send all queued commands to Redis server
380
* Commands are executed but responses are not returned
381
*/
382
public void sync();
383
384
/**
385
* Send all queued commands and return all responses
386
* @return List of command responses in order
387
*/
388
public List<Object> syncAndReturnAll();
389
390
/**
391
* Get number of queued commands
392
* @return Number of commands in pipeline queue
393
*/
394
public int getQueuedCommandCount();
395
396
/**
397
* Clear all queued commands without sending them
398
*/
399
public void clear();
400
401
/**
402
* Close pipeline and send any remaining commands
403
*/
404
public void close();
405
}
406
```
407
408
### Basic Pipeline Usage
409
410
```java
411
Jedis jedis = new Jedis("localhost", 6379);
412
413
try (Pipeline pipeline = jedis.pipelined()) {
414
// Queue multiple commands
415
Response<String> set1 = pipeline.set("user:1", "John");
416
Response<String> set2 = pipeline.set("user:2", "Jane");
417
Response<String> set3 = pipeline.set("user:3", "Bob");
418
419
Response<Long> incr1 = pipeline.incr("counter:views");
420
Response<Long> incr2 = pipeline.incr("counter:users");
421
422
Response<String> get1 = pipeline.get("user:1");
423
Response<String> get2 = pipeline.get("user:2");
424
425
// Send all commands at once
426
pipeline.sync();
427
428
// Access results (available after sync)
429
System.out.println("SET results: " + set1.get() + ", " + set2.get() + ", " + set3.get());
430
System.out.println("INCR results: " + incr1.get() + ", " + incr2.get());
431
System.out.println("GET results: " + get1.get() + ", " + get2.get());
432
}
433
434
jedis.close();
435
```
436
437
### Bulk Data Loading with Pipeline
438
439
```java
440
public void loadBulkData(List<User> users) {
441
Jedis jedis = new Jedis("localhost", 6379);
442
443
try (Pipeline pipeline = jedis.pipelined()) {
444
List<Response<String>> responses = new ArrayList<>();
445
446
for (User user : users) {
447
// User data
448
responses.add(pipeline.set("user:" + user.getId(), user.toJson()));
449
450
// User profile hash
451
pipeline.hset("profile:" + user.getId(), "name", user.getName());
452
pipeline.hset("profile:" + user.getId(), "email", user.getEmail());
453
pipeline.hset("profile:" + user.getId(), "age", String.valueOf(user.getAge()));
454
455
// Add to user sets
456
pipeline.sadd("users:active", user.getId());
457
if (user.isPremium()) {
458
pipeline.sadd("users:premium", user.getId());
459
}
460
461
// User score (for leaderboards)
462
pipeline.zadd("users:score", user.getScore(), user.getId());
463
464
// Set expiration for session data
465
pipeline.expire("user:" + user.getId() + ":session", 3600);
466
}
467
468
// Execute all commands
469
pipeline.sync();
470
471
// Check results if needed
472
for (Response<String> response : responses) {
473
if (!"OK".equals(response.get())) {
474
System.err.println("Failed to set user data");
475
}
476
}
477
478
System.out.println("Loaded " + users.size() + " users in bulk");
479
}
480
481
jedis.close();
482
}
483
```
484
485
### Performance Comparison
486
487
```java
488
public void performanceComparison() {
489
Jedis jedis = new Jedis("localhost", 6379);
490
int operations = 10000;
491
492
// Without pipelining
493
long startTime = System.currentTimeMillis();
494
for (int i = 0; i < operations; i++) {
495
jedis.set("key:" + i, "value:" + i);
496
}
497
long normalTime = System.currentTimeMillis() - startTime;
498
499
// With pipelining
500
startTime = System.currentTimeMillis();
501
try (Pipeline pipeline = jedis.pipelined()) {
502
for (int i = 0; i < operations; i++) {
503
pipeline.set("pkey:" + i, "pvalue:" + i);
504
}
505
pipeline.sync();
506
}
507
long pipelineTime = System.currentTimeMillis() - startTime;
508
509
System.out.println("Normal execution: " + normalTime + "ms");
510
System.out.println("Pipeline execution: " + pipelineTime + "ms");
511
System.out.println("Pipeline is " + (normalTime / (double) pipelineTime) + "x faster");
512
513
jedis.close();
514
}
515
```
516
517
## Response Handling
518
519
### Response
520
521
Wrapper for deferred command results in pipelines and transactions.
522
523
```java { .api }
524
public class Response<T> {
525
/**
526
* Get the result of the command
527
* Only available after pipeline sync() or transaction exec()
528
* @return Command result
529
* @throws IllegalStateException if called before sync/exec
530
*/
531
public T get();
532
533
/**
534
* Check if response is available
535
* @return true if response has been set
536
*/
537
public boolean isSet();
538
539
/**
540
* Get response as string representation
541
* @return String representation of response
542
*/
543
@Override
544
public String toString();
545
}
546
```
547
548
### Safe Response Handling
549
550
```java
551
public void safeResponseHandling() {
552
Jedis jedis = new Jedis("localhost", 6379);
553
554
try (Pipeline pipeline = jedis.pipelined()) {
555
Response<String> response1 = pipeline.get("key1");
556
Response<String> response2 = pipeline.get("key2");
557
Response<Long> response3 = pipeline.incr("counter");
558
559
// Don't access responses before sync!
560
// System.out.println(response1.get()); // This would throw IllegalStateException
561
562
pipeline.sync();
563
564
// Now safe to access responses
565
if (response1.isSet()) {
566
String value1 = response1.get();
567
System.out.println("Key1: " + (value1 != null ? value1 : "null"));
568
}
569
570
if (response2.isSet()) {
571
String value2 = response2.get();
572
System.out.println("Key2: " + (value2 != null ? value2 : "null"));
573
}
574
575
if (response3.isSet()) {
576
Long counter = response3.get();
577
System.out.println("Counter: " + counter);
578
}
579
}
580
581
jedis.close();
582
}
583
```
584
585
## Advanced Patterns
586
587
### Conditional Pipeline Execution
588
589
```java
590
public class ConditionalPipeline {
591
public void conditionalExecution(List<String> userIds) {
592
Jedis jedis = new Jedis("localhost", 6379);
593
594
try (Pipeline pipeline = jedis.pipelined()) {
595
List<Response<String>> existChecks = new ArrayList<>();
596
597
// First, check which users exist
598
for (String userId : userIds) {
599
existChecks.add(pipeline.get("user:" + userId));
600
}
601
602
pipeline.sync(); // Execute existence checks
603
604
// Now conditionally add more commands based on results
605
try (Pipeline secondPipeline = jedis.pipelined()) {
606
for (int i = 0; i < userIds.size(); i++) {
607
String userId = userIds.get(i);
608
String userData = existChecks.get(i).get();
609
610
if (userData != null) {
611
// User exists, update last seen
612
secondPipeline.hset("user:" + userId + ":meta",
613
"last_seen", String.valueOf(System.currentTimeMillis()));
614
secondPipeline.incr("user:" + userId + ":visits");
615
} else {
616
// User doesn't exist, create default data
617
secondPipeline.set("user:" + userId, "{}");
618
secondPipeline.hset("user:" + userId + ":meta",
619
"created", String.valueOf(System.currentTimeMillis()));
620
}
621
}
622
623
secondPipeline.sync();
624
}
625
}
626
627
jedis.close();
628
}
629
}
630
```
631
632
### Pipeline with Error Handling
633
634
```java
635
public class RobustPipelineProcessor {
636
public void processWithErrorHandling(List<String> keys) {
637
Jedis jedis = new Jedis("localhost", 6379);
638
639
try (Pipeline pipeline = jedis.pipelined()) {
640
List<Response<String>> responses = new ArrayList<>();
641
642
// Queue all operations
643
for (String key : keys) {
644
responses.add(pipeline.get(key));
645
}
646
647
try {
648
pipeline.sync();
649
650
// Process results
651
for (int i = 0; i < keys.size(); i++) {
652
String key = keys.get(i);
653
Response<String> response = responses.get(i);
654
655
try {
656
String value = response.get();
657
if (value != null) {
658
processValue(key, value);
659
} else {
660
handleMissingKey(key);
661
}
662
} catch (Exception e) {
663
handleKeyError(key, e);
664
}
665
}
666
667
} catch (Exception e) {
668
System.err.println("Pipeline execution failed: " + e.getMessage());
669
// Could implement retry logic here
670
}
671
}
672
673
jedis.close();
674
}
675
676
private void processValue(String key, String value) {
677
System.out.println("Processing " + key + ": " + value);
678
}
679
680
private void handleMissingKey(String key) {
681
System.out.println("Key not found: " + key);
682
}
683
684
private void handleKeyError(String key, Exception e) {
685
System.err.println("Error processing key " + key + ": " + e.getMessage());
686
}
687
}
688
```
689
690
### Transaction vs Pipeline Decision Matrix
691
692
```java
693
public class TransactionVsPipeline {
694
695
// Use Transaction when:
696
public void atomicOperations() {
697
// 1. ACID properties required
698
// 2. Operations must all succeed or all fail
699
// 3. Using WATCH for optimistic locking
700
// 4. Data consistency is critical
701
702
Jedis jedis = new Jedis("localhost", 6379);
703
jedis.watch("critical_counter");
704
705
try (Transaction tx = jedis.multi()) {
706
tx.incr("critical_counter");
707
tx.set("last_updated", String.valueOf(System.currentTimeMillis()));
708
709
List<Object> results = tx.exec();
710
if (results == null) {
711
// Handle conflict
712
}
713
}
714
jedis.close();
715
}
716
717
// Use Pipeline when:
718
public void performanceOptimization() {
719
// 1. Bulk operations
720
// 2. Network round-trip reduction
721
// 3. High throughput required
722
// 4. Individual command failures acceptable
723
724
Jedis jedis = new Jedis("localhost", 6379);
725
726
try (Pipeline pipeline = jedis.pipelined()) {
727
for (int i = 0; i < 1000; i++) {
728
pipeline.set("bulk:" + i, "data:" + i);
729
}
730
pipeline.sync();
731
}
732
jedis.close();
733
}
734
}
735
```
736
737
Redis transactions and pipelining are essential features for building high-performance applications with Jedis. Transactions provide ACID guarantees and optimistic locking, while pipelining significantly improves throughput for bulk operations.