Apache Pulsar Java client library for distributed pub-sub messaging platform
—
Transactional messaging for exactly-once semantics, multi-topic atomic operations, and coordinated message processing across producers and consumers.
Core interface for transactional operations providing exactly-once message processing guarantees.
/**
* Transaction interface for atomic message operations
* Provides exactly-once semantics across multiple topics and operations
*/
interface Transaction {
/** Get transaction ID */
TxnID getTxnID();
/** Get transaction state */
TransactionState getState();
/** Commit transaction */
CompletableFuture<Void> commit();
/** Abort transaction */
CompletableFuture<Void> abort();
/** Get transaction timeout */
long getTransactionTimeout();
/** Get transaction start timestamp */
long getTransactionStartTime();
}Builder interface for creating and configuring transactions.
/**
* Builder for creating and configuring transactions
*/
interface TransactionBuilder {
/** Build the transaction asynchronously */
CompletableFuture<Transaction> build();
/** Set transaction timeout (default: coordinator configured timeout) */
TransactionBuilder withTransactionTimeout(long timeout, TimeUnit timeUnit);
}Basic Transaction Usage:
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.*;
// Enable transactions in client
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.enableTransaction(true)
.build();
// Create transaction
Transaction txn = client.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES)
.build()
.get();
try {
// Produce messages in transaction
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("transactional-topic")
.create();
producer.newMessage(txn)
.value("Message 1")
.send();
producer.newMessage(txn)
.value("Message 2")
.send();
// Consume and acknowledge in transaction
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("input-topic")
.subscriptionName("txn-sub")
.subscribe();
Message<String> message = consumer.receive();
consumer.acknowledge(message.getMessageId(), txn);
// Commit transaction
txn.commit().get();
System.out.println("Transaction committed successfully");
} catch (Exception e) {
// Abort transaction on error
txn.abort().get();
System.err.println("Transaction aborted: " + e.getMessage());
}Producer operations within transaction context for atomic message publishing.
/**
* Extended TypedMessageBuilder for transactional operations
*/
interface TypedMessageBuilder<T> {
/** Send message within transaction context */
MessageId send(Transaction txn) throws PulsarClientException;
/** Send message within transaction context asynchronously */
CompletableFuture<MessageId> sendAsync(Transaction txn);
}
/**
* Producer interface with transaction support
*/
interface Producer<T> {
/** Create message builder for transaction */
TypedMessageBuilder<T> newMessage(Transaction txn);
/** Send message in transaction */
MessageId send(T message, Transaction txn) throws PulsarClientException;
/** Send message in transaction asynchronously */
CompletableFuture<MessageId> sendAsync(T message, Transaction txn);
}Transactional Producer Examples:
// Multi-topic atomic publishing
Transaction txn = client.newTransaction().build().get();
try {
Producer<String> orderProducer = client.newProducer(Schema.STRING)
.topic("orders")
.create();
Producer<String> inventoryProducer = client.newProducer(Schema.STRING)
.topic("inventory")
.create();
Producer<String> paymentProducer = client.newProducer(Schema.STRING)
.topic("payments")
.create();
// All messages sent atomically
orderProducer.send("order-123", txn);
inventoryProducer.send("reserve-item-456", txn);
paymentProducer.send("charge-user-789", txn);
txn.commit().get();
} catch (Exception e) {
txn.abort().get();
throw e;
}
// Conditional message publishing
Transaction txn = client.newTransaction().build().get();
try {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("conditional-topic")
.create();
// Business logic to determine if messages should be sent
if (shouldSendMessages()) {
for (String message : getMessagesToSend()) {
producer.send(message, txn);
}
txn.commit().get();
} else {
txn.abort().get();
}
} catch (Exception e) {
txn.abort().get();
}Consumer operations within transaction context for atomic message acknowledgment.
/**
* Consumer interface with transaction support
*/
interface Consumer<T> {
/** Acknowledge message within transaction */
void acknowledge(MessageId messageId, Transaction txn) throws PulsarClientException;
/** Acknowledge message within transaction */
void acknowledge(Message<?> message, Transaction txn) throws PulsarClientException;
/** Acknowledge message within transaction asynchronously */
CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction txn);
/** Acknowledge message within transaction asynchronously */
CompletableFuture<Void> acknowledgeAsync(Message<?> message, Transaction txn);
/** Acknowledge cumulatively within transaction */
void acknowledgeCumulative(MessageId messageId, Transaction txn) throws PulsarClientException;
/** Acknowledge cumulatively within transaction */
void acknowledgeCumulative(Message<?> message, Transaction txn) throws PulsarClientException;
/** Acknowledge cumulatively within transaction asynchronously */
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn);
/** Acknowledge cumulatively within transaction asynchronously */
CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message, Transaction txn);
}Transactional Consumer Examples:
// Exactly-once message processing
Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
.topic("input-topic")
.subscriptionName("processor-sub")
.subscribe();
Producer<String> outputProducer = client.newProducer(Schema.STRING)
.topic("output-topic")
.create();
while (true) {
Transaction txn = client.newTransaction().build().get();
try {
// Receive message
Message<String> inputMessage = inputConsumer.receive();
// Process message
String processedData = processMessage(inputMessage.getValue());
// Send processed result
outputProducer.send(processedData, txn);
// Acknowledge input message
inputConsumer.acknowledge(inputMessage, txn);
// Commit transaction
txn.commit().get();
} catch (Exception e) {
txn.abort().get();
System.err.println("Processing failed, transaction aborted: " + e.getMessage());
}
}
// Batch processing with transactions
List<Message<String>> messageBatch = new ArrayList<>();
Transaction txn = client.newTransaction().build().get();
try {
// Collect batch of messages
for (int i = 0; i < BATCH_SIZE; i++) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
if (message != null) {
messageBatch.add(message);
} else {
break; // No more messages available
}
}
// Process batch
for (Message<String> message : messageBatch) {
String result = processMessage(message.getValue());
outputProducer.send(result, txn);
consumer.acknowledge(message, txn);
}
txn.commit().get();
} catch (Exception e) {
txn.abort().get();
}Configuration for transaction isolation behavior.
/**
* Transaction isolation levels
*/
enum TransactionIsolationLevel {
/** Read committed isolation level */
READ_COMMITTED,
/** Read uncommitted isolation level */
READ_UNCOMMITTED
}
/**
* Consumer builder with transaction isolation configuration
*/
interface ConsumerBuilder<T> {
/** Set transaction isolation level */
ConsumerBuilder<T> transactionIsolationLevel(TransactionIsolationLevel isolationLevel);
}Isolation Level Examples:
// Consumer with read committed isolation (default)
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("transactional-topic")
.subscriptionName("committed-reader")
.transactionIsolationLevel(TransactionIsolationLevel.READ_COMMITTED)
.subscribe();
// Only reads messages from committed transactions
Message<String> message = consumer.receive();
// Consumer with read uncommitted isolation
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("transactional-topic")
.subscriptionName("uncommitted-reader")
.transactionIsolationLevel(TransactionIsolationLevel.READ_UNCOMMITTED)
.subscribe();
// Reads messages from both committed and uncommitted transactions
Message<String> message = consumer.receive();Monitoring and managing transaction states and lifecycle.
/**
* Transaction states
*/
enum TransactionState {
/** Transaction is open and active */
OPEN,
/** Transaction is committing */
COMMITTING,
/** Transaction is aborting */
ABORTING,
/** Transaction has been committed */
COMMITTED,
/** Transaction has been aborted */
ABORTED,
/** Transaction has timed out */
TIMEOUT,
/** Transaction is in error state */
ERROR
}
/**
* Transaction ID for tracking and debugging
*/
interface TxnID {
/** Get most significant bits */
long getMostSigBits();
/** Get least significant bits */
long getLeastSigBits();
/** Convert to string representation */
String toString();
}Transaction State Examples:
// Monitor transaction state
Transaction txn = client.newTransaction().build().get();
System.out.println("Transaction ID: " + txn.getTxnID());
System.out.println("Initial state: " + txn.getState());
System.out.println("Timeout: " + txn.getTransactionTimeout() + "ms");
try {
// Perform operations
producer.send("data", txn);
// Check state before commit
if (txn.getState() == TransactionState.OPEN) {
txn.commit().get();
System.out.println("Final state: " + txn.getState());
}
} catch (Exception e) {
System.out.println("Error state: " + txn.getState());
if (txn.getState() == TransactionState.OPEN) {
txn.abort().get();
}
}Comprehensive exception handling for transactional operations.
/**
* Transaction-related exceptions
*/
class PulsarClientException {
/** Transaction conflict detected */
static class TransactionConflictException extends PulsarClientException {
TransactionConflictException(String msg);
}
/** Transaction has failed operations */
static class TransactionHasOperationFailedException extends PulsarClientException {
TransactionHasOperationFailedException(String msg);
}
/** Transaction coordinator not available */
static class TransactionCoordinatorNotAvailableException extends PulsarClientException {
TransactionCoordinatorNotAvailableException(String msg);
}
/** Transaction not found */
static class TransactionNotFoundException extends PulsarClientException {
TransactionNotFoundException(String msg);
}
}Common patterns for using transactions in complex scenarios.
Pattern 1: Message Forwarding with Exactly-Once Semantics
public class ExactlyOnceForwarder {
private final Consumer<String> inputConsumer;
private final Producer<String> outputProducer;
private final PulsarClient client;
public void processMessages() {
while (!Thread.currentThread().isInterrupted()) {
Transaction txn = null;
try {
txn = client.newTransaction()
.withTransactionTimeout(30, TimeUnit.SECONDS)
.build()
.get();
Message<String> inputMessage = inputConsumer.receive(5, TimeUnit.SECONDS);
if (inputMessage == null) {
txn.abort().get();
continue;
}
// Transform message
String transformedData = transform(inputMessage.getValue());
// Send to output topic
outputProducer.send(transformedData, txn);
// Acknowledge input
inputConsumer.acknowledge(inputMessage, txn);
// Commit transaction
txn.commit().get();
} catch (Exception e) {
if (txn != null) {
try {
txn.abort().get();
} catch (Exception abortException) {
logger.error("Failed to abort transaction", abortException);
}
}
logger.error("Message processing failed", e);
}
}
}
}Pattern 2: Multi-Consumer Coordinated Processing
public class CoordinatedProcessor {
public void processCoordinatedMessages() {
Transaction txn = client.newTransaction().build().get();
try {
// Read from multiple input topics
Message<String> orderMessage = orderConsumer.receive(1, TimeUnit.SECONDS);
Message<String> inventoryMessage = inventoryConsumer.receive(1, TimeUnit.SECONDS);
if (orderMessage != null && inventoryMessage != null) {
// Process both messages together
String result = processOrder(orderMessage.getValue(), inventoryMessage.getValue());
// Send result
resultProducer.send(result, txn);
// Acknowledge both inputs
orderConsumer.acknowledge(orderMessage, txn);
inventoryConsumer.acknowledge(inventoryMessage, txn);
txn.commit().get();
} else {
txn.abort().get();
}
} catch (Exception e) {
txn.abort().get();
throw e;
}
}
}/**
* Transaction configuration best practices
*/
class TransactionConfig {
/** Recommended timeout for short operations */
static final Duration SHORT_TIMEOUT = Duration.ofSeconds(30);
/** Recommended timeout for long operations */
static final Duration LONG_TIMEOUT = Duration.ofMinutes(5);
/** Maximum recommended timeout */
static final Duration MAX_TIMEOUT = Duration.ofMinutes(10);
}Best Practices Examples:
// Configure client for optimal transaction performance
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.enableTransaction(true)
.operationTimeout(60, TimeUnit.SECONDS) // Longer timeout for transactions
.build();
// Use appropriate transaction timeouts
Transaction shortTxn = client.newTransaction()
.withTransactionTimeout(30, TimeUnit.SECONDS) // For quick operations
.build().get();
Transaction longTxn = client.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES) // For batch processing
.build().get();
// Always handle transaction lifecycle properly
try (AutoCloseable txnResource = () -> {
if (txn.getState() == TransactionState.OPEN) {
txn.abort().get();
}
}) {
// Perform transaction operations
txn.commit().get();
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-client