Jedis is a blazingly small and sane Redis Java client.
This document covers Redis transactions and command pipelining in Jedis, including MULTI/EXEC transactions, optimistic locking with WATCH, command batching for performance optimization, and reliable transaction patterns.
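Redis MULTI/EXEC transaction implementation. Queued commands are executed atomically and in isolation from other clients; note that Redis does not roll back commands that fail during EXEC, so this is not full ACID in the relational-database sense.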
/**
* Handle for a Redis MULTI/EXEC transaction (API listing, signatures only).
* Each command method queues the command and returns a Response object for the
* deferred result; per the Response documentation the value is only available
* after exec().
*/
public class Transaction implements AutoCloseable {
/**
* Queue SET command in transaction
* @param key Redis key
* @param value String value
* @return Response object for deferred result
*/
public Response<String> set(String key, String value);
/**
* Queue SET command with parameters
* @param key Redis key
* @param value String value
* @param params SET parameters (EX, PX, NX, XX)
* @return Response object for deferred result
*/
public Response<String> set(String key, String value, SetParams params);
/**
* Queue GET command in transaction
* @param key Redis key
* @return Response object for deferred result
*/
public Response<String> get(String key);
/**
* Queue DEL command in transaction
* @param keys Redis keys to delete
* @return Response object for deferred result
*/
public Response<Long> del(String... keys);
/**
* Queue INCR command in transaction
* @param key Redis key
* @return Response object for deferred result
*/
public Response<Long> incr(String key);
/**
* Queue HSET command in transaction
* @param key Hash key
* @param field Hash field
* @param value Field value
* @return Response object for deferred result
*/
public Response<Long> hset(String key, String field, String value);
/**
* Queue HGET command in transaction
* @param key Hash key
* @param field Hash field
* @return Response object for deferred result
*/
public Response<String> hget(String key, String field);
/**
* Queue LPUSH command in transaction
* @param key List key
* @param strings Elements to push
* @return Response object for deferred result
*/
public Response<Long> lpush(String key, String... strings);
/**
* Queue LPOP command in transaction
* @param key List key
* @return Response object for deferred result
*/
public Response<String> lpop(String key);
/**
* Queue ZADD command in transaction
* @param key Sorted set key
* @param score Element score
* @param member Element member
* @return Response object for deferred result
*/
public Response<Long> zadd(String key, double score, String member);
/**
* Execute all queued commands atomically
* @return List of command results in order, or null if transaction was discarded
* (the usage examples treat null as "a WATCHed key was modified")
*/
public List<Object> exec();
/**
* Discard all queued commands and exit transaction mode
* @return Status code reply
*/
public String discard();
/**
* Get number of queued commands
* @return Number of commands in transaction queue
*/
public int getQueuedCommandCount();
/**
* Report the MULTI state of this transaction.
* NOTE(review): the previous description read "true if transaction is finished",
* which contradicts the method name; the name suggests true while the transaction
* is still open (before exec/discard). Confirm against the implementation.
* @return presumably true while the transaction is still in MULTI state - TODO confirm
*/
public boolean isInMulti();
/**
* Close transaction (calls discard if not executed)
*/
public void close();
}Jedis jedis = new Jedis("localhost", 6379);
try (Transaction transaction = jedis.multi()) {
// Queue commands in transaction
Response<String> setResult = transaction.set("user:1:name", "John");
Response<Long> incrResult = transaction.incr("user:1:visits");
Response<String> getResult = transaction.get("user:1:name");
Response<Long> hsetResult = transaction.hset("user:1:profile", "age", "30");
// Execute all commands atomically
List<Object> results = transaction.exec();
if (results != null) {
// Transaction succeeded
System.out.println("SET result: " + setResult.get()); // "OK"
System.out.println("INCR result: " + incrResult.get()); // 1
System.out.println("GET result: " + getResult.get()); // "John"
System.out.println("HSET result: " + hsetResult.get()); // 1
} else {
// Transaction was discarded (due to WATCH)
System.out.println("Transaction was discarded");
}
}
jedis.close();Monitor keys for changes and abort transaction if watched keys are modified.
/**
* Connection-level commands used for optimistic locking (API listing, signatures only).
*/
public interface JedisCommands {
/**
* Watch keys for changes during transaction.
* In the usage examples, exec() returning null is treated as "a watched key was
* modified by another client".
* @param keys Keys to watch
* @return Status code reply
*/
String watch(String... keys);
/**
* Unwatch all previously watched keys
* @return Status code reply
*/
String unwatch();
/**
* Begin transaction (MULTI command)
* @return Transaction object used to queue commands and call exec()/discard()
*/
Transaction multi();
}
/**
 * Moves {@code amount} from one account balance to another using
 * WATCH / MULTI / EXEC optimistic locking.
 *
 * <p>Balances are parsed and computed as {@code BigDecimal} so that repeated
 * transfers do not accumulate binary floating-point rounding errors.
 *
 * @param fromAccount account id to debit
 * @param toAccount account id to credit
 * @param amount amount to move; must be a finite number greater than zero
 * @return {@code true} if the transfer was committed; {@code false} if the request
 *         was invalid (non-positive or non-finite amount, same source and target),
 *         an account is missing, funds are insufficient, or a watched balance was
 *         changed by another client before EXEC
 * @throws NumberFormatException if a stored balance is not a valid decimal number
 */
public boolean transferFunds(String fromAccount, String toAccount, double amount) {
    // Reject NaN, infinities, zero and negative amounts: a negative amount would
    // otherwise bypass the funds check and move money in the opposite direction.
    if (Double.isNaN(amount) || Double.isInfinite(amount) || amount <= 0) {
        return false;
    }
    String fromBalanceKey = "account:" + fromAccount + ":balance";
    String toBalanceKey = "account:" + toAccount + ":balance";
    // A self-transfer would write the same key twice inside the transaction; the
    // second SET (balance + amount) would win and create money out of nothing.
    if (fromBalanceKey.equals(toBalanceKey)) {
        return false;
    }
    java.math.BigDecimal transferAmount = java.math.BigDecimal.valueOf(amount);
    // try-with-resources releases the connection on every exit path
    try (Jedis jedis = new Jedis("localhost", 6379)) {
        // Watch keys that will be modified
        jedis.watch(fromBalanceKey, toBalanceKey);
        // Get current balances
        String fromBalanceStr = jedis.get(fromBalanceKey);
        String toBalanceStr = jedis.get(toBalanceKey);
        if (fromBalanceStr == null || toBalanceStr == null) {
            jedis.unwatch();
            return false; // Account doesn't exist
        }
        java.math.BigDecimal fromBalance = new java.math.BigDecimal(fromBalanceStr.trim());
        java.math.BigDecimal toBalance = new java.math.BigDecimal(toBalanceStr.trim());
        if (fromBalance.compareTo(transferAmount) < 0) {
            jedis.unwatch();
            return false; // Insufficient funds
        }
        // Start transaction
        try (Transaction transaction = jedis.multi()) {
            // Update balances
            transaction.set(fromBalanceKey, fromBalance.subtract(transferAmount).toPlainString());
            transaction.set(toBalanceKey, toBalance.add(transferAmount).toPlainString());
            // Log transaction
            transaction.lpush("transactions",
                String.format("Transfer: %s -> %s, Amount: %.2f",
                    fromAccount, toAccount, amount));
            List<Object> results = transaction.exec();
            if (results == null) {
                // exec() returns null when a watched key changed after WATCH
                System.out.println("Transfer failed - accounts were modified by another client");
                return false;
            }
            System.out.println("Transfer completed successfully");
            return true;
        }
    }
}
// Usage with retry logic
/**
 * Calls {@code transferFunds} up to {@code maxRetries} times, pausing for a short
 * random delay between attempts.
 *
 * <p>Note: {@code transferFunds} reports every failure as {@code false}, so
 * non-transient failures (missing account, insufficient funds) are retried too.
 *
 * @param from account id to debit
 * @param to account id to credit
 * @param amount amount to move
 * @param maxRetries maximum number of attempts; values {@code <= 0} make no attempt
 * @return {@code true} as soon as one attempt succeeds, otherwise {@code false}
 */
public boolean transferWithRetry(String from, String to, double amount, int maxRetries) {
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        if (transferFunds(from, to, amount)) {
            return true;
        }
        if (attempt == maxRetries) {
            // No further attempt follows, so sleeping would only delay the result
            break;
        }
        // Brief randomized delay before retry so competing clients do not retry in lockstep
        try {
            // 10-30ms random delay (upper bound of nextLong is exclusive)
            Thread.sleep(java.util.concurrent.ThreadLocalRandom.current().nextLong(10, 31));
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop retrying
            Thread.currentThread().interrupt();
            break;
        }
    }
    return false;
}
Enhanced transaction with built-in retry logic and error handling.
/**
* Transaction variant with retry support (API listing, signatures only).
* NOTE(review): only signatures are shown here; the retry and validation behavior
* described below comes from the method comments and cannot be verified from this listing.
*/
public class ReliableTransaction extends Transaction {
/**
* Creates reliable transaction with retry capability
* @param jedis Jedis connection
* @param maxRetries Maximum retry attempts
* @param retryDelayMs Delay between retries in milliseconds
*/
public ReliableTransaction(Jedis jedis, int maxRetries, long retryDelayMs);
/**
* Execute transaction with automatic retry on conflicts
* @return Transaction results, or null if all retries failed
*/
public List<Object> execWithRetry();
/**
* Set keys to watch for this reliable transaction
* @param keys Keys to watch
*/
public void setWatchKeys(String... keys);
/**
* Add pre-execution validation
* @param validator Validation function that returns true if transaction should proceed
* (presumably must not return null, since the result is a boxed Boolean - TODO confirm)
*/
public void addValidator(Supplier<Boolean> validator);
/**
* Get number of retry attempts made
* @return Retry count
*/
public int getRetryCount();
}
Batch multiple commands for improved network performance.
/**
 * Command pipeline (API listing, signatures only): commands are queued and sent
 * to the server as a batch. Every queueing method returns a {@code Response}
 * whose value is only available after {@link #sync()} or
 * {@link #syncAndReturnAll()}.
 */
public class Pipeline implements AutoCloseable {
    /**
     * Queue SET command
     * @param key Redis key
     * @param value String value
     * @return Response object for deferred result
     */
    public Response<String> set(String key, String value);

    /**
     * Queue SET command with parameters
     * @param key Redis key
     * @param value String value
     * @param params SET parameters
     * @return Response object for deferred result
     */
    public Response<String> set(String key, String value, SetParams params);

    /**
     * Queue GET command
     * @param key Redis key
     * @return Response object for deferred result
     */
    public Response<String> get(String key);

    /**
     * Queue INCR command (used by the pipeline usage examples)
     * @param key Redis key
     * @return Response object for deferred result
     */
    public Response<Long> incr(String key);

    /**
     * Queue HSET command
     * @param key Hash key
     * @param field Hash field
     * @param value Field value
     * @return Response object for deferred result
     */
    public Response<Long> hset(String key, String field, String value);

    /**
     * Queue HGET command
     * @param key Hash key
     * @param field Hash field
     * @return Response object for deferred result
     */
    public Response<String> hget(String key, String field);

    /**
     * Queue LPUSH command
     * @param key List key
     * @param strings Elements to push
     * @return Response object for deferred result
     */
    public Response<Long> lpush(String key, String... strings);

    /**
     * Queue LPOP command
     * @param key List key
     * @return Response object for deferred result
     */
    public Response<String> lpop(String key);

    /**
     * Queue SADD command
     * @param key Set key
     * @param members Members to add
     * @return Response object for deferred result
     */
    public Response<Long> sadd(String key, String... members);

    /**
     * Queue SMEMBERS command
     * @param key Set key
     * @return Response object for deferred result
     */
    public Response<Set<String>> smembers(String key);

    /**
     * Queue ZADD command
     * @param key Sorted set key
     * @param score Element score
     * @param member Element member
     * @return Response object for deferred result
     */
    public Response<Long> zadd(String key, double score, String member);

    /**
     * Queue ZRANGE command
     * @param key Sorted set key
     * @param start Start index of the range
     * @param stop Stop index of the range
     * @return Response object for deferred result
     */
    public Response<List<String>> zrange(String key, long start, long stop);

    /**
     * Queue EXPIRE command
     * @param key Redis key
     * @param seconds Expiration in seconds
     * @return Response object for deferred result
     */
    public Response<Long> expire(String key, long seconds);

    /**
     * Queue TTL command
     * @param key Redis key
     * @return Response object for deferred result
     */
    public Response<Long> ttl(String key);

    /**
     * Send all queued commands to Redis server.
     * Results are not returned as a list; read them through the Response objects
     * handed out when the commands were queued.
     */
    public void sync();

    /**
     * Send all queued commands and return all responses
     * @return List of command responses in order
     */
    public List<Object> syncAndReturnAll();

    /**
     * Get number of queued commands
     * @return Number of commands in pipeline queue
     */
    public int getQueuedCommandCount();

    /**
     * Clear all queued commands without sending them
     */
    public void clear();

    /**
     * Close pipeline and send any remaining commands
     */
    public void close();
}Jedis jedis = new Jedis("localhost", 6379);
try (Pipeline pipeline = jedis.pipelined()) {
// Queue multiple commands
Response<String> set1 = pipeline.set("user:1", "John");
Response<String> set2 = pipeline.set("user:2", "Jane");
Response<String> set3 = pipeline.set("user:3", "Bob");
Response<Long> incr1 = pipeline.incr("counter:views");
Response<Long> incr2 = pipeline.incr("counter:users");
Response<String> get1 = pipeline.get("user:1");
Response<String> get2 = pipeline.get("user:2");
// Send all commands at once
pipeline.sync();
// Access results (available after sync)
System.out.println("SET results: " + set1.get() + ", " + set2.get() + ", " + set3.get());
System.out.println("INCR results: " + incr1.get() + ", " + incr2.get());
System.out.println("GET results: " + get1.get() + ", " + get2.get());
}
jedis.close();public void loadBulkData(List<User> users) {
Jedis jedis = new Jedis("localhost", 6379);
try (Pipeline pipeline = jedis.pipelined()) {
List<Response<String>> responses = new ArrayList<>();
for (User user : users) {
// User data
responses.add(pipeline.set("user:" + user.getId(), user.toJson()));
// User profile hash
pipeline.hset("profile:" + user.getId(), "name", user.getName());
pipeline.hset("profile:" + user.getId(), "email", user.getEmail());
pipeline.hset("profile:" + user.getId(), "age", String.valueOf(user.getAge()));
// Add to user sets
pipeline.sadd("users:active", user.getId());
if (user.isPremium()) {
pipeline.sadd("users:premium", user.getId());
}
// User score (for leaderboards)
pipeline.zadd("users:score", user.getScore(), user.getId());
// Set expiration for session data
pipeline.expire("user:" + user.getId() + ":session", 3600);
}
// Execute all commands
pipeline.sync();
// Check results if needed
for (Response<String> response : responses) {
if (!"OK".equals(response.get())) {
System.err.println("Failed to set user data");
}
}
System.out.println("Loaded " + users.size() + " users in bulk");
}
jedis.close();
}
/**
 * Times 10000 individual SET calls against 10000 pipelined SET calls on the same
 * connection and prints both durations plus the speed-up factor.
 *
 * <p>Elapsed time is measured with {@code System.nanoTime()}, which is monotonic;
 * wall-clock time can jump and distort short measurements.
 */
public void performanceComparison() {
    int operations = 10000;
    try (Jedis jedis = new Jedis("localhost", 6379)) {
        // Without pipelining: one network round trip per command
        long startNanos = System.nanoTime();
        for (int i = 0; i < operations; i++) {
            jedis.set("key:" + i, "value:" + i);
        }
        long normalNanos = System.nanoTime() - startNanos;
        // With pipelining: commands are batched and flushed by sync()
        startNanos = System.nanoTime();
        try (Pipeline pipeline = jedis.pipelined()) {
            for (int i = 0; i < operations; i++) {
                pipeline.set("pkey:" + i, "pvalue:" + i);
            }
            pipeline.sync();
        }
        long pipelineNanos = System.nanoTime() - startNanos;
        long normalTime = normalNanos / 1_000_000L;
        long pipelineTime = pipelineNanos / 1_000_000L;
        // Ratio is computed from nanoseconds and guarded against a zero divisor,
        // so a sub-millisecond pipelined run no longer prints Infinity or NaN
        double speedup = normalNanos / (double) Math.max(1L, pipelineNanos);
        System.out.println("Normal execution: " + normalTime + "ms");
        System.out.println("Pipeline execution: " + pipelineTime + "ms");
        System.out.println("Pipeline is " + speedup + "x faster");
    }
}
Wrapper for deferred command results in pipelines and transactions.
/**
* Holder for the deferred result of a command queued on a pipeline or transaction
* (API listing, signatures only).
* @param <T> type of the command result
*/
public class Response<T> {
/**
* Get the result of the command
* Only available after pipeline sync() or transaction exec()
* @return Command result; may be null (the usage examples null-check GET results)
* @throws IllegalStateException if called before sync/exec
*/
public T get();
/**
* Check if response is available
* @return true if response has been set
*/
public boolean isSet();
/**
* Get response as string representation
* @return String representation of response
*/
@Override
public String toString();
}
/**
 * Shows the safe order of operations for pipelined responses: queue, sync(),
 * then read. Connection and pipeline are both managed by try-with-resources so
 * the connection is released even if a command throws.
 */
public void safeResponseHandling() {
    try (Jedis jedis = new Jedis("localhost", 6379);
         Pipeline pipeline = jedis.pipelined()) {
        Response<String> response1 = pipeline.get("key1");
        Response<String> response2 = pipeline.get("key2");
        Response<Long> response3 = pipeline.incr("counter");
        // Don't access responses before sync!
        // Calling response1.get() at this point would throw IllegalStateException
        pipeline.sync();
        // Now safe to access responses
        if (response1.isSet()) {
            String value1 = response1.get();
            System.out.println("Key1: " + (value1 != null ? value1 : "null"));
        }
        if (response2.isSet()) {
            String value2 = response2.get();
            System.out.println("Key2: " + (value2 != null ? value2 : "null"));
        }
        if (response3.isSet()) {
            Long counter = response3.get();
            System.out.println("Counter: " + counter);
        }
    }
}
/**
 * Two-phase pipelining: the first batch reads data, the second batch is built
 * from the results of the first.
 */
public class ConditionalPipeline {
    /**
     * Looks up every user in one pipelined batch, then in a second batch either
     * updates the metadata of existing users or creates default data for missing ones.
     *
     * @param userIds ids of the users to process; must not be null
     */
    public void conditionalExecution(List<String> userIds) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            List<Response<String>> existChecks = new ArrayList<>(userIds.size());
            // Phase 1: check which users exist. The pipeline is closed before the
            // second one is opened so only one pipeline is active on the connection.
            try (Pipeline pipeline = jedis.pipelined()) {
                for (String userId : userIds) {
                    existChecks.add(pipeline.get("user:" + userId));
                }
                pipeline.sync(); // Execute existence checks
            }
            // Phase 2: conditionally add more commands based on results
            try (Pipeline secondPipeline = jedis.pipelined()) {
                for (int i = 0; i < userIds.size(); i++) {
                    String userId = userIds.get(i);
                    String userData = existChecks.get(i).get();
                    if (userData != null) {
                        // User exists, update last seen
                        secondPipeline.hset("user:" + userId + ":meta",
                            "last_seen", String.valueOf(System.currentTimeMillis()));
                        secondPipeline.incr("user:" + userId + ":visits");
                    } else {
                        // User doesn't exist, create default data
                        secondPipeline.set("user:" + userId, "{}");
                        secondPipeline.hset("user:" + userId + ":meta",
                            "created", String.valueOf(System.currentTimeMillis()));
                    }
                }
                secondPipeline.sync();
            }
        }
    }
}
/**
 * Pipelined bulk read with two levels of error handling: failures of the batch
 * as a whole, and failures while processing an individual key.
 */
public class RobustPipelineProcessor {
    /**
     * Fetches all keys in one pipelined batch and dispatches each result to
     * {@code processValue}, {@code handleMissingKey} or {@code handleKeyError}.
     *
     * @param keys keys to read; must not be null
     */
    public void processWithErrorHandling(List<String> keys) {
        // Both resources are closed automatically, including on failure paths
        try (Jedis jedis = new Jedis("localhost", 6379);
             Pipeline pipeline = jedis.pipelined()) {
            List<Response<String>> responses = new ArrayList<>(keys.size());
            // Queue all operations
            for (String key : keys) {
                responses.add(pipeline.get(key));
            }
            try {
                pipeline.sync();
                // Process results; responses are in the same order as keys
                for (int i = 0; i < keys.size(); i++) {
                    String key = keys.get(i);
                    Response<String> response = responses.get(i);
                    try {
                        String value = response.get();
                        if (value != null) {
                            processValue(key, value);
                        } else {
                            handleMissingKey(key);
                        }
                    } catch (Exception e) {
                        // One bad key must not stop the remaining keys
                        handleKeyError(key, e);
                    }
                }
            } catch (Exception e) {
                System.err.println("Pipeline execution failed: " + e.getMessage());
                // Could implement retry logic here
            }
        }
    }

    /** Handles a key whose value was found. */
    private void processValue(String key, String value) {
        System.out.println("Processing " + key + ": " + value);
    }

    /** Handles a key for which GET returned null. */
    private void handleMissingKey(String key) {
        System.out.println("Key not found: " + key);
    }

    /** Reports a failure while reading or processing a single key. */
    private void handleKeyError(String key, Exception e) {
        System.err.println("Error processing key " + key + ": " + e.getMessage());
    }
}
/**
 * Side-by-side guidance on when to use a transaction and when to use a pipeline.
 */
public class TransactionVsPipeline {
    // Use Transaction when:
    /**
     * Increments a counter and records the update time as one atomic unit,
     * guarded by WATCH on the counter.
     */
    public void atomicOperations() {
        // 1. Commands must execute atomically and in isolation from other clients
        // 2. Commands must be applied as one uninterrupted unit
        //    (note: Redis does not roll back commands that fail during EXEC)
        // 3. Using WATCH for optimistic locking
        // 4. Data consistency is critical
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.watch("critical_counter");
            try (Transaction tx = jedis.multi()) {
                tx.incr("critical_counter");
                tx.set("last_updated", String.valueOf(System.currentTimeMillis()));
                List<Object> results = tx.exec();
                if (results == null) {
                    // Handle conflict: exec() returned null because the watched key
                    // changed after WATCH; report it instead of ignoring it silently.
                    // A real caller would typically retry the whole WATCH/MULTI/EXEC cycle.
                    System.err.println("Transaction aborted: critical_counter was modified concurrently");
                }
            }
        }
    }

    // Use Pipeline when:
    /**
     * Writes 1000 keys in a single pipelined batch.
     */
    public void performanceOptimization() {
        // 1. Bulk operations
        // 2. Network round-trip reduction
        // 3. High throughput required
        // 4. Individual command failures acceptable
        try (Jedis jedis = new Jedis("localhost", 6379);
             Pipeline pipeline = jedis.pipelined()) {
            for (int i = 0; i < 1000; i++) {
                pipeline.set("bulk:" + i, "data:" + i);
            }
            pipeline.sync();
        }
    }
}
Redis transactions and pipelining are essential features for building high-performance applications with Jedis. Transactions provide atomic, isolated execution of queued commands and optimistic locking via WATCH (but no rollback of commands that fail during EXEC), while pipelining significantly improves throughput for bulk operations.
Install with Tessl CLI
npx tessl i tessl/maven-redis-clients--jedis