Apache Pulsar Java client library for distributed pub-sub messaging platform
—
Core client creation, configuration, and lifecycle management for establishing and maintaining connections to Pulsar brokers.
Main entry point for creating PulsarClient instances using the builder pattern.
/**
* Main entry point for all Pulsar operations.
* <p>
* Thread-safe: a single instance can be reused for managing multiple producers, consumers, and readers.
* Instances are configured and created through the {@link ClientBuilder} returned by {@link #builder()}.
*/
interface PulsarClient extends Closeable {
/**
* Get a new builder instance for configuring and building a PulsarClient.
* @return a new {@link ClientBuilder}
*/
static ClientBuilder builder();
/**
* Create a producer builder with the default byte array schema.
* @return a producer builder typed to {@code byte[]}
*/
ProducerBuilder<byte[]> newProducer();
/**
* Create a producer builder with the specified schema.
* @param schema schema that determines the builder's message type {@code T}
* @return a producer builder typed to {@code T}
*/
<T> ProducerBuilder<T> newProducer(Schema<T> schema);
/**
* Create a consumer builder with the default byte array schema.
* @return a consumer builder typed to {@code byte[]}
*/
ConsumerBuilder<byte[]> newConsumer();
/**
* Create a consumer builder with the specified schema.
* @param schema schema that determines the builder's message type {@code T}
* @return a consumer builder typed to {@code T}
*/
<T> ConsumerBuilder<T> newConsumer(Schema<T> schema);
/**
* Create a reader builder with the default byte array schema.
* @return a reader builder typed to {@code byte[]}
*/
ReaderBuilder<byte[]> newReader();
/**
* Create a reader builder with the specified schema.
* @param schema schema that determines the builder's message type {@code T}
* @return a reader builder typed to {@code T}
*/
<T> ReaderBuilder<T> newReader(Schema<T> schema);
/**
* Create a table view builder with the default byte array schema.
* @return a table view builder typed to {@code byte[]}
*/
TableViewBuilder<byte[]> newTableView();
/**
* Create a table view builder with the specified schema.
* @param schema schema that determines the builder's value type {@code T}
* @return a table view builder typed to {@code T}
*/
<T> TableViewBuilder<T> newTableView(Schema<T> schema);
/**
* Create a transaction builder.
* NOTE(review): presumably requires transactions to be enabled via
* {@code ClientBuilder.enableTransaction(true)} - confirm.
* @return a new transaction builder
* @throws PulsarClientException if the transaction builder cannot be created
*/
TransactionBuilder newTransaction() throws PulsarClientException;
/**
* Get the partition names for a topic.
* @param topic the topic to look up
* @param metadataAutoCreationEnabled NOTE(review): presumably whether topic metadata may be
*        auto-created by this lookup - confirm
* @return a future that completes with the list of partition names
*/
CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled);
/**
* Update the service URL this client is using.
* @param serviceUrl the new service URL
* @throws PulsarClientException if the service URL cannot be updated
*/
void updateServiceUrl(String serviceUrl) throws PulsarClientException;
/**
* Close the client gracefully.
* @throws PulsarClientException if closing the client fails
*/
void close() throws PulsarClientException;
/**
* Close the client asynchronously.
* @return a future that completes once the close operation has finished
*/
CompletableFuture<Void> closeAsync();
/**
* Force shutdown the client immediately (as opposed to the graceful {@link #close()}).
* @throws PulsarClientException if the shutdown fails
*/
void shutdown() throws PulsarClientException;
/**
* Check if the client has been closed.
* @return {@code true} if the client has been closed
*/
boolean isClosed();
}

Usage Examples:
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;

// The two creation examples below are alternatives: each one declares its own `client`.

// Basic client creation
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

// Advanced client configuration
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://my-broker:6651")
    .authentication(AuthenticationFactory.token("my-token"))
    .operationTimeout(30, TimeUnit.SECONDS)
    .ioThreads(4)
    .connectionsPerBroker(1)
    .build();

// Get topic partitions (get() blocks until the returned future completes)
List<String> partitions = client.getPartitionsForTopic("my-topic", true).get();

// Graceful shutdown
client.close();

Builder interface for configuring PulsarClient instances with extensive customization options.
/**
* Builder interface for configuring and constructing PulsarClient instances.
* <p>
* Every configuration method returns a ClientBuilder so that calls can be chained;
* {@link #build()} produces the client. Defaults quoted below are the values documented
* for settings that are not configured explicitly.
*/
interface ClientBuilder extends Serializable, Cloneable {
/**
* Construct the final PulsarClient instance.
* @return the configured client
* @throws PulsarClientException if the client cannot be constructed
*/
PulsarClient build() throws PulsarClientException;
/**
* Load configuration from a map.
* NOTE(review): keys are presumably configuration property names - confirm.
*/
ClientBuilder loadConf(Map<String, Object> config);
/** Create a copy of the current client builder */
ClientBuilder clone();
/** Configure the service URL for the Pulsar service (required) */
ClientBuilder serviceUrl(String serviceUrl);
/** Configure the service URL provider for dynamic URLs */
ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
/** Configure listener name for advertised listener */
ClientBuilder listenerName(String name);
/** Set authentication provider */
ClientBuilder authentication(Authentication authentication);
/**
* Set authentication using plugin class name and parameters.
* @param authPluginClassName class name of the authentication plugin
* @param authParamsString plugin parameters given as a single string
* @throws UnsupportedAuthenticationException NOTE(review): presumably when the plugin
*         cannot be used as an authentication provider - confirm
*/
ClientBuilder authentication(String authPluginClassName, String authParamsString) throws UnsupportedAuthenticationException;
/**
* Set authentication using plugin class name and parameter map.
* @param authPluginClassName class name of the authentication plugin
* @param authParams plugin parameters given as a map
* @throws UnsupportedAuthenticationException NOTE(review): presumably when the plugin
*         cannot be used as an authentication provider - confirm
*/
ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams) throws UnsupportedAuthenticationException;
/** Set operation timeout, expressed in the given time unit (default: 30 seconds) */
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
/** Set lookup timeout, expressed in the given time unit (default: matches operation timeout) */
ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit);
/** Set number of IO threads (default: available processors) */
ClientBuilder ioThreads(int numIoThreads);
/** Set number of listener threads (default: available processors) */
ClientBuilder listenerThreads(int numListenerThreads);
/** Set max connections per broker (default: 1) */
ClientBuilder connectionsPerBroker(int connectionsPerBroker);
/** Configure TCP no-delay flag (default: true) */
ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);
/** Set connection max idle time in seconds (default: 25) */
ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
/** Set keep alive interval, expressed in the given time unit (default: 30 seconds) */
ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit);
/** Set connection timeout, expressed in the given time unit */
ClientBuilder connectionTimeout(int duration, TimeUnit unit);
/** Set starting backoff interval, expressed in the given time unit */
ClientBuilder startingBackoffInterval(long duration, TimeUnit unit);
/** Set maximum backoff interval, expressed in the given time unit */
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
/** Set memory limit, expressed in the given size unit (default: 64 MB) */
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
/** Set max concurrent lookup requests (default: 5000) */
ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
/** Set max lookup requests (default: 50000) */
ClientBuilder maxLookupRequests(int maxLookupRequests);
/** Set max lookup redirects */
ClientBuilder maxLookupRedirects(int maxLookupRedirects);
/** Set max rejected requests per connection (default: 50) */
ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection);
/** Enable busy-wait settings for low latency (default: false) */
ClientBuilder enableBusyWait(boolean enableBusyWait);
/** Configure OpenTelemetry for metrics */
ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
/** Set clock for timestamps */
ClientBuilder clock(Clock clock);
/** Enable transaction support */
ClientBuilder enableTransaction(boolean enableTransaction);
}

TLS configuration methods for secure connections.
/**
* TLS-related configuration methods of ClientBuilder.
* NOTE: this fragment repeats the ClientBuilder name to group its TLS methods; it reads as a
* partial listing of the same interface, not a separate type.
*/
interface ClientBuilder {
/** Set path to TLS key file */
ClientBuilder tlsKeyFilePath(String tlsKeyFilePath);
/** Set path to TLS certificate file */
ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath);
/** Set path to trusted TLS certificate file */
ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);
/** Allow untrusted TLS connections (default: false) */
ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection);
/** Enable TLS hostname verification */
ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);
/** Use KeyStore type for TLS configuration (see the key store / trust store setters below) */
ClientBuilder useKeyStoreTls(boolean useKeyStoreTls);
/** Set security provider for SSL connections */
ClientBuilder sslProvider(String sslProvider);
/** Set key store type */
ClientBuilder tlsKeyStoreType(String tlsKeyStoreType);
/** Set key store path */
ClientBuilder tlsKeyStorePath(String tlsKeyStorePath);
/** Set key store password */
ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword);
/** Set trust store type */
ClientBuilder tlsTrustStoreType(String tlsTrustStoreType);
/** Set trust store path */
ClientBuilder tlsTrustStorePath(String tlsTrustStorePath);
/** Set trust store password */
ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword);
/** Set allowed cipher suites */
ClientBuilder tlsCiphers(Set<String> tlsCiphers);
/** Set allowed TLS protocols */
ClientBuilder tlsProtocols(Set<String> tlsProtocols);
}

Network and proxy configuration methods.
/**
* Network and proxy configuration methods of ClientBuilder.
* NOTE: this fragment repeats the ClientBuilder name to group its network methods; it reads as a
* partial listing of the same interface, not a separate type.
*/
interface ClientBuilder {
/**
* Set proxy service URL and protocol.
* @param proxyServiceUrl URL of the proxy service
* @param proxyProtocol proxy protocol to use with that URL (a ProxyProtocol value)
*/
ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
/**
* Set DNS lookup bind address and port.
* @param address bind address for DNS lookups
* @param port bind port for DNS lookups
*/
ClientBuilder dnsLookupBind(String address, int port);
/** Set DNS server addresses */
ClientBuilder dnsServerAddresses(List<InetSocketAddress> addresses);
/** Set SOCKS5 proxy address */
ClientBuilder socks5ProxyAddress(InetSocketAddress socks5ProxyAddress);
/** Set SOCKS5 proxy username */
ClientBuilder socks5ProxyUsername(String socks5ProxyUsername);
/** Set SOCKS5 proxy password */
ClientBuilder socks5ProxyPassword(String socks5ProxyPassword);
/** Set properties used for topic lookup */
ClientBuilder lookupProperties(Map<String, String> properties);
}

Advanced Configuration Examples:
// Each example below is a standalone alternative and declares its own `client`.

// TLS configuration
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .tlsCertificateFilePath("/path/to/client.cert.pem")
    .tlsKeyFilePath("/path/to/client.key.pem")
    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
    .enableTlsHostnameVerification(true)
    .build();

// Connection pooling and timeouts
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://broker:6650")
    .operationTimeout(60, TimeUnit.SECONDS)
    .connectionTimeout(10, TimeUnit.SECONDS)
    .connectionsPerBroker(3)
    .keepAliveInterval(30, TimeUnit.SECONDS)
    .build();

// Proxy configuration (SNI routing)
// SNI is part of the TLS handshake, so both the broker URL and the proxy URL
// use the pulsar+ssl scheme rather than plain pulsar:// or http://.
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .proxyServiceUrl("pulsar+ssl://proxy:4443", ProxyProtocol.SNI)
    .build();

interface ServiceUrlProvider {
/**
* Receives the PulsarClient this provider is used with.
* NOTE(review): presumably invoked once while the client is being built - confirm.
*/
void initialize(PulsarClient client);
/** Returns the service URL as a string; ClientBuilder.serviceUrlProvider accepts a provider for dynamic URLs. */
String getServiceUrl();
}
/** Proxy protocol passed as the second argument of ClientBuilder.proxyServiceUrl. */
enum ProxyProtocol {
// The only value listed in this document.
SNI
}
/** Size units accepted by ClientBuilder.memoryLimit, listed from smallest to largest. */
enum SizeUnit {
BYTES,
KILOBYTES,
MEGABYTES,
GIGABYTES,
TERABYTES
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--pulsar-client