tessl/maven-redis-clients--jedis

Jedis is a blazingly small and sane Redis java client.

Transactions and Pipelining

This document covers Redis transactions and command pipelining in Jedis, including MULTI/EXEC transactions, optimistic locking with WATCH, command batching for performance optimization, and reliable transaction patterns.

Redis Transactions

Transaction

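Redis MULTI/EXEC transaction implementation. Queued commands run sequentially as one isolated unit when EXEC is called, with no commands from other clients interleaved. Redis does not roll back commands that already ran if a later command in the same transaction fails, so this is not full ACID in the relational-database sense.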

public class Transaction implements AutoCloseable {
    /**
     * Queue SET command in transaction
     * @param key Redis key
     * @param value String value
     * @return Response object for deferred result
     */
    public Response<String> set(String key, String value);
    
    /**
     * Queue SET command with parameters
     * @param key Redis key
     * @param value String value
     * @param params SET parameters (EX, PX, NX, XX)
     * @return Response object for deferred result
     */
    public Response<String> set(String key, String value, SetParams params);
    
    /**
     * Queue GET command in transaction
     * @param key Redis key
     * @return Response object for deferred result
     */
    public Response<String> get(String key);
    
    /**
     * Queue DELETE command in transaction
     * @param keys Redis keys to delete
     * @return Response object for deferred result
     */
    public Response<Long> del(String... keys);
    
    /**
     * Queue INCREMENT command in transaction
     * @param key Redis key
     * @return Response object for deferred result
     */
    public Response<Long> incr(String key);
    
    /**
     * Queue HASH SET command in transaction
     * @param key Hash key
     * @param field Hash field
     * @param value Field value
     * @return Response object for deferred result
     */
    public Response<Long> hset(String key, String field, String value);
    
    /**
     * Queue HASH GET command in transaction
     * @param key Hash key
     * @param field Hash field
     * @return Response object for deferred result
     */
    public Response<String> hget(String key, String field);
    
    /**
     * Queue LIST PUSH command in transaction
     * @param key List key
     * @param strings Elements to push
     * @return Response object for deferred result
     */
    public Response<Long> lpush(String key, String... strings);
    
    /**
     * Queue LIST POP command in transaction
     * @param key List key
     * @return Response object for deferred result
     */
    public Response<String> lpop(String key);
    
    /**
     * Queue SORTED SET ADD command in transaction
     * @param key Sorted set key
     * @param score Element score
     * @param member Element member
     * @return Response object for deferred result
     */
    public Response<Long> zadd(String key, double score, String member);
    
    /**
     * Execute all queued commands atomically
     * @return List of command results in order, or null if the transaction was aborted because a watched key changed
     */
    public List<Object> exec();
    
    /**
     * Discard all queued commands and exit transaction mode
     * @return Status code reply
     */
    public String discard();
    
    /**
     * Get number of queued commands
     * @return Number of commands in transaction queue
     */
    public int getQueuedCommandCount();
    
    /**
     * Check whether the transaction is still open (MULTI sent, neither executed nor discarded)
     * @return true while the transaction is still in MULTI state
     */
    public boolean isInMulti();
    
    /**
     * Close transaction (calls discard if not executed)
     */
    public void close();
}

Basic Transaction Usage

Jedis jedis = new Jedis("localhost", 6379);

try (Transaction transaction = jedis.multi()) {
    // Queue commands in transaction
    Response<String> setResult = transaction.set("user:1:name", "John");
    Response<Long> incrResult = transaction.incr("user:1:visits"); 
    Response<String> getResult = transaction.get("user:1:name");
    Response<Long> hsetResult = transaction.hset("user:1:profile", "age", "30");
    
    // Execute all commands atomically
    List<Object> results = transaction.exec();
    
    if (results != null) {
        // Transaction succeeded
        System.out.println("SET result: " + setResult.get());      // "OK"
        System.out.println("INCR result: " + incrResult.get());    // 1 if the counter did not exist before
        System.out.println("GET result: " + getResult.get());      // "John"
        System.out.println("HSET result: " + hsetResult.get());    // 1 if the field is new, 0 if it was overwritten
    } else {
        // Transaction was discarded (due to WATCH)
        System.out.println("Transaction was discarded");
    }
}

jedis.close();
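
The queue-then-execute behaviour above, and the absence of rollback, can be illustrated without a Redis server. The following is a minimal in-memory sketch, not Jedis code: `QueuedTransactionModel` and `NoRollbackDemo` are hypothetical names invented for this illustration. It assumes the documented Redis behaviour that a command failing during EXEC does not stop or undo the other queued commands.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of MULTI/EXEC queueing (illustration only, not part of Jedis).
class QueuedTransactionModel {
    private final List<Supplier<Object>> queue = new ArrayList<>();

    // Queueing only records the command; nothing runs yet.
    void queue(Supplier<Object> command) {
        queue.add(command);
    }

    // Runs every queued command in order. A failing command is reported
    // in its own result slot; the remaining commands still run.
    List<Object> exec() {
        List<Object> results = new ArrayList<>();
        for (Supplier<Object> command : queue) {
            try {
                results.add(command.get());
            } catch (RuntimeException e) {
                results.add(e);
            }
        }
        queue.clear();
        return results;
    }
}

class NoRollbackDemo {
    public static void main(String[] args) {
        Map<String, String> data = new HashMap<>();
        data.put("name", "John");

        QueuedTransactionModel tx = new QueuedTransactionModel();
        tx.queue(() -> { data.put("visits", "1"); return "OK"; });
        tx.queue(() -> Long.parseLong(data.get("name")) + 1); // fails: "John" is not a number
        tx.queue(() -> { data.put("city", "Oslo"); return "OK"; });

        List<Object> results = tx.exec();
        System.out.println("second result is an error: " + (results.get(1) instanceof RuntimeException));
        System.out.println("third command still applied: " + data.containsKey("city"));
    }
}
```

In this model both lines print `true`: the first and third writes survive even though the second command failed, which is why validation should happen before commands are queued.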

Optimistic Locking with WATCH

WATCH marks keys so that the next EXEC is aborted (Jedis returns null) if any watched key was modified by another client after the WATCH call.

public interface JedisCommands {
    /**
     * Watch keys for changes during transaction
     * @param keys Keys to watch
     * @return Status code reply
     */
    String watch(String... keys);
    
    /**
     * Unwatch all previously watched keys
     * @return Status code reply  
     */
    String unwatch();
    
    /**
     * Begin transaction (MULTI command)
     * @return Transaction object
     */
    Transaction multi();
}

Optimistic Locking Example

public boolean transferFunds(String fromAccount, String toAccount, double amount) {
    Jedis jedis = new Jedis("localhost", 6379);
    
    try {
        String fromBalanceKey = "account:" + fromAccount + ":balance";
        String toBalanceKey = "account:" + toAccount + ":balance";
        
        // Watch keys that will be modified
        jedis.watch(fromBalanceKey, toBalanceKey);
        
        // Get current balances
        String fromBalanceStr = jedis.get(fromBalanceKey);
        String toBalanceStr = jedis.get(toBalanceKey);
        
        if (fromBalanceStr == null || toBalanceStr == null) {
            jedis.unwatch();
            return false; // Account doesn't exist
        }
        
        double fromBalance = Double.parseDouble(fromBalanceStr);
        double toBalance = Double.parseDouble(toBalanceStr);
        
        if (fromBalance < amount) {
            jedis.unwatch();
            return false; // Insufficient funds
        }
        
        // Start transaction
        try (Transaction transaction = jedis.multi()) {
            // Update balances
            transaction.set(fromBalanceKey, String.valueOf(fromBalance - amount));
            transaction.set(toBalanceKey, String.valueOf(toBalance + amount));
            
            // Log transaction
            transaction.lpush("transactions", 
                String.format("Transfer: %s -> %s, Amount: %.2f", 
                            fromAccount, toAccount, amount));
            
            List<Object> results = transaction.exec();
            
            if (results != null) {
                System.out.println("Transfer completed successfully");
                return true;
            } else {
                System.out.println("Transfer failed - accounts were modified by another client");
                return false;
            }
        }
    } finally {
        jedis.close();
    }
}

// Usage with retry logic
public boolean transferWithRetry(String from, String to, double amount, int maxRetries) {
    for (int i = 0; i < maxRetries; i++) {
        if (transferFunds(from, to, amount)) {
            return true;
        }
        
        // Brief delay before retry
        try {
            Thread.sleep(10 + (int)(Math.random() * 20)); // 10-29 ms random delay
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    return false;
}
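
The WATCH/MULTI/EXEC flow above is a form of optimistic concurrency control: read, compute, and commit only if nothing changed in between. The sketch below models that idea in memory and is not Jedis code. `VersionedStore`, `watch`, and `execIfUnchanged` are hypothetical names, and a per-key version counter is an assumption standing in for however the server tracks watched keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of WATCH-style optimistic locking (not part of Jedis).
class VersionedStore {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    // Plain write: stores the value and bumps the key's version.
    synchronized void set(String key, String value) {
        values.put(key, value);
        versions.merge(key, 1L, Long::sum);
    }

    synchronized String get(String key) {
        return values.get(key);
    }

    // "WATCH": snapshot the current version of each key.
    synchronized Map<String, Long> watch(String... keys) {
        Map<String, Long> snapshot = new HashMap<>();
        for (String key : keys) {
            snapshot.put(key, versions.getOrDefault(key, 0L));
        }
        return snapshot;
    }

    // "EXEC": apply all writes only if no watched key changed since the snapshot.
    synchronized boolean execIfUnchanged(Map<String, Long> snapshot, Map<String, String> writes) {
        for (Map.Entry<String, Long> watched : snapshot.entrySet()) {
            long current = versions.getOrDefault(watched.getKey(), 0L);
            if (current != watched.getValue()) {
                return false; // conflict: the caller should re-read and retry
            }
        }
        writes.forEach(this::set);
        return true;
    }
}

class OptimisticLockDemo {
    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        store.set("balance", "100");

        Map<String, Long> snapshot = store.watch("balance");
        store.set("balance", "90"); // another client writes in between

        boolean applied = store.execIfUnchanged(snapshot, Map.of("balance", "50"));
        System.out.println("applied = " + applied + ", balance = " + store.get("balance"));
        // prints: applied = false, balance = 90
    }
}
```

A failed commit leaves the store untouched, so the caller can re-read the current values and try again, which is the role the retry loop plays in the Jedis example.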

ReliableTransaction

Enhanced transaction with built-in retry logic and error handling.

public class ReliableTransaction extends Transaction {
    /**
     * Creates reliable transaction with retry capability
     * @param jedis Jedis connection
     * @param maxRetries Maximum retry attempts
     * @param retryDelayMs Delay between retries in milliseconds
     */
    public ReliableTransaction(Jedis jedis, int maxRetries, long retryDelayMs);
    
    /**
     * Execute transaction with automatic retry on conflicts
     * @return Transaction results, or null if all retries failed
     */
    public List<Object> execWithRetry();
    
    /**
     * Set keys to watch for this reliable transaction
     * @param keys Keys to watch
     */
    public void setWatchKeys(String... keys);
    
    /**
     * Add pre-execution validation
     * @param validator Validation function that returns true if transaction should proceed
     */
    public void addValidator(Supplier<Boolean> validator);
    
    /**
     * Get number of retry attempts made
     * @return Retry count
     */
    public int getRetryCount();
}

Command Pipelining

Pipeline

A pipeline sends multiple commands without waiting for each reply, which reduces the number of network round trips and improves throughput.

public class Pipeline implements AutoCloseable {
    /**
     * Queue SET command
     * @param key Redis key
     * @param value String value
     * @return Response object for deferred result
     */
    public Response<String> set(String key, String value);
    
    /**
     * Queue SET command with parameters
     * @param key Redis key
     * @param value String value
     * @param params SET parameters
     * @return Response object for deferred result
     */
    public Response<String> set(String key, String value, SetParams params);
    
    /**
     * Queue GET command
     * @param key Redis key
     * @return Response object for deferred result
     */
    public Response<String> get(String key);
    
    /**
     * Queue HASH operations
     * @param key Hash key
     * @param field Hash field
     * @param value Field value
     * @return Response object for deferred result
     */
    public Response<Long> hset(String key, String field, String value);
    
    public Response<String> hget(String key, String field);
    
    /**
     * Queue LIST operations
     * @param key List key
     * @param strings Elements to push
     * @return Response object for deferred result
     */
    public Response<Long> lpush(String key, String... strings);
    
    public Response<String> lpop(String key);
    
    /**
     * Queue SET operations
     * @param key Set key
     * @param members Members to add
     * @return Response object for deferred result
     */
    public Response<Long> sadd(String key, String... members);
    
    public Response<Set<String>> smembers(String key);
    
    /**
     * Queue SORTED SET operations
     * @param key Sorted set key
     * @param score Element score
     * @param member Element member
     * @return Response object for deferred result
     */
    public Response<Long> zadd(String key, double score, String member);
    
    public Response<List<String>> zrange(String key, long start, long stop);
    
    /**
     * Queue EXPIRY operations
     * @param key Redis key
     * @param seconds Expiration in seconds
     * @return Response object for deferred result
     */
    public Response<Long> expire(String key, long seconds);
    
    public Response<Long> ttl(String key);
    
    /**
     * Send all queued commands to the Redis server and read their replies.
     * Replies populate the Response objects but are not returned as a list.
     */
    public void sync();
    
    /**
     * Send all queued commands and return all responses
     * @return List of command responses in order
     */
    public List<Object> syncAndReturnAll();
    
    /**
     * Get number of queued commands
     * @return Number of commands in pipeline queue
     */
    public int getQueuedCommandCount();
    
    /**
     * Clear all queued commands without sending them
     */
    public void clear();
    
    /**
     * Close pipeline and send any remaining commands
     */
    public void close();
}

Basic Pipeline Usage

Jedis jedis = new Jedis("localhost", 6379);

try (Pipeline pipeline = jedis.pipelined()) {
    // Queue multiple commands
    Response<String> set1 = pipeline.set("user:1", "John");
    Response<String> set2 = pipeline.set("user:2", "Jane");
    Response<String> set3 = pipeline.set("user:3", "Bob");
    
    Response<Long> incr1 = pipeline.incr("counter:views");
    Response<Long> incr2 = pipeline.incr("counter:users");
    
    Response<String> get1 = pipeline.get("user:1");
    Response<String> get2 = pipeline.get("user:2");
    
    // Send all commands at once
    pipeline.sync();
    
    // Access results (available after sync)
    System.out.println("SET results: " + set1.get() + ", " + set2.get() + ", " + set3.get());
    System.out.println("INCR results: " + incr1.get() + ", " + incr2.get());
    System.out.println("GET results: " + get1.get() + ", " + get2.get());
}

jedis.close();

Bulk Data Loading with Pipeline

public void loadBulkData(List<User> users) {
    Jedis jedis = new Jedis("localhost", 6379);
    
    try (Pipeline pipeline = jedis.pipelined()) {
        List<Response<String>> responses = new ArrayList<>();
        
        for (User user : users) {
            // User data
            responses.add(pipeline.set("user:" + user.getId(), user.toJson()));
            
            // User profile hash
            pipeline.hset("profile:" + user.getId(), "name", user.getName());
            pipeline.hset("profile:" + user.getId(), "email", user.getEmail());
            pipeline.hset("profile:" + user.getId(), "age", String.valueOf(user.getAge()));
            
            // Add to user sets
            pipeline.sadd("users:active", user.getId());
            if (user.isPremium()) {
                pipeline.sadd("users:premium", user.getId());
            }
            
            // User score (for leaderboards)
            pipeline.zadd("users:score", user.getScore(), user.getId());
            
            // Set expiration for session data
            pipeline.expire("user:" + user.getId() + ":session", 3600);
        }
        
        // Execute all commands
        pipeline.sync();
        
        // Check results if needed
        for (Response<String> response : responses) {
            if (!"OK".equals(response.get())) {
                System.err.println("Failed to set user data");
            }
        }
        
        System.out.println("Loaded " + users.size() + " users in bulk");
    }
    
    jedis.close();
}
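
For very large inputs, queuing everything before a single sync() means all replies are held in memory until they are read. A common mitigation is to flush in fixed-size batches. The helper below is a minimal sketch of the splitting step only: `BatchPartitioner` is a hypothetical utility and the batch size is an arbitrary illustration value, not a Jedis recommendation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Jedis): splits a list into consecutive batches.
class BatchPartitioner {
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ids.add(i);
        }
        for (List<Integer> batch : partition(ids, 4)) {
            System.out.println(batch);
        }
    }
}
```

With such a helper, the bulk loader would queue one batch on the pipeline, call sync(), and then move on to the next batch.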

Performance Comparison

public void performanceComparison() {
    Jedis jedis = new Jedis("localhost", 6379);
    int operations = 10000;
    
    // Without pipelining
    long startTime = System.currentTimeMillis();
    for (int i = 0; i < operations; i++) {
        jedis.set("key:" + i, "value:" + i);
    }
    long normalTime = System.currentTimeMillis() - startTime;
    
    // With pipelining
    startTime = System.currentTimeMillis();
    try (Pipeline pipeline = jedis.pipelined()) {
        for (int i = 0; i < operations; i++) {
            pipeline.set("pkey:" + i, "pvalue:" + i);
        }
        pipeline.sync();
    }
    long pipelineTime = System.currentTimeMillis() - startTime;
    
    System.out.println("Normal execution: " + normalTime + "ms");
    System.out.println("Pipeline execution: " + pipelineTime + "ms");
    // Guard against a 0 ms reading on very fast runs
    System.out.println("Pipeline is " + (normalTime / (double) Math.max(1, pipelineTime)) + "x faster");
    
    jedis.close();
}
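
Why the pipelined loop tends to win can be seen from a back-of-the-envelope latency model. The sketch below is an idealized approximation, not a benchmark: `PipelineCostModel` is a hypothetical class, and the round-trip and per-command times in `main` are made-up illustration values. It assumes sequential execution pays one full round trip per command, while a pipeline pays roughly one round trip for the whole batch.

```java
// Hypothetical, idealized latency model (not part of Jedis, not a measurement).
class PipelineCostModel {
    // Sequential: every command pays a full network round trip plus server processing.
    static double sequentialMillis(int commands, double rttMillis, double perCommandMillis) {
        return commands * (rttMillis + perCommandMillis);
    }

    // Pipelined: one round trip for the batch, plus server processing for each command.
    static double pipelinedMillis(int commands, double rttMillis, double perCommandMillis) {
        return rttMillis + commands * perCommandMillis;
    }

    public static void main(String[] args) {
        int commands = 10_000;
        double rtt = 0.5;         // assumed round-trip time in ms
        double perCommand = 0.01; // assumed server processing time per command in ms

        System.out.printf("sequential: %.1f ms%n", sequentialMillis(commands, rtt, perCommand));
        System.out.printf("pipelined:  %.1f ms%n", pipelinedMillis(commands, rtt, perCommand));
    }
}
```

Under these assumed numbers the model gives roughly 5100 ms versus about 100 ms. Real speedups depend on network latency, payload sizes, and server load, so measured results will differ.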

Response Handling

Response

Wrapper for deferred command results in pipelines and transactions.

public class Response<T> {
    /**
     * Get the result of the command
     * Only available after pipeline sync() or transaction exec()
     * @return Command result
     * @throws IllegalStateException if called before sync/exec
     */
    public T get();
    
    /**
     * Check if response is available
     * @return true if response has been set
     */
    public boolean isSet();
    
    /**
     * Get response as string representation
     * @return String representation of response
     */
    @Override
    public String toString();
}
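
The deferred-result contract described above can be modelled in a few lines. This is a minimal sketch of the pattern, not the Jedis implementation: `DeferredResult`, `setRaw`, and the decoder function are hypothetical names chosen for the illustration.

```java
import java.util.function.Function;

// Hypothetical model of a deferred command result (not the Jedis Response class).
class DeferredResult<T> {
    private final Function<Object, T> decoder;
    private Object raw;
    private boolean set;

    DeferredResult(Function<Object, T> decoder) {
        this.decoder = decoder;
    }

    // Called by the pipeline/transaction model once the reply has arrived.
    void setRaw(Object raw) {
        this.raw = raw;
        this.set = true;
    }

    boolean isSet() {
        return set;
    }

    // Decodes the raw reply; fails fast if the reply has not arrived yet.
    T get() {
        if (!set) {
            throw new IllegalStateException("result not available before sync/exec");
        }
        return decoder.apply(raw);
    }
}

class DeferredResultDemo {
    public static void main(String[] args) {
        DeferredResult<Long> counter = new DeferredResult<>(reply -> Long.parseLong((String) reply));
        System.out.println("set before reply: " + counter.isSet()); // false

        counter.setRaw("42"); // simulate the reply arriving during sync()
        System.out.println("value after reply: " + counter.get()); // 42
    }
}
```

Failing fast in `get()` turns an ordering mistake (reading before sync or exec) into an immediate, visible error instead of a silent null.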

Safe Response Handling

public void safeResponseHandling() {
    Jedis jedis = new Jedis("localhost", 6379);
    
    try (Pipeline pipeline = jedis.pipelined()) {
        Response<String> response1 = pipeline.get("key1");
        Response<String> response2 = pipeline.get("key2");
        Response<Long> response3 = pipeline.incr("counter");
        
        // Don't access responses before sync!
        // System.out.println(response1.get()); // This would throw IllegalStateException
        
        pipeline.sync();
        
        // Now safe to access responses
        if (response1.isSet()) {
            String value1 = response1.get();
            System.out.println("Key1: " + (value1 != null ? value1 : "null"));
        }
        
        if (response2.isSet()) {
            String value2 = response2.get();
            System.out.println("Key2: " + (value2 != null ? value2 : "null"));
        }
        
        if (response3.isSet()) {
            Long counter = response3.get();
            System.out.println("Counter: " + counter);
        }
    }
    
    jedis.close();
}

Advanced Patterns

Conditional Pipeline Execution

public class ConditionalPipeline {
    public void conditionalExecution(List<String> userIds) {
        // try-with-resources closes the connection even if a command fails
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            List<Response<String>> existChecks = new ArrayList<>();
            
            // Phase 1: read every user key in one round trip
            try (Pipeline pipeline = jedis.pipelined()) {
                for (String userId : userIds) {
                    existChecks.add(pipeline.get("user:" + userId));
                }
                
                pipeline.sync(); // Responses are populated from here on
            }
            
            // Phase 2: the first pipeline is closed before the second one is opened,
            // so the two never share the connection at the same time
            try (Pipeline secondPipeline = jedis.pipelined()) {
                for (int i = 0; i < userIds.size(); i++) {
                    String userId = userIds.get(i);
                    String userData = existChecks.get(i).get();
                    
                    if (userData != null) {
                        // User exists, update last seen
                        secondPipeline.hset("user:" + userId + ":meta", 
                                          "last_seen", String.valueOf(System.currentTimeMillis()));
                        secondPipeline.incr("user:" + userId + ":visits");
                    } else {
                        // User doesn't exist, create default data
                        secondPipeline.set("user:" + userId, "{}");
                        secondPipeline.hset("user:" + userId + ":meta", 
                                          "created", String.valueOf(System.currentTimeMillis()));
                    }
                }
                
                secondPipeline.sync();
            }
        }
    }
}

Pipeline with Error Handling

public class RobustPipelineProcessor {
    public void processWithErrorHandling(List<String> keys) {
        Jedis jedis = new Jedis("localhost", 6379);
        
        try (Pipeline pipeline = jedis.pipelined()) {
            List<Response<String>> responses = new ArrayList<>();
            
            // Queue all operations
            for (String key : keys) {
                responses.add(pipeline.get(key));
            }
            
            try {
                pipeline.sync();
                
                // Process results
                for (int i = 0; i < keys.size(); i++) {
                    String key = keys.get(i);
                    Response<String> response = responses.get(i);
                    
                    try {
                        String value = response.get();
                        if (value != null) {
                            processValue(key, value);
                        } else {
                            handleMissingKey(key);
                        }
                    } catch (Exception e) {
                        handleKeyError(key, e);
                    }
                }
                
            } catch (Exception e) {
                System.err.println("Pipeline execution failed: " + e.getMessage());
                // Could implement retry logic here
            }
        }
        
        jedis.close();
    }
    
    private void processValue(String key, String value) {
        System.out.println("Processing " + key + ": " + value);
    }
    
    private void handleMissingKey(String key) {
        System.out.println("Key not found: " + key);
    }
    
    private void handleKeyError(String key, Exception e) {
        System.err.println("Error processing key " + key + ": " + e.getMessage());
    }
}

Transaction vs Pipeline Decision Matrix

public class TransactionVsPipeline {
    
    // Use Transaction when:
    public void atomicOperations() {
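        // 1. Commands must run as one isolated unit, with no other client interleaved
        // 2. Queued commands must be applied together or, on a WATCH conflict, not at all
        //    (Redis does not roll back if a command fails at runtime)
        // 3. Using WATCH for optimistic locking
        // 4. Data consistency is critical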
        
        Jedis jedis = new Jedis("localhost", 6379);
        jedis.watch("critical_counter");
        
        try (Transaction tx = jedis.multi()) {
            tx.incr("critical_counter");
            tx.set("last_updated", String.valueOf(System.currentTimeMillis()));
            
            List<Object> results = tx.exec();
            if (results == null) {
                // Handle conflict
            }
        }
        jedis.close();
    }
    
    // Use Pipeline when:
    public void performanceOptimization() {
        // 1. Bulk operations
        // 2. Network round-trip reduction
        // 3. High throughput required
        // 4. Individual command failures acceptable
        
        Jedis jedis = new Jedis("localhost", 6379);
        
        try (Pipeline pipeline = jedis.pipelined()) {
            for (int i = 0; i < 1000; i++) {
                pipeline.set("bulk:" + i, "data:" + i);
            }
            pipeline.sync();
        }
        jedis.close();
    }
}

Redis transactions and pipelining are two core tools for building high-performance applications with Jedis. Transactions run a group of commands as one isolated unit and support optimistic locking through WATCH, although Redis does not roll back on runtime errors. Pipelining reduces network round trips and so improves throughput for bulk operations.

Install with Tessl CLI

npx tessl i tessl/maven-redis-clients--jedis
