CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-pulsar--pulsar-client

Java client library for Apache Pulsar, a distributed pub-sub messaging platform

Pending
Overview
Eval results
Files

docs/client-management.md

Client Management

Core client creation, configuration, and lifecycle management for establishing and maintaining connections to Pulsar brokers.

Capabilities

PulsarClient Factory

Main entry point for creating PulsarClient instances using the builder pattern.

/**
 * Main entry point for all Pulsar operations
 * Thread-safe and can be reused for managing multiple producers, consumers, and readers
 *
 * <p>This is an API listing (signatures only). Every {@code new*} method returns a builder;
 * the schema-less overloads use {@code byte[]} as the message type.
 */
interface PulsarClient extends Closeable {
    /**
     * Get a new builder instance for configuring and building a PulsarClient
     *
     * <p>NOTE: shown as a bare signature; in compilable Java a static interface method
     * must have a body.
     *
     * @return a builder used to configure and create a client
     */
    static ClientBuilder builder();
    
    /** Create producer builder with default byte array schema */
    ProducerBuilder<byte[]> newProducer();
    
    /**
     * Create producer builder with specified schema
     *
     * @param schema schema describing the message type {@code T}
     * @param <T> message type handled by the producer
     */
    <T> ProducerBuilder<T> newProducer(Schema<T> schema);
    
    /** Create consumer builder with default byte array schema */
    ConsumerBuilder<byte[]> newConsumer();
    
    /**
     * Create consumer builder with specified schema
     *
     * @param schema schema describing the message type {@code T}
     * @param <T> message type handled by the consumer
     */
    <T> ConsumerBuilder<T> newConsumer(Schema<T> schema);
    
    /** Create reader builder with default byte array schema */
    ReaderBuilder<byte[]> newReader();
    
    /**
     * Create reader builder with specified schema
     *
     * @param schema schema describing the message type {@code T}
     * @param <T> message type handled by the reader
     */
    <T> ReaderBuilder<T> newReader(Schema<T> schema);
    
    /** Create table view builder with default byte array schema */
    TableViewBuilder<byte[]> newTableView();
    
    /**
     * Create table view builder with specified schema
     *
     * @param schema schema describing the value type {@code T}
     * @param <T> value type exposed by the table view
     */
    <T> TableViewBuilder<T> newTableView(Schema<T> schema);
    
    /**
     * Create transaction builder
     *
     * @throws PulsarClientException declared by the signature; presumably raised when
     *         transactions are not enabled on the client — confirm against the Pulsar Javadoc
     */
    TransactionBuilder newTransaction() throws PulsarClientException;
    
    /**
     * Get partition names for a topic
     *
     * @param topic name of the topic to look up
     * @param metadataAutoCreationEnabled presumably controls whether the lookup may
     *        auto-create topic metadata — confirm against the Pulsar Javadoc
     * @return a future that completes with the list of partition names
     */
    CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled);
    
    /**
     * Update the service URL this client is using
     *
     * @param serviceUrl the new service URL
     * @throws PulsarClientException declared by the signature
     */
    void updateServiceUrl(String serviceUrl) throws PulsarClientException;
    
    /**
     * Close the client gracefully
     *
     * @throws PulsarClientException declared by the signature
     */
    void close() throws PulsarClientException;
    
    /**
     * Close the client asynchronously
     *
     * @return a future that completes when the close has finished
     */
    CompletableFuture<Void> closeAsync();
    
    /**
     * Force shutdown the client immediately
     *
     * @throws PulsarClientException declared by the signature
     */
    void shutdown() throws PulsarClientException;
    
    /**
     * Check if the client has been closed
     *
     * @return {@code true} once the client has been closed
     */
    boolean isClosed();
}

Usage Examples:

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.*;

// Basic client creation
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

// Advanced client configuration
// (named differently from the basic client so both declarations can share one scope)
PulsarClient secureClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://my-broker:6651")
    .authentication(AuthenticationFactory.token("my-token"))
    .operationTimeout(30, TimeUnit.SECONDS)
    .ioThreads(4)
    .connectionsPerBroker(1)
    .build();

// Get topic partitions (get() blocks until the lookup future completes)
List<String> partitions = client.getPartitionsForTopic("my-topic", true).get();

// Graceful shutdown: close every client that was created
secureClient.close();
client.close();

ClientBuilder Configuration

Builder interface for configuring PulsarClient instances with extensive customization options.

/**
 * Builder interface for configuring and constructing PulsarClient instances
 *
 * <p>Every configuration method returns a {@code ClientBuilder}, so calls can be chained
 * and finished with {@link #build()}. Defaults quoted below are the ones stated in this
 * listing; verify them against the Pulsar version in use.
 */
interface ClientBuilder extends Serializable, Cloneable {
    /**
     * Construct the final PulsarClient instance
     *
     * @return the configured client
     * @throws PulsarClientException declared by the signature
     */
    PulsarClient build() throws PulsarClientException;
    
    /**
     * Load configuration from a map
     *
     * @param config configuration entries keyed by name
     */
    ClientBuilder loadConf(Map<String, Object> config);
    
    /** Create a copy of the current client builder */
    ClientBuilder clone();
    
    /** Configure the service URL for the Pulsar service (required) */
    ClientBuilder serviceUrl(String serviceUrl);
    
    /** Configure the service URL provider for dynamic URLs */
    ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
    
    /** Configure listener name for advertised listener */
    ClientBuilder listenerName(String name);
    
    /** Set authentication provider */
    ClientBuilder authentication(Authentication authentication);
    
    /**
     * Set authentication using plugin class name and parameters
     *
     * @param authPluginClassName class name of the authentication plugin
     * @param authParamsString plugin parameters encoded as a single string
     * @throws UnsupportedAuthenticationException declared by the signature
     */
    ClientBuilder authentication(String authPluginClassName, String authParamsString) throws UnsupportedAuthenticationException;
    
    /**
     * Set authentication using plugin class name and parameter map
     *
     * @param authPluginClassName class name of the authentication plugin
     * @param authParams plugin parameters as key/value pairs
     * @throws UnsupportedAuthenticationException declared by the signature
     */
    ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams) throws UnsupportedAuthenticationException;
    
    /** Set operation timeout (default: 30 seconds) */
    ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
    
    /** Set lookup timeout (default: matches operation timeout) */
    ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit);
    
    /** Set number of IO threads (default: available processors) */
    ClientBuilder ioThreads(int numIoThreads);
    
    /** Set number of listener threads (default: available processors) */
    ClientBuilder listenerThreads(int numListenerThreads);
    
    /** Set max connections per broker (default: 1) */
    ClientBuilder connectionsPerBroker(int connectionsPerBroker);
    
    /** Configure TCP no-delay flag (default: true) */
    ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);
    
    /** Set connection max idle time in seconds (default: 25) */
    ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
    
    /** Set keep alive interval (default: 30 seconds) */
    ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit);
    
    /** Set connection timeout */
    ClientBuilder connectionTimeout(int duration, TimeUnit unit);
    
    /** Set starting backoff interval */
    ClientBuilder startingBackoffInterval(long duration, TimeUnit unit);
    
    /** Set maximum backoff interval */
    ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
    
    /**
     * Set memory limit (default: 64 MB)
     *
     * @param memoryLimit size of the limit, expressed in {@code unit}
     * @param unit unit in which {@code memoryLimit} is expressed
     */
    ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
    
    /** Set max concurrent lookup requests (default: 5000) */
    ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
    
    /** Set max lookup requests (default: 50000) */
    ClientBuilder maxLookupRequests(int maxLookupRequests);
    
    /** Set max lookup redirects */
    ClientBuilder maxLookupRedirects(int maxLookupRedirects);
    
    /** Set max rejected requests per connection (default: 50) */
    ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection);
    
    /** Enable busy-wait settings for low latency (default: false) */
    ClientBuilder enableBusyWait(boolean enableBusyWait);
    
    /** Configure OpenTelemetry for metrics */
    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
    
    /** Set clock for timestamps */
    ClientBuilder clock(Clock clock);
    
    /** Enable transaction support */
    ClientBuilder enableTransaction(boolean enableTransaction);
}

TLS and Security Configuration

TLS configuration methods for secure connections.

/**
 * TLS-related subset of the ClientBuilder methods.
 *
 * <p>This is a partial listing grouped by topic, not a separate type; each method
 * returns a {@code ClientBuilder} so calls can be chained.
 */
interface ClientBuilder {
    /** Set path to TLS key file */
    ClientBuilder tlsKeyFilePath(String tlsKeyFilePath);
    
    /** Set path to TLS certificate file */
    ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath);
    
    /** Set path to trusted TLS certificate file */
    ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);
    
    /**
     * Allow untrusted TLS connections (default: false)
     *
     * <p>NOTE(review): accepting untrusted certificates weakens transport security;
     * presumably intended for test setups only.
     */
    ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection);
    
    /** Enable TLS hostname verification */
    ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);
    
    /** Use KeyStore type for TLS configuration */
    ClientBuilder useKeyStoreTls(boolean useKeyStoreTls);
    
    /** Set security provider for SSL connections */
    ClientBuilder sslProvider(String sslProvider);
    
    /** Set key store type */
    ClientBuilder tlsKeyStoreType(String tlsKeyStoreType);
    
    /** Set key store path */
    ClientBuilder tlsKeyStorePath(String tlsKeyStorePath);
    
    /** Set key store password */
    ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword);
    
    /** Set trust store type */
    ClientBuilder tlsTrustStoreType(String tlsTrustStoreType);
    
    /** Set trust store path */
    ClientBuilder tlsTrustStorePath(String tlsTrustStorePath);
    
    /** Set trust store password */
    ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword);
    
    /** Set allowed cipher suites */
    ClientBuilder tlsCiphers(Set<String> tlsCiphers);
    
    /** Set allowed TLS protocols */
    ClientBuilder tlsProtocols(Set<String> tlsProtocols);
}

Proxy and Network Configuration

Network and proxy configuration methods.

/**
 * Proxy, DNS and lookup-related subset of the ClientBuilder methods.
 *
 * <p>This is a partial listing grouped by topic, not a separate type; each method
 * returns a {@code ClientBuilder} so calls can be chained.
 */
interface ClientBuilder {
    /**
     * Set proxy service URL and protocol
     *
     * @param proxyServiceUrl URL of the proxy to connect through
     * @param proxyProtocol protocol used to talk to the proxy
     */
    ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
    
    /** Set DNS lookup bind address and port */
    ClientBuilder dnsLookupBind(String address, int port);
    
    /** Set DNS server addresses */
    ClientBuilder dnsServerAddresses(List<InetSocketAddress> addresses);
    
    /** Set SOCKS5 proxy address */
    ClientBuilder socks5ProxyAddress(InetSocketAddress socks5ProxyAddress);
    
    /** Set SOCKS5 proxy username */
    ClientBuilder socks5ProxyUsername(String socks5ProxyUsername);
    
    /** Set SOCKS5 proxy password */
    ClientBuilder socks5ProxyPassword(String socks5ProxyPassword);
    
    /** Set properties used for topic lookup */
    ClientBuilder lookupProperties(Map<String, String> properties);
}

Advanced Configuration Examples:

// TLS configuration
PulsarClient tlsClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .tlsCertificateFilePath("/path/to/client.cert.pem")
    .tlsKeyFilePath("/path/to/client.key.pem")
    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
    .enableTlsHostnameVerification(true)
    .build();

// Connection pooling and timeouts
PulsarClient tunedClient = PulsarClient.builder()
    .serviceUrl("pulsar://broker:6650")
    .operationTimeout(60, TimeUnit.SECONDS)
    .connectionTimeout(10, TimeUnit.SECONDS)
    .connectionsPerBroker(3)
    .keepAliveInterval(30, TimeUnit.SECONDS)
    .build();

// Proxy configuration
// SNI routing relies on the TLS handshake, so both the broker URL and the proxy URL
// use the pulsar+ssl scheme and a trust certificate is configured.
PulsarClient proxyClient = PulsarClient.builder()
    .serviceUrl("pulsar+ssl://broker:6651")
    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
    .proxyServiceUrl("pulsar+ssl://proxy:4443", ProxyProtocol.SNI)
    .build();

Supporting Types

/**
 * Supplies the service URL for a client; passed to
 * {@code ClientBuilder.serviceUrlProvider(ServiceUrlProvider)} for dynamic URLs.
 */
interface ServiceUrlProvider {
    /**
     * Hands the provider the client it serves.
     *
     * @param client the client this provider is attached to; presumably kept so the
     *        provider can later call {@code updateServiceUrl} — confirm against the Pulsar Javadoc
     */
    void initialize(PulsarClient client);

    /** @return the service URL the client should use */
    String getServiceUrl();
}

/**
 * Protocol choices accepted by {@code ClientBuilder.proxyServiceUrl(String, ProxyProtocol)}.
 * Only one value, {@code SNI}, is listed.
 */
enum ProxyProtocol { SNI }

/**
 * Size units accepted by {@code ClientBuilder.memoryLimit(long, SizeUnit)}.
 *
 * <p>Constant names use the underscore-separated spelling of the Pulsar client API
 * (for example {@code SizeUnit.MEGA_BYTES}), listed from smallest to largest.
 */
enum SizeUnit {
    BYTES,
    KILO_BYTES,
    MEGA_BYTES,
    GIGA_BYTES,
    TERA_BYTES
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--pulsar-client

docs

authentication-security.md

client-management.md

index.md

message-consumption.md

message-production.md

message-reading.md

schema-serialization.md

transaction-support.md

tile.json