or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdclient-management.mdindex.mdmessage-consumption.mdmessage-production.mdmessage-reading.mdschema-serialization.mdtransaction-support.md

client-management.mddocs/

0

# Client Management

1

2

Core client creation, configuration, and lifecycle management for establishing and maintaining connections to Pulsar brokers.

3

4

## Capabilities

5

6

### PulsarClient Factory

7

8

Main entry point for creating PulsarClient instances using the builder pattern.

9

10

```java { .api }

11

/**

12

* Main entry point for all Pulsar operations

13

* Thread-safe and can be reused for managing multiple producers, consumers, and readers

14

*/

15

interface PulsarClient extends Closeable {

16

/** Get a new builder instance for configuring and building a PulsarClient */

17

static ClientBuilder builder();

18

19

/** Create producer builder with default byte array schema */

20

ProducerBuilder<byte[]> newProducer();

21

22

/** Create producer builder with specified schema */

23

<T> ProducerBuilder<T> newProducer(Schema<T> schema);

24

25

/** Create consumer builder with default byte array schema */

26

ConsumerBuilder<byte[]> newConsumer();

27

28

/** Create consumer builder with specified schema */

29

<T> ConsumerBuilder<T> newConsumer(Schema<T> schema);

30

31

/** Create reader builder with default byte array schema */

32

ReaderBuilder<byte[]> newReader();

33

34

/** Create reader builder with specified schema */

35

<T> ReaderBuilder<T> newReader(Schema<T> schema);

36

37

/** Create table view builder with default byte array schema */

38

TableViewBuilder<byte[]> newTableView();

39

40

/** Create table view builder with specified schema */

41

<T> TableViewBuilder<T> newTableView(Schema<T> schema);

42

43

/** Create transaction builder */

44

TransactionBuilder newTransaction() throws PulsarClientException;

45

46

/** Get partition names for a topic */

47

CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled);

48

49

/** Update the service URL this client is using */

50

void updateServiceUrl(String serviceUrl) throws PulsarClientException;

51

52

/** Close the client gracefully */

53

void close() throws PulsarClientException;

54

55

/** Close the client asynchronously */

56

CompletableFuture<Void> closeAsync();

57

58

/** Force shutdown the client immediately */

59

void shutdown() throws PulsarClientException;

60

61

/** Check if the client has been closed */

62

boolean isClosed();

63

}

64

```

65

66

**Usage Examples:**

67

68

```java

69

import org.apache.pulsar.client.api.*;

70

71

// Basic client creation

72

PulsarClient client = PulsarClient.builder()

73

.serviceUrl("pulsar://localhost:6650")

74

.build();

75

76

// Advanced client configuration

77

PulsarClient client = PulsarClient.builder()

78

.serviceUrl("pulsar+ssl://my-broker:6651")

79

.authentication(AuthenticationFactory.token("my-token"))

80

.operationTimeout(30, TimeUnit.SECONDS)

81

.ioThreads(4)

82

.connectionsPerBroker(1)

83

.build();

84

85

// Get topic partitions

86

List<String> partitions = client.getPartitionsForTopic("my-topic", true).get();

87

88

// Graceful shutdown

89

client.close();

90

```

91

92

### ClientBuilder Configuration

93

94

Builder interface for configuring PulsarClient instances with extensive customization options.

95

96

```java { .api }

97

/**

98

* Builder interface for configuring and constructing PulsarClient instances

99

*/

100

interface ClientBuilder extends Serializable, Cloneable {

101

/** Construct the final PulsarClient instance */

102

PulsarClient build() throws PulsarClientException;

103

104

/** Load configuration from a map */

105

ClientBuilder loadConf(Map<String, Object> config);

106

107

/** Create a copy of the current client builder */

108

ClientBuilder clone();

109

110

/** Configure the service URL for the Pulsar service (required) */

111

ClientBuilder serviceUrl(String serviceUrl);

112

113

/** Configure the service URL provider for dynamic URLs */

114

ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider);

115

116

/** Configure listener name for advertised listener */

117

ClientBuilder listenerName(String name);

118

119

/** Set authentication provider */

120

ClientBuilder authentication(Authentication authentication);

121

122

/** Set authentication using plugin class name and parameters */

123

ClientBuilder authentication(String authPluginClassName, String authParamsString) throws UnsupportedAuthenticationException;

124

125

/** Set authentication using plugin class name and parameter map */

126

ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams) throws UnsupportedAuthenticationException;

127

128

/** Set operation timeout (default: 30 seconds) */

129

ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit);

130

131

/** Set lookup timeout (default: matches operation timeout) */

132

ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit);

133

134

/** Set number of IO threads (default: available processors) */

135

ClientBuilder ioThreads(int numIoThreads);

136

137

/** Set number of listener threads (default: available processors) */

138

ClientBuilder listenerThreads(int numListenerThreads);

139

140

/** Set max connections per broker (default: 1) */

141

ClientBuilder connectionsPerBroker(int connectionsPerBroker);

142

143

/** Configure TCP no-delay flag (default: true) */

144

ClientBuilder enableTcpNoDelay(boolean enableTcpNoDelay);

145

146

/** Set connection max idle time in seconds (default: 25) */

147

ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds);

148

149

/** Set keep alive interval (default: 30 seconds) */

150

ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit);

151

152

/** Set connection timeout */

153

ClientBuilder connectionTimeout(int duration, TimeUnit unit);

154

155

/** Set starting backoff interval */

156

ClientBuilder startingBackoffInterval(long duration, TimeUnit unit);

157

158

/** Set maximum backoff interval */

159

ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);

160

161

/** Set memory limit (default: 64 MB) */

162

ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

163

164

/** Set max concurrent lookup requests (default: 5000) */

165

ClientBuilder maxConcurrentLookupRequests(int maxConcurrentLookupRequests);

166

167

/** Set max lookup requests (default: 50000) */

168

ClientBuilder maxLookupRequests(int maxLookupRequests);

169

170

/** Set max lookup redirects */

171

ClientBuilder maxLookupRedirects(int maxLookupRedirects);

172

173

/** Set max rejected requests per connection (default: 50) */

174

ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection);

175

176

/** Enable busy-wait settings for low latency (default: false) */

177

ClientBuilder enableBusyWait(boolean enableBusyWait);

178

179

/** Configure OpenTelemetry for metrics */

180

ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);

181

182

/** Set clock for timestamps */

183

ClientBuilder clock(Clock clock);

184

185

/** Enable transaction support */

186

ClientBuilder enableTransaction(boolean enableTransaction);

187

}

188

```

189

190

### TLS and Security Configuration

191

192

TLS configuration methods for secure connections.

193

194

```java { .api }

195

interface ClientBuilder {

196

/** Set path to TLS key file */

197

ClientBuilder tlsKeyFilePath(String tlsKeyFilePath);

198

199

/** Set path to TLS certificate file */

200

ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath);

201

202

/** Set path to trusted TLS certificate file */

203

ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath);

204

205

/** Allow untrusted TLS connections (default: false) */

206

ClientBuilder allowTlsInsecureConnection(boolean allowTlsInsecureConnection);

207

208

/** Enable TLS hostname verification */

209

ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification);

210

211

/** Use KeyStore type for TLS configuration */

212

ClientBuilder useKeyStoreTls(boolean useKeyStoreTls);

213

214

/** Set security provider for SSL connections */

215

ClientBuilder sslProvider(String sslProvider);

216

217

/** Set key store type */

218

ClientBuilder tlsKeyStoreType(String tlsKeyStoreType);

219

220

/** Set key store path */

221

ClientBuilder tlsKeyStorePath(String tlsKeyStorePath);

222

223

/** Set key store password */

224

ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword);

225

226

/** Set trust store type */

227

ClientBuilder tlsTrustStoreType(String tlsTrustStoreType);

228

229

/** Set trust store path */

230

ClientBuilder tlsTrustStorePath(String tlsTrustStorePath);

231

232

/** Set trust store password */

233

ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword);

234

235

/** Set allowed cipher suites */

236

ClientBuilder tlsCiphers(Set<String> tlsCiphers);

237

238

/** Set allowed TLS protocols */

239

ClientBuilder tlsProtocols(Set<String> tlsProtocols);

240

}

241

```

242

243

### Proxy and Network Configuration

244

245

Network and proxy configuration methods.

246

247

```java { .api }

248

interface ClientBuilder {

249

/** Set proxy service URL and protocol */

250

ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);

251

252

/** Set DNS lookup bind address and port */

253

ClientBuilder dnsLookupBind(String address, int port);

254

255

/** Set DNS server addresses */

256

ClientBuilder dnsServerAddresses(List<InetSocketAddress> addresses);

257

258

/** Set SOCKS5 proxy address */

259

ClientBuilder socks5ProxyAddress(InetSocketAddress socks5ProxyAddress);

260

261

/** Set SOCKS5 proxy username */

262

ClientBuilder socks5ProxyUsername(String socks5ProxyUsername);

263

264

/** Set SOCKS5 proxy password */

265

ClientBuilder socks5ProxyPassword(String socks5ProxyPassword);

266

267

/** Set properties used for topic lookup */

268

ClientBuilder lookupProperties(Map<String, String> properties);

269

}

270

```

271

272

**Advanced Configuration Examples:**

273

274

```java

275

// TLS configuration

276

PulsarClient client = PulsarClient.builder()

277

.serviceUrl("pulsar+ssl://broker:6651")

278

.tlsCertificateFilePath("/path/to/client.cert.pem")

279

.tlsKeyFilePath("/path/to/client.key.pem")

280

.tlsTrustCertsFilePath("/path/to/ca.cert.pem")

281

.enableTlsHostnameVerification(true)

282

.build();

283

284

// Connection pooling and timeouts

285

PulsarClient client = PulsarClient.builder()

286

.serviceUrl("pulsar://broker:6650")

287

.operationTimeout(60, TimeUnit.SECONDS)

288

.connectionTimeout(10, TimeUnit.SECONDS)

289

.connectionsPerBroker(3)

290

.keepAliveInterval(30, TimeUnit.SECONDS)

291

.build();

292

293

// Proxy configuration

294

PulsarClient client = PulsarClient.builder()

295

.serviceUrl("pulsar://broker:6650")

296

.proxyServiceUrl("http://proxy:8080", ProxyProtocol.SNI)

297

.build();

298

```

299

300

## Supporting Types

301

302

```java { .api }

303

interface ServiceUrlProvider {

304

void initialize(PulsarClient client);

305

String getServiceUrl();

306

}

307

308

enum ProxyProtocol {

309

SNI

310

}

311

312

enum SizeUnit {

313

BYTES,

314

KILOBYTES,

315

MEGABYTES,

316

GIGABYTES,

317

TERABYTES

318

}

319

```