0
# Client Management
1
2
Core client creation, configuration, and lifecycle management for establishing and maintaining connections to Pulsar brokers.
3
4
## Capabilities
5
6
### PulsarClient Factory
7
8
Main entry point for creating PulsarClient instances using the builder pattern.
9
10
```java { .api }
11
/**
12
* Main entry point for all Pulsar operations
13
* Thread-safe and can be reused for managing multiple producers, consumers, and readers
14
*/
15
interface PulsarClient extends Closeable {
16
/** Get a new builder instance for configuring and building a PulsarClient */
17
static ClientBuilder builder();
18
19
/** Create producer builder with default byte array schema */
20
ProducerBuilder<byte[]> newProducer();
21
22
/** Create producer builder with specified schema */
23
<T> ProducerBuilder<T> newProducer(Schema<T> schema);
24
25
/** Create consumer builder with default byte array schema */
26
ConsumerBuilder<byte[]> newConsumer();
27
28
/** Create consumer builder with specified schema */
29
<T> ConsumerBuilder<T> newConsumer(Schema<T> schema);
30
31
/** Create reader builder with default byte array schema */
32
ReaderBuilder<byte[]> newReader();
33
34
/** Create reader builder with specified schema */
35
<T> ReaderBuilder<T> newReader(Schema<T> schema);
36
37
/** Create table view builder with default byte array schema */
38
TableViewBuilder<byte[]> newTableView();
39
40
/** Create table view builder with specified schema */
41
<T> TableViewBuilder<T> newTableView(Schema<T> schema);
42
43
/** Create transaction builder */
44
TransactionBuilder newTransaction() throws PulsarClientException;
45
46
/** Get partition names for a topic */
47
CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled);
48
49
/** Update the service URL this client is using */
50
void updateServiceUrl(String serviceUrl) throws PulsarClientException;
51
52
/** Close the client gracefully */
53
void close() throws PulsarClientException;
54
55
/** Close the client asynchronously */
56
CompletableFuture<Void> closeAsync();
57
58
/** Shut down the client immediately, without waiting for ongoing operations to complete */
59
void shutdown() throws PulsarClientException;
60
61
/** Check if the client has been closed */
62
boolean isClosed();
63
}
64
```
65
66
**Usage Examples:**
67
68
```java
69
import org.apache.pulsar.client.api.*;
70
71
// Basic client creation
72
PulsarClient client = PulsarClient.builder()
73
.serviceUrl("pulsar://localhost:6650")
74
.build();
75
76
// Advanced client configuration
77
PulsarClient client = PulsarClient.builder()
78
.serviceUrl("pulsar+ssl://my-broker:6651")
79
.authentication(AuthenticationFactory.token("my-token"))
80
.operationTimeout(30, TimeUnit.SECONDS)
81
.ioThreads(4)
82
.connectionsPerBroker(1)
83
.build();
84
85
// Get topic partitions
86
List<String> partitions = client.getPartitionsForTopic("my-topic", true).get();
87
88
// Graceful shutdown
89
client.close();
90
```
91
92
### ClientBuilder Configuration
93
94
Builder interface for configuring PulsarClient instances with extensive customization options.
95
96
```java { .api }
97
/**
98
* Builder interface for configuring and constructing PulsarClient instances
99
*/
100
interface ClientBuilder extends Serializable, Cloneable {
101
/** Construct the final PulsarClient instance */
102
PulsarClient build() throws PulsarClientException;
103
104
/** Load configuration from a map */
105
ClientBuilder loadConf(Map<String, Object> config);
106
107
/** Create a copy of the current client builder */
108
ClientBuilder clone();
109
110
/** Configure the service URL for the Pulsar service (required) */
111
ClientBuilder serviceUrl(String serviceUrl);
112
113
/** Configure the service URL provider for dynamic URLs */
114
ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);
115
116
/** Configure the listener name used to select which of the broker's advertised listeners the client connects to */
117
ClientBuilder listenerName(String name);
118
119
/** Set authentication provider */
120
ClientBuilder authentication(Authentication authentication);
121
122
/** Set authentication using plugin class name and parameters */
123
ClientBuilder authentication(String authPluginClassName, String authParamsString) throws UnsupportedAuthenticationException;
124
125
/** Set authentication using plugin class name and parameter map */
126
ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams) throws UnsupportedAuthenticationException;
127
128
/** Set operation timeout (default: 30 seconds) */
129
ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);
130
131
/** Set lookup timeout (default: matches operation timeout) */
132
ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit);
133
134
/** Set number of IO threads (default: available processors) */
135
ClientBuilder ioThreads(int numIoThreads);
136
137
/** Set number of listener threads (default: available processors) */
138
ClientBuilder listenerThreads(int numListenerThreads);
139
140
/** Set max connections per broker (default: 1) */
141
ClientBuilder connectionsPerBroker(int connectionsPerBroker);
142
143
/** Configure TCP no-delay flag (default: true) */
144
ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);
145
146
/** Set connection max idle time in seconds (default: 25) */
147
ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);
148
149
/** Set keep alive interval (default: 30 seconds) */
150
ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit);
151
152
/** Set connection timeout */
153
ClientBuilder connectionTimeout(int duration, TimeUnit unit);
154
155
/** Set starting backoff interval */
156
ClientBuilder startingBackoffInterval(long duration, TimeUnit unit);
157
158
/** Set maximum backoff interval */
159
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
160
161
/** Set memory limit (default: 64 MB) */
162
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
163
164
/** Set max concurrent lookup requests (default: 5000) */
165
ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);
166
167
/** Set max lookup requests (default: 50000) */
168
ClientBuilder maxLookupRequests(int maxLookupRequests);
169
170
/** Set max lookup redirects */
171
ClientBuilder maxLookupRedirects(int maxLookupRedirects);
172
173
/** Set max rejected requests per connection (default: 50) */
174
ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection);
175
176
/** Enable busy-wait settings for low latency (default: false) */
177
ClientBuilder enableBusyWait(boolean enableBusyWait);
178
179
/** Configure OpenTelemetry for metrics */
180
ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
181
182
/** Set clock for timestamps */
183
ClientBuilder clock(Clock clock);
184
185
/** Enable transaction support */
186
ClientBuilder enableTransaction(boolean enableTransaction);
187
}
188
```
189
190
### TLS and Security Configuration
191
192
TLS configuration methods for secure connections.
193
194
```java { .api }
195
interface ClientBuilder {
196
/** Set path to TLS key file */
197
ClientBuilder tlsKeyFilePath(String tlsKeyFilePath);
198
199
/** Set path to TLS certificate file */
200
ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath);
201
202
/** Set path to trusted TLS certificate file */
203
ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);
204
205
/** Allow untrusted TLS connections (default: false) */
206
ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection);
207
208
/** Enable TLS hostname verification */
209
ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);
210
211
/** Use KeyStore-based TLS configuration instead of the default PEM files (default: false) */
212
ClientBuilder useKeyStoreTls(boolean useKeyStoreTls);
213
214
/** Set security provider for SSL connections */
215
ClientBuilder sslProvider(String sslProvider);
216
217
/** Set key store type */
218
ClientBuilder tlsKeyStoreType(String tlsKeyStoreType);
219
220
/** Set key store path */
221
ClientBuilder tlsKeyStorePath(String tlsKeyStorePath);
222
223
/** Set key store password */
224
ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword);
225
226
/** Set trust store type */
227
ClientBuilder tlsTrustStoreType(String tlsTrustStoreType);
228
229
/** Set trust store path */
230
ClientBuilder tlsTrustStorePath(String tlsTrustStorePath);
231
232
/** Set trust store password */
233
ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword);
234
235
/** Set allowed cipher suites */
236
ClientBuilder tlsCiphers(Set<String> tlsCiphers);
237
238
/** Set allowed TLS protocols */
239
ClientBuilder tlsProtocols(Set<String> tlsProtocols);
240
}
241
```
242
243
### Proxy and Network Configuration
244
245
Network and proxy configuration methods.
246
247
```java { .api }
248
interface ClientBuilder {
249
/** Set proxy service URL and protocol */
250
ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
251
252
/** Set DNS lookup bind address and port */
253
ClientBuilder dnsLookupBind(String address, int port);
254
255
/** Set DNS server addresses */
256
ClientBuilder dnsServerAddresses(List<InetSocketAddress> addresses);
257
258
/** Set SOCKS5 proxy address */
259
ClientBuilder socks5ProxyAddress(InetSocketAddress socks5ProxyAddress);
260
261
/** Set SOCKS5 proxy username */
262
ClientBuilder socks5ProxyUsername(String socks5ProxyUsername);
263
264
/** Set SOCKS5 proxy password */
265
ClientBuilder socks5ProxyPassword(String socks5ProxyPassword);
266
267
/** Set properties used for topic lookup */
268
ClientBuilder lookupProperties(Map<String, String> properties);
269
}
270
```
271
272
**Advanced Configuration Examples:**
273
274
```java
275
// TLS configuration
276
PulsarClient client = PulsarClient.builder()
277
.serviceUrl("pulsar+ssl://broker:6651")
278
.tlsCertificateFilePath("/path/to/client.cert.pem")
279
.tlsKeyFilePath("/path/to/client.key.pem")
280
.tlsTrustCertsFilePath("/path/to/ca.cert.pem")
281
.enableTlsHostnameVerification(true)
282
.build();
283
284
// Connection pooling and timeouts
285
PulsarClient client = PulsarClient.builder()
286
.serviceUrl("pulsar://broker:6650")
287
.operationTimeout(60, TimeUnit.SECONDS)
288
.connectionTimeout(10, TimeUnit.SECONDS)
289
.connectionsPerBroker(3)
290
.keepAliveInterval(30, TimeUnit.SECONDS)
291
.build();
292
293
// Proxy configuration
294
PulsarClient client = PulsarClient.builder()
295
.serviceUrl("pulsar+ssl://broker:6651")
296
.proxyServiceUrl("pulsar+ssl://proxy:4443", ProxyProtocol.SNI)
297
.build();
298
```
299
300
## Supporting Types
301
302
```java { .api }
303
interface ServiceUrlProvider {
304
void initialize(PulsarClient client);
305
String getServiceUrl();
306
}
307
308
enum ProxyProtocol {
309
SNI
310
}
311
312
enum SizeUnit {
313
BYTES,
314
KILO_BYTES,
315
MEGA_BYTES,
316
GIGA_BYTES,
317
TERA_BYTES
318
}
319
```