CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-client

Apache Pulsar Java client library for distributed pub-sub messaging platform

Pending
Overview
Eval results
Files

docs/transaction-support.md

Transaction Support

Transactional messaging for exactly-once semantics, multi-topic atomic operations, and coordinated message processing across producers and consumers.

Capabilities

Transaction Interface

Core interface for transactional operations providing exactly-once message processing guarantees.

/**
 * Transaction interface for atomic message operations.
 *
 * <p>Provides exactly-once semantics across multiple topics and operations.
 * A transaction is finished with {@link #commit()} or {@link #abort()}; both
 * return a future instead of blocking the caller.
 *
 * <p>NOTE(review): confirm {@code getTransactionTimeout()} and
 * {@code getTransactionStartTime()} against the {@code Transaction} Javadoc
 * of the pulsar-client version in use.
 */
interface Transaction {
    /**
     * Get transaction ID.
     *
     * @return the {@link TxnID} identifying this transaction
     */
    TxnID getTxnID();

    /**
     * Get transaction state.
     *
     * @return the current {@link TransactionState}
     */
    TransactionState getState();

    /**
     * Commit transaction.
     *
     * @return a future that completes when the commit has finished
     */
    CompletableFuture<Void> commit();

    /**
     * Abort transaction.
     *
     * @return a future that completes when the abort has finished
     */
    CompletableFuture<Void> abort();

    /**
     * Get transaction timeout.
     *
     * @return the timeout; presumably in milliseconds, since the
     *         state-monitoring example prints it with an "ms" suffix
     *         (TODO confirm)
     */
    long getTransactionTimeout();

    /**
     * Get transaction start timestamp.
     *
     * @return the start timestamp; unit and epoch are not shown here
     *         (TODO confirm)
     */
    long getTransactionStartTime();
}

TransactionBuilder Configuration

Builder interface for creating and configuring transactions.

/**
 * Builder for creating and configuring transactions.
 *
 * <p>Obtained from {@code client.newTransaction()} in the usage examples.
 */
interface TransactionBuilder {
    /**
     * Build the transaction asynchronously.
     *
     * @return a future that yields the new {@link Transaction}
     */
    CompletableFuture<Transaction> build();

    /**
     * Set transaction timeout (default: coordinator configured timeout).
     *
     * @param timeout  timeout value, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code timeout}
     * @return a {@code TransactionBuilder}, allowing calls to be chained
     */
    TransactionBuilder withTransactionTimeout(long timeout, TimeUnit timeUnit);
}

Basic Transaction Usage:

import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.*;

// Enable transactions in client
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .enableTransaction(true)
    .build();

// Create the producer and consumer before opening the transaction so that
// connection setup does not count against the transaction timeout
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("transactional-topic")
    .sendTimeout(0, TimeUnit.SECONDS) // transactional sends require the send timeout to be disabled
    .create();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("input-topic")
    .subscriptionName("txn-sub")
    .subscribe();

// Receiving is not transactional (only the acknowledgment is), so wait for
// the input message before the transaction clock starts
Message<String> message = consumer.receive();

// Create transaction
Transaction txn = client.newTransaction()
    .withTransactionTimeout(1, TimeUnit.MINUTES)
    .build()
    .get();

try {
    // Produce messages in transaction
    producer.newMessage(txn)
        .value("Message 1")
        .send();

    producer.newMessage(txn)
        .value("Message 2")
        .send();

    // Acknowledge the consumed message in the same transaction
    consumer.acknowledgeAsync(message.getMessageId(), txn).get();

    // Commit transaction
    txn.commit().get();
    System.out.println("Transaction committed successfully");

} catch (Exception e) {
    // Abort transaction on error; a failing abort must not hide the original failure
    try {
        txn.abort().get();
    } catch (Exception abortFailure) {
        e.addSuppressed(abortFailure);
    }
    System.err.println("Transaction aborted: " + e.getMessage());
} finally {
    // Release broker connections held by the producer and consumer
    producer.close();
    consumer.close();
}

Transactional Producer Operations

Producer operations within transaction context for atomic message publishing.

/**
 * Extended TypedMessageBuilder for transactional operations.
 *
 * <p>NOTE(review): the basic usage example in this document binds the
 * transaction with {@code producer.newMessage(txn)} and then calls a
 * no-argument {@code send()}. Confirm that the transaction-taking overloads
 * listed here exist in the pulsar-client version in use.
 *
 * @param <T> type of the message value
 */
interface TypedMessageBuilder<T> {
    /**
     * Send message within transaction context.
     *
     * @param txn transaction the message is sent in
     * @return the {@code MessageId} of the sent message
     * @throws PulsarClientException if the send fails
     */
    MessageId send(Transaction txn) throws PulsarClientException;

    /**
     * Send message within transaction context asynchronously.
     *
     * @param txn transaction the message is sent in
     * @return a future that yields the {@code MessageId} of the sent message
     */
    CompletableFuture<MessageId> sendAsync(Transaction txn);
}

/**
 * Producer interface with transaction support.
 *
 * <p>NOTE(review): {@code newMessage(Transaction)} is the form used by the
 * basic usage example in this document. Confirm the
 * {@code send(T, Transaction)} and {@code sendAsync(T, Transaction)}
 * overloads against the {@code Producer} Javadoc of the pulsar-client
 * version in use.
 *
 * @param <T> type of the message value
 */
interface Producer<T> {
    /**
     * Create message builder for transaction.
     *
     * @param txn transaction the built message will belong to
     * @return a builder for a message bound to {@code txn}
     */
    TypedMessageBuilder<T> newMessage(Transaction txn);

    /**
     * Send message in transaction.
     *
     * @param message value to send
     * @param txn     transaction the message is sent in
     * @return the {@code MessageId} of the sent message
     * @throws PulsarClientException if the send fails
     */
    MessageId send(T message, Transaction txn) throws PulsarClientException;

    /**
     * Send message in transaction asynchronously.
     *
     * @param message value to send
     * @param txn     transaction the message is sent in
     * @return a future that yields the {@code MessageId} of the sent message
     */
    CompletableFuture<MessageId> sendAsync(T message, Transaction txn);
}

Transactional Producer Examples:

// Multi-topic atomic publishing
// Producers are created before the transaction is opened so that connection
// setup does not count against the transaction timeout. Transactional sends
// require the producer's send timeout to be disabled.
Producer<String> orderProducer = client.newProducer(Schema.STRING)
    .topic("orders")
    .sendTimeout(0, TimeUnit.SECONDS)
    .create();

Producer<String> inventoryProducer = client.newProducer(Schema.STRING)
    .topic("inventory")
    .sendTimeout(0, TimeUnit.SECONDS)
    .create();

Producer<String> paymentProducer = client.newProducer(Schema.STRING)
    .topic("payments")
    .sendTimeout(0, TimeUnit.SECONDS)
    .create();

Transaction txn = client.newTransaction().build().get();

try {
    // All messages sent atomically
    orderProducer.newMessage(txn).value("order-123").send();
    inventoryProducer.newMessage(txn).value("reserve-item-456").send();
    paymentProducer.newMessage(txn).value("charge-user-789").send();

    txn.commit().get();

} catch (Exception e) {
    // Keep the original failure as the primary error even if the abort fails too
    try {
        txn.abort().get();
    } catch (Exception abortFailure) {
        e.addSuppressed(abortFailure);
    }
    throw e;
}

// Conditional message publishing
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("conditional-topic")
    .sendTimeout(0, TimeUnit.SECONDS) // transactional sends require the send timeout to be disabled
    .create();

// Business logic to determine if messages should be sent.
// Decide first: a transaction is only opened when there is work to commit,
// so there is never an open-then-abort round trip to the coordinator.
if (shouldSendMessages()) {
    Transaction txn = client.newTransaction().build().get();

    try {
        for (String message : getMessagesToSend()) {
            producer.newMessage(txn).value(message).send();
        }
        txn.commit().get();
    } catch (Exception e) {
        // Abort exactly once, and surface the failure instead of swallowing it
        try {
            txn.abort().get();
        } catch (Exception abortFailure) {
            e.addSuppressed(abortFailure);
        }
        throw e;
    }
}

Transactional Consumer Operations

Consumer operations within transaction context for atomic message acknowledgment.

/**
 * Consumer interface with transaction support.
 *
 * <p>Each method records an acknowledgment as part of the given transaction.
 *
 * <p>NOTE(review): confirm which of these overloads exist in the
 * pulsar-client version in use. The blocking and {@code Message}-taking
 * variants in particular should be checked against the {@code Consumer}
 * Javadoc.
 *
 * @param <T> type of the message value
 */
interface Consumer<T> {
    /**
     * Acknowledge message within transaction.
     *
     * @param messageId id of the message to acknowledge
     * @param txn       transaction the acknowledgment belongs to
     * @throws PulsarClientException if the acknowledgment fails
     */
    void acknowledge(MessageId messageId, Transaction txn) throws PulsarClientException;

    /**
     * Acknowledge message within transaction.
     *
     * @param message message to acknowledge
     * @param txn     transaction the acknowledgment belongs to
     * @throws PulsarClientException if the acknowledgment fails
     */
    void acknowledge(Message<?> message, Transaction txn) throws PulsarClientException;

    /**
     * Acknowledge message within transaction asynchronously.
     *
     * @param messageId id of the message to acknowledge
     * @param txn       transaction the acknowledgment belongs to
     * @return a future that completes when the acknowledgment has finished
     */
    CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn);

    /**
     * Acknowledge message within transaction asynchronously.
     *
     * @param message message to acknowledge
     * @param txn     transaction the acknowledgment belongs to
     * @return a future that completes when the acknowledgment has finished
     */
    CompletableFuture<Void> acknowledgeAsync(Message<?> message, Transaction txn);

    /**
     * Acknowledge cumulatively within transaction.
     *
     * @param messageId id of the message to acknowledge cumulatively
     * @param txn       transaction the acknowledgment belongs to
     * @throws PulsarClientException if the acknowledgment fails
     */
    void acknowledgeCumulative(MessageId messageId, Transaction txn) throws PulsarClientException;

    /**
     * Acknowledge cumulatively within transaction.
     *
     * @param message message to acknowledge cumulatively
     * @param txn     transaction the acknowledgment belongs to
     * @throws PulsarClientException if the acknowledgment fails
     */
    void acknowledgeCumulative(Message<?> message, Transaction txn) throws PulsarClientException;

    /**
     * Acknowledge cumulatively within transaction asynchronously.
     *
     * @param messageId id of the message to acknowledge cumulatively
     * @param txn       transaction the acknowledgment belongs to
     * @return a future that completes when the acknowledgment has finished
     */
    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn);

    /**
     * Acknowledge cumulatively within transaction asynchronously.
     *
     * @param message message to acknowledge cumulatively
     * @param txn     transaction the acknowledgment belongs to
     * @return a future that completes when the acknowledgment has finished
     */
    CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message, Transaction txn);
}

Transactional Consumer Examples:

// Exactly-once message processing
Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
    .topic("input-topic")
    .subscriptionName("processor-sub")
    .subscribe();

Producer<String> outputProducer = client.newProducer(Schema.STRING)
    .topic("output-topic")
    .sendTimeout(0, TimeUnit.SECONDS) // transactional sends require the send timeout to be disabled
    .create();

while (true) {
    // Wait for input BEFORE opening the transaction: receive() can block
    // indefinitely, and an idle wait must not run down the transaction timeout
    Message<String> inputMessage = inputConsumer.receive();

    Transaction txn = client.newTransaction().build().get();

    try {
        // Process message
        String processedData = processMessage(inputMessage.getValue());

        // Send processed result
        outputProducer.newMessage(txn).value(processedData).send();

        // Acknowledge input message
        inputConsumer.acknowledgeAsync(inputMessage.getMessageId(), txn).get();

        // Commit transaction
        txn.commit().get();

    } catch (Exception e) {
        System.err.println("Processing failed, transaction aborted: " + e.getMessage());
        try {
            txn.abort().get();
        } catch (Exception abortFailure) {
            // A failed abort should not terminate the processing loop
            System.err.println("Failed to abort transaction: " + abortFailure.getMessage());
        }
        // Hand the message back so it is redelivered and retried in a new transaction
        inputConsumer.negativeAcknowledge(inputMessage);
    }
}

// Batch processing with transactions
List<Message<String>> messageBatch = new ArrayList<>();

// Collect batch of messages before opening the transaction, so that time
// spent waiting for input does not count against the transaction timeout
for (int i = 0; i < BATCH_SIZE; i++) {
    Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
    if (message == null) {
        break; // No more messages available
    }
    messageBatch.add(message);
}

// Nothing received: skip the transaction instead of committing an empty one
if (!messageBatch.isEmpty()) {
    Transaction txn = client.newTransaction().build().get();

    try {
        // Process batch
        for (Message<String> message : messageBatch) {
            String result = processMessage(message.getValue());
            outputProducer.newMessage(txn).value(result).send();
            consumer.acknowledgeAsync(message.getMessageId(), txn).get();
        }

        txn.commit().get();

    } catch (Exception e) {
        // Surface the failure instead of swallowing it; keep it primary if the abort fails too
        try {
            txn.abort().get();
        } catch (Exception abortFailure) {
            e.addSuppressed(abortFailure);
        }
        throw e;
    }
}

Transaction Isolation Levels

Configuration for transaction isolation behavior.

/**
 * Transaction isolation levels.
 *
 * <p>Selects which transactional messages a consumer is allowed to read.
 * The isolation examples in this document describe
 * {@link #READ_COMMITTED} as the default.
 */
enum TransactionIsolationLevel {
    /** Read committed isolation level: only messages from committed transactions are read. */
    READ_COMMITTED,
    /** Read uncommitted isolation level: messages from both committed and uncommitted transactions are read. */
    READ_UNCOMMITTED
}

/**
 * Consumer builder with transaction isolation configuration.
 *
 * <p>NOTE(review): confirm that this option exists on {@code ConsumerBuilder}
 * in the pulsar-client version in use before relying on it.
 *
 * @param <T> type of the message value
 */
interface ConsumerBuilder<T> {
    /**
     * Set transaction isolation level.
     *
     * @param isolationLevel isolation level the consumer should read with
     * @return a {@code ConsumerBuilder}, allowing calls to be chained
     */
    ConsumerBuilder<T> transactionIsolationLevel(TransactionIsolationLevel isolationLevel);
}

Isolation Level Examples:

// Consumer with read committed isolation (default)
Consumer<String> committedConsumer = client.newConsumer(Schema.STRING)
    .topic("transactional-topic")
    .subscriptionName("committed-reader")
    .transactionIsolationLevel(TransactionIsolationLevel.READ_COMMITTED)
    .subscribe();

// Only reads messages from committed transactions
Message<String> committedMessage = committedConsumer.receive();

// Consumer with read uncommitted isolation
// (distinct variable names so both examples can live in the same scope)
Consumer<String> uncommittedConsumer = client.newConsumer(Schema.STRING)
    .topic("transactional-topic")
    .subscriptionName("uncommitted-reader")
    .transactionIsolationLevel(TransactionIsolationLevel.READ_UNCOMMITTED)
    .subscribe();

// Reads messages from both committed and uncommitted transactions
Message<String> uncommittedMessage = uncommittedConsumer.receive();

Transaction State Management

Monitoring and managing transaction states and lifecycle.

/**
 * Transaction states.
 *
 * <p>Returned by {@code Transaction.getState()}. The examples in this
 * document only commit or abort while the state is {@link #OPEN}.
 *
 * <p>NOTE(review): confirm the constant names against the state enum of the
 * pulsar-client version in use.
 */
enum TransactionState {
    /** Transaction is open and active */
    OPEN,
    /** Transaction is committing */
    COMMITTING,
    /** Transaction is aborting */
    ABORTING,
    /** Transaction has been committed */
    COMMITTED,
    /** Transaction has been aborted */
    ABORTED,
    /** Transaction has timed out */
    TIMEOUT,
    /** Transaction is in error state */
    ERROR
}

/**
 * Transaction ID for tracking and debugging.
 *
 * <p>The ID is exposed as two {@code long} halves plus a string form.
 */
interface TxnID {
    /**
     * Get most significant bits.
     *
     * @return the most significant half of the ID
     */
    long getMostSigBits();

    /**
     * Get least significant bits.
     *
     * @return the least significant half of the ID
     */
    long getLeastSigBits();

    /**
     * Convert to string representation.
     *
     * @return the ID as text; the exact format is not shown here
     */
    String toString();
}

Transaction State Examples:

// Monitor transaction state
// Opens a transaction, prints its identity and state, performs one
// transactional send, and commits or aborts only while the state is OPEN.
Transaction txn = client.newTransaction().build().get();

System.out.println("Transaction ID: " + txn.getTxnID());
System.out.println("Initial state: " + txn.getState());
// NOTE(review): the "ms" label assumes the timeout is reported in milliseconds — confirm
System.out.println("Timeout: " + txn.getTransactionTimeout() + "ms");

try {
    // Perform operations
    // NOTE(review): `producer` is assumed to be created elsewhere; the basic
    // usage example sends transactionally via producer.newMessage(txn) instead
    producer.send("data", txn);
    
    // Check state before commit
    if (txn.getState() == TransactionState.OPEN) {
        txn.commit().get();
        // State is read again after the commit future has completed
        System.out.println("Final state: " + txn.getState());
    }
    
} catch (Exception e) {
    System.out.println("Error state: " + txn.getState());
    // Abort only while still OPEN; in any other state no abort is attempted
    if (txn.getState() == TransactionState.OPEN) {
        txn.abort().get();
    }
}

Transaction Exception Handling

Comprehensive exception handling for transactional operations.

/**
 * Transaction-related exceptions.
 *
 * <p>API summary only: the constructors are listed as signatures without
 * bodies. Each nested type extends {@code PulsarClientException}, so a
 * handler for {@code PulsarClientException} also receives them.
 *
 * <p>NOTE(review): confirm the exact class names and enclosing types against
 * the Javadoc of the pulsar-client version in use.
 */
class PulsarClientException {
    /** Transaction conflict detected */
    static class TransactionConflictException extends PulsarClientException {
        /** @param msg detail message describing the conflict */
        TransactionConflictException(String msg);
    }
    
    /** Transaction has failed operations */
    static class TransactionHasOperationFailedException extends PulsarClientException {
        /** @param msg detail message describing the failed operation */
        TransactionHasOperationFailedException(String msg);
    }
    
    /** Transaction coordinator not available */
    static class TransactionCoordinatorNotAvailableException extends PulsarClientException {
        /** @param msg detail message describing the unavailable coordinator */
        TransactionCoordinatorNotAvailableException(String msg);
    }
    
    /** Transaction not found */
    static class TransactionNotFoundException extends PulsarClientException {
        /** @param msg detail message describing the missing transaction */
        TransactionNotFoundException(String msg);
    }
}

Advanced Transaction Patterns

Common patterns for using transactions in complex scenarios.

Pattern 1: Message Forwarding with Exactly-Once Semantics

/**
 * Forwards messages from an input topic to an output topic, publishing the
 * transformed message and acknowledging its source in a single transaction.
 *
 * <p>{@code logger} and {@code transform(String)} are application-supplied
 * and omitted here for brevity.
 */
public class ExactlyOnceForwarder {
    private final PulsarClient client;
    private final Consumer<String> inputConsumer;
    private final Producer<String> outputProducer;

    /**
     * @param client         client with transactions enabled
     * @param inputConsumer  consumer subscribed to the input topic
     * @param outputProducer producer for the output topic
     */
    public ExactlyOnceForwarder(PulsarClient client,
                                Consumer<String> inputConsumer,
                                Producer<String> outputProducer) {
        this.client = client;
        this.inputConsumer = inputConsumer;
        this.outputProducer = outputProducer;
    }

    /**
     * Runs the forwarding loop until the current thread is interrupted.
     * A failure while handling one message is logged, the transaction is
     * aborted, and the loop continues with the next message.
     */
    public void processMessages() {
        while (!Thread.currentThread().isInterrupted()) {
            Message<String> inputMessage = null;
            Transaction txn = null;
            try {
                // Poll first: no transaction is opened (or aborted) for an idle poll
                inputMessage = inputConsumer.receive(5, TimeUnit.SECONDS);
                if (inputMessage == null) {
                    continue;
                }

                txn = client.newTransaction()
                    .withTransactionTimeout(30, TimeUnit.SECONDS)
                    .build()
                    .get();

                // Transform message
                String transformedData = transform(inputMessage.getValue());

                // Send to output topic
                outputProducer.newMessage(txn).value(transformedData).send();

                // Acknowledge input
                inputConsumer.acknowledgeAsync(inputMessage.getMessageId(), txn).get();

                // Commit transaction
                txn.commit().get();

            } catch (InterruptedException e) {
                // Abort first, then restore the flag so the loop condition sees
                // the interrupt (catching the exception clears it)
                abortQuietly(txn);
                redeliver(inputMessage);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                abortQuietly(txn);
                redeliver(inputMessage);
                logger.error("Message processing failed", e);
            }
        }
    }

    /** Aborts {@code txn} if one was opened, logging rather than propagating a failed abort. */
    private void abortQuietly(Transaction txn) {
        if (txn == null) {
            return;
        }
        try {
            txn.abort().get();
        } catch (InterruptedException abortException) {
            Thread.currentThread().interrupt();
            logger.error("Failed to abort transaction", abortException);
        } catch (Exception abortException) {
            logger.error("Failed to abort transaction", abortException);
        }
    }

    /** Hands a received-but-unprocessed message back so that it is redelivered. */
    private void redeliver(Message<String> message) {
        if (message != null) {
            inputConsumer.negativeAcknowledge(message);
        }
    }
}

Pattern 2: Multi-Consumer Coordinated Processing

/**
 * Consumes one message from each of two input topics and publishes a
 * combined result, acknowledging both inputs in a single transaction.
 *
 * <p>{@code processOrder(String, String)} is application-supplied and
 * omitted here for brevity.
 */
public class CoordinatedProcessor {
    private final PulsarClient client;
    private final Consumer<String> orderConsumer;
    private final Consumer<String> inventoryConsumer;
    private final Producer<String> resultProducer;

    /**
     * @param client            client with transactions enabled
     * @param orderConsumer     consumer for the order topic
     * @param inventoryConsumer consumer for the inventory topic
     * @param resultProducer    producer for the combined result
     */
    public CoordinatedProcessor(PulsarClient client,
                                Consumer<String> orderConsumer,
                                Consumer<String> inventoryConsumer,
                                Producer<String> resultProducer) {
        this.client = client;
        this.orderConsumer = orderConsumer;
        this.inventoryConsumer = inventoryConsumer;
        this.resultProducer = resultProducer;
    }

    /**
     * Processes one order/inventory pair, if both are available within the
     * receive timeouts.
     *
     * @throws Exception if receiving, processing, or committing fails; the
     *                   transaction is aborted before the failure is rethrown
     */
    public void processCoordinatedMessages() throws Exception {
        // Read from multiple input topics before opening the transaction, so
        // that waiting for input does not count against the transaction timeout
        Message<String> orderMessage = orderConsumer.receive(1, TimeUnit.SECONDS);
        Message<String> inventoryMessage = inventoryConsumer.receive(1, TimeUnit.SECONDS);

        if (orderMessage == null || inventoryMessage == null) {
            // Only one side arrived: hand it back for redelivery instead of
            // leaving it received but never acknowledged
            if (orderMessage != null) {
                orderConsumer.negativeAcknowledge(orderMessage);
            }
            if (inventoryMessage != null) {
                inventoryConsumer.negativeAcknowledge(inventoryMessage);
            }
            return;
        }

        Transaction txn = client.newTransaction().build().get();

        try {
            // Process both messages together
            String result = processOrder(orderMessage.getValue(), inventoryMessage.getValue());

            // Send result
            resultProducer.newMessage(txn).value(result).send();

            // Acknowledge both inputs
            orderConsumer.acknowledgeAsync(orderMessage.getMessageId(), txn).get();
            inventoryConsumer.acknowledgeAsync(inventoryMessage.getMessageId(), txn).get();

            txn.commit().get();

        } catch (Exception e) {
            // Keep the original failure as the primary error even if the abort fails too
            try {
                txn.abort().get();
            } catch (Exception abortFailure) {
                e.addSuppressed(abortFailure);
            }
            throw e;
        }
    }
}

Configuration and Best Practices

/**
 * Suggested transaction timeout values, grouped by how long the
 * transactional work is expected to run.
 */
class TransactionConfig {
    /** Upper bound suggested for any transaction timeout: 10 minutes. */
    static final Duration MAX_TIMEOUT = Duration.ofMinutes(10);

    /** Timeout suggested for long-running operations: 5 minutes. */
    static final Duration LONG_TIMEOUT = Duration.ofMinutes(5);

    /** Timeout suggested for short-running operations: 30 seconds. */
    static final Duration SHORT_TIMEOUT = Duration.ofSeconds(30);
}

Best Practices Examples:

// Configure client for optimal transaction performance
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .enableTransaction(true)
    .operationTimeout(60, TimeUnit.SECONDS) // Longer timeout for transactions
    .build();

// Use appropriate transaction timeouts
Transaction shortTxn = client.newTransaction()
    .withTransactionTimeout(30, TimeUnit.SECONDS) // For quick operations
    .build().get();

Transaction longTxn = client.newTransaction()
    .withTransactionTimeout(5, TimeUnit.MINUTES) // For batch processing
    .build().get();

// Always handle transaction lifecycle properly
Transaction txn = client.newTransaction()
    .withTransactionTimeout(30, TimeUnit.SECONDS)
    .build().get();

// The guard runs when the try block exits, normally or with an exception:
// a transaction that is still OPEN at that point was never committed, so abort it
try (AutoCloseable abortIfStillOpen = () -> {
    if (txn.getState() == TransactionState.OPEN) {
        txn.abort().get();
    }
}) {
    // Perform transaction operations
    txn.commit().get();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-client

docs

authentication-security.md

client-management.md

index.md

message-consumption.md

message-production.md

message-reading.md

schema-serialization.md

transaction-support.md

tile.json