or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdclient-management.mdindex.mdmessage-consumption.mdmessage-production.mdmessage-reading.mdschema-serialization.mdtransaction-support.md

authentication-security.mddocs/

0

# Authentication and Security

1

2

Authentication mechanisms, TLS configuration, message encryption, and comprehensive security features for secure messaging in Pulsar clusters.

3

4

## Capabilities

5

6

### Authentication Interface

7

8

Base interface for authentication mechanisms with support for various authentication methods.

9

10

```java { .api }

11

/**

12

* Base interface for authentication mechanisms

13

* Supports TLS, token-based, OAuth, and custom authentication methods

14

*/

15

interface Authentication extends Serializable, Closeable {

16

/** Get authentication method name */

17

String getAuthMethodName();

18

19

/** Get authentication data provider */

20

AuthenticationDataProvider getAuthData() throws PulsarClientException;

21

22

/** Configure authentication with encoded parameter string */

23

void configure(String encodedAuthParamString);

24

25

/** Configure authentication with parameter map */

26

void configure(Map<String, String> authParams);

27

28

/** Start authentication process */

29

void start() throws PulsarClientException;

30

31

/** Close authentication resources */

32

void close() throws IOException;

33

}

34

```

35

36

### AuthenticationFactory

37

38

Factory class for creating authentication instances with built-in support for common authentication methods.

39

40

```java { .api }

41

/**

42

* Factory for creating authentication instances

43

* Provides convenient methods for common authentication types

44

*/

45

class AuthenticationFactory {

46

/** Create TLS authentication using certificate and key files */

47

static Authentication TLS(String certFilePath, String keyFilePath);

48

49

/** Create token-based authentication with static token */

50

static Authentication token(String token);

51

52

/** Create token-based authentication with token supplier */

53

static Authentication token(Supplier<String> tokenSupplier);

54

55

/** Create OAuth 2.0 authentication */

56

static Authentication oauth2(String issuerUrl, String privateKeyUrl, String audience);

57

58

/** Create OAuth 2.0 authentication with credentials URL */

59

static Authentication oauth2(String issuerUrl, String credentialsUrl, String audience, String scope);

60

61

/** Create SASL authentication */

62

static Authentication sasl(String saslJaasClientSectionName, String serverType);

63

64

/** Create custom authentication using plugin class name and parameters */

65

static Authentication create(String authPluginClassName, String encodedAuthParamString) throws UnsupportedAuthenticationException;

66

67

/** Create custom authentication using plugin class name and parameter map */

68

static Authentication create(String authPluginClassName, Map<String, String> authParams) throws UnsupportedAuthenticationException;

69

}

70

```

71

72

**Authentication Examples:**

73

74

```java

75

import org.apache.pulsar.client.api.*;

76

77

// TLS authentication

78

Authentication tlsAuth = AuthenticationFactory.TLS(

79

"/path/to/client.cert.pem",

80

"/path/to/client.key.pem"

81

);

82

83

PulsarClient client = PulsarClient.builder()

84

.serviceUrl("pulsar+ssl://broker:6651")

85

.authentication(tlsAuth)

86

.build();

87

88

// Token authentication

89

Authentication tokenAuth = AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9...");

90

91

PulsarClient client = PulsarClient.builder()

92

.serviceUrl("pulsar://broker:6650")

93

.authentication(tokenAuth)

94

.build();

95

96

// Token authentication with supplier (for token refresh)

97

Authentication tokenAuth = AuthenticationFactory.token(() -> {

98

return refreshAndGetNewToken();

99

});

100

101

// OAuth 2.0 authentication

102

Authentication oauthAuth = AuthenticationFactory.oauth2(

103

"https://issuer.example.com",

104

"/path/to/private.key",

105

"audience-value"

106

);

107

108

// Custom authentication

109

Map<String, String> authParams = new HashMap<>();

110

authParams.put("username", "myuser");

111

authParams.put("password", "mypassword");

112

113

Authentication customAuth = AuthenticationFactory.create(

114

"com.example.MyAuthPlugin",

115

authParams

116

);

117

```

118

119

### AuthenticationDataProvider

120

121

Interface for providing authentication data for different transport protocols.

122

123

```java { .api }

124

/**

125

* Provider for authentication data across different transport protocols

126

* Supports TLS, HTTP, and command-based authentication data

127

*/

128

interface AuthenticationDataProvider {

129

/** Check if TLS authentication data is available */

130

boolean hasDataForTls();

131

132

/** Get TLS certificate file path */

133

String getTlsCertificateFilePath();

134

135

/** Get TLS private key file path */

136

String getTlsPrivateKeyFilePath();

137

138

/** Check if HTTP authentication data is available */

139

boolean hasDataForHttp();

140

141

/** Get HTTP headers for authentication */

142

Set<Map.Entry<String, String>> getHttpHeaders() throws PulsarClientException;

143

144

/** Check if command authentication data is available */

145

boolean hasDataFromCommand();

146

147

/** Get command data for authentication */

148

String getCommandData();

149

150

/** Check if token authentication data is available */

151

boolean hasDataForToken();

152

153

/** Get authentication token */

154

String getToken() throws PulsarClientException;

155

}

156

```

157

158

### Message Encryption

159

160

Comprehensive message encryption support with key management and crypto operations.

161

162

```java { .api }

163

/**

164

* Interface for reading encryption keys

165

* Supports both public and private key retrieval with metadata

166

*/

167

interface CryptoKeyReader {

168

/** Get public key for encryption */

169

EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta);

170

171

/** Get private key for decryption */

172

EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta);

173

}

174

175

/**

176

* Container for encryption key information

177

*/

178

class EncryptionKeyInfo {

179

/** Create encryption key info */

180

EncryptionKeyInfo(byte[] key, Map<String, String> metadata);

181

182

/** Get key bytes */

183

byte[] getKey();

184

185

/** Get key metadata */

186

Map<String, String> getMetadata();

187

}

188

189

/**

190

* Interface for message encryption/decryption operations

191

*/

192

interface MessageCrypto {

193

/** Encrypt message payload */

194

boolean encrypt(Set<String> encKeys, CryptoKeyReader keyReader,

195

MessageMetadata.Builder msgMetadata, ByteBuf payload);

196

197

/** Decrypt message payload */

198

boolean decrypt(MessageMetadata msgMetadata, ByteBuf payload,

199

CryptoKeyReader keyReader);

200

201

/** Get decrypted data */

202

ByteBuf getDecryptedData();

203

204

/** Release resources */

205

void close();

206

}

207

```

208

209

**Encryption Examples:**

210

211

```java

212

// Custom crypto key reader implementation

213

class FileCryptoKeyReader implements CryptoKeyReader {

214

private String publicKeyPath;

215

private String privateKeyPath;

216

217

public FileCryptoKeyReader(String publicKeyPath, String privateKeyPath) {

218

this.publicKeyPath = publicKeyPath;

219

this.privateKeyPath = privateKeyPath;

220

}

221

222

@Override

223

public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {

224

byte[] keyBytes = readKeyFromFile(publicKeyPath + "/" + keyName + ".pub");

225

return new EncryptionKeyInfo(keyBytes, keyMeta);

226

}

227

228

@Override

229

public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {

230

byte[] keyBytes = readKeyFromFile(privateKeyPath + "/" + keyName + ".key");

231

return new EncryptionKeyInfo(keyBytes, keyMeta);

232

}

233

}

234

235

// Producer with encryption

236

Producer<String> producer = client.newProducer(Schema.STRING)

237

.topic("encrypted-topic")

238

.addEncryptionKey("my-app-key")

239

.cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))

240

.cryptoFailureAction(ProducerCryptoFailureAction.FAIL)

241

.create();

242

243

// Consumer with decryption

244

Consumer<String> consumer = client.newConsumer(Schema.STRING)

245

.topic("encrypted-topic")

246

.subscriptionName("encrypted-sub")

247

.cryptoKeyReader(new FileCryptoKeyReader("/keys/public", "/keys/private"))

248

.cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)

249

.subscribe();

250

251

// Default crypto key reader (simplified)

252

Producer<String> producer = client.newProducer(Schema.STRING)

253

.topic("encrypted-topic")

254

.addEncryptionKey("my-app-key")

255

.defaultCryptoKeyReader("/path/to/public/key")

256

.create();

257

```

258

259

### TLS Configuration

260

261

Comprehensive TLS configuration for secure broker connections.

262

263

```java { .api }

264

/**

265

* KeyStore parameters for TLS configuration

266

*/

267

class KeyStoreParams {

268

/** Create KeyStore parameters */

269

KeyStoreParams(String keyStorePath, String keyStorePassword, String keyStoreType);

270

271

/** Get key store path */

272

String getKeyStorePath();

273

274

/** Get key store password */

275

String getKeyStorePassword();

276

277

/** Get key store type */

278

String getKeyStoreType();

279

}

280

```

281

282

**TLS Configuration Examples:**

283

284

```java

285

// File-based TLS configuration

286

PulsarClient client = PulsarClient.builder()

287

.serviceUrl("pulsar+ssl://broker:6651")

288

.tlsCertificateFilePath("/path/to/client.cert.pem")

289

.tlsKeyFilePath("/path/to/client.key.pem")

290

.tlsTrustCertsFilePath("/path/to/ca.cert.pem")

291

.enableTlsHostnameVerification(true)

292

.build();

293

294

// KeyStore-based TLS configuration

295

PulsarClient client = PulsarClient.builder()

296

.serviceUrl("pulsar+ssl://broker:6651")

297

.useKeyStoreTls(true)

298

.tlsKeyStoreType("JKS")

299

.tlsKeyStorePath("/path/to/client.keystore")

300

.tlsKeyStorePassword("keystorePassword")

301

.tlsTrustStoreType("JKS")

302

.tlsTrustStorePath("/path/to/truststore.jks")

303

.tlsTrustStorePassword("truststorePassword")

304

.build();

305

306

// TLS with custom cipher suites and protocols

307

PulsarClient client = PulsarClient.builder()

308

.serviceUrl("pulsar+ssl://broker:6651")

309

.tlsCiphers(Set.of("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"))

310

.tlsProtocols(Set.of("TLSv1.3", "TLSv1.2"))

311

.sslProvider("Conscrypt")

312

.build();

313

314

// TLS with insecure connection (for testing)

315

PulsarClient client = PulsarClient.builder()

316

.serviceUrl("pulsar+ssl://broker:6651")

317

.allowTlsInsecureConnection(true)

318

.enableTlsHostnameVerification(false)

319

.build();

320

```

321

322

### Authorization and Access Control

323

324

Support for fine-grained access control and authorization policies.

325

326

```java { .api }

327

/**

328

* Authorization data provider for role-based access control

329

*/

330

interface AuthorizationDataProvider {

331

/** Get subject (user/role) for authorization */

332

String getSubject();

333

334

/** Get additional authorization properties */

335

Map<String, String> getAuthorizationProperties();

336

}

337

338

/**

339

* Interface for encoded authentication parameter support

340

*/

341

interface EncodedAuthenticationParameterSupport {

342

/** Check if encoded parameters are supported */

343

default boolean supportsEncodedAuthParamString() {

344

return false;

345

}

346

}

347

```

348

349

### JWT Token Support

350

351

Specialized support for JSON Web Token (JWT) authentication.

352

353

```java { .api }

354

/**

355

* JWT token authentication support

356

* Handles token validation, refresh, and expiration

357

*/

358

class JWTAuthenticationProvider implements Authentication {

359

/** Create JWT authentication with static token */

360

static Authentication createWithToken(String token);

361

362

/** Create JWT authentication with token supplier */

363

static Authentication createWithTokenSupplier(Supplier<String> tokenSupplier);

364

365

/** Create JWT authentication with token file path */

366

static Authentication createWithTokenFile(String tokenFilePath);

367

}

368

```

369

370

**JWT Examples:**

371

372

```java

373

// JWT with static token

374

Authentication jwtAuth = AuthenticationFactory.token(

375

"eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."

376

);

377

378

// JWT with token file (automatically reloaded)

379

Authentication jwtAuth = AuthenticationFactory.token(() -> {

380

try {

381

return Files.readString(Paths.get("/path/to/token.jwt"));

382

} catch (IOException e) {

383

throw new RuntimeException("Failed to read token", e);

384

}

385

});

386

387

// JWT with automatic refresh

388

Authentication jwtAuth = AuthenticationFactory.token(() -> {

389

// Implement token refresh logic

390

return getRefreshedToken();

391

});

392

393

PulsarClient client = PulsarClient.builder()

394

.serviceUrl("pulsar://broker:6650")

395

.authentication(jwtAuth)

396

.build();

397

```

398

399

### Security Exception Handling

400

401

Comprehensive exception hierarchy for security-related operations.

402

403

```java { .api }

404

/**

405

* Authentication-related exceptions

406

*/

407

class PulsarClientException {

408

/** Authentication failed */

409

static class AuthenticationException extends PulsarClientException {

410

AuthenticationException(String msg);

411

AuthenticationException(Throwable t);

412

}

413

414

/** Authorization failed */

415

static class AuthorizationException extends PulsarClientException {

416

AuthorizationException(String msg);

417

}

418

419

/** Unsupported authentication method */

420

static class UnsupportedAuthenticationException extends PulsarClientException {

421

UnsupportedAuthenticationException(String msg, Throwable t);

422

}

423

424

/** Error getting authentication data */

425

static class GettingAuthenticationDataException extends PulsarClientException {

426

GettingAuthenticationDataException(String msg);

427

GettingAuthenticationDataException(Throwable t);

428

}

429

430

/** Encryption/decryption errors */

431

static class CryptoException extends PulsarClientException {

432

CryptoException(String msg);

433

}

434

}

435

```

436

437

### Security Best Practices Configuration

438

439

Configuration helpers for implementing security best practices.

440

441

```java { .api }

442

/**

443

* Security configuration utilities

444

*/

445

class SecurityUtils {

446

/** Validate TLS configuration */

447

static boolean validateTlsConfiguration(ClientBuilder builder);

448

449

/** Check if connection is secure */

450

static boolean isSecureConnection(String serviceUrl);

451

452

/** Validate authentication configuration */

453

static boolean validateAuthentication(Authentication auth);

454

}

455

```

456

457

**Security Best Practices Examples:**

458

459

```java

460

// Production-ready secure client configuration

461

PulsarClient client = PulsarClient.builder()

462

.serviceUrl("pulsar+ssl://broker:6651")

463

.authentication(AuthenticationFactory.token(() -> getTokenFromSecureStore()))

464

.tlsCertificateFilePath("/secure/path/client.cert.pem")

465

.tlsKeyFilePath("/secure/path/client.key.pem")

466

.tlsTrustCertsFilePath("/secure/path/ca-bundle.cert.pem")

467

.enableTlsHostnameVerification(true)

468

.tlsProtocols(Set.of("TLSv1.3"))

469

.build();

470

471

// Encrypted producer with secure key management

472

Producer<String> producer = client.newProducer(Schema.STRING)

473

.topic("sensitive-data")

474

.addEncryptionKey("data-encryption-key")

475

.cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())

476

.cryptoFailureAction(ProducerCryptoFailureAction.FAIL)

477

.create();

478

479

// Consumer with decryption and access logging

480

Consumer<String> consumer = client.newConsumer(Schema.STRING)

481

.topic("sensitive-data")

482

.subscriptionName("secure-processor")

483

.cryptoKeyReader(new SecureKeyVaultCryptoKeyReader())

484

.cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)

485

.intercept(new SecurityAuditInterceptor())

486

.subscribe();

487

```

488

489

## Supporting Types and Enums

490

491

```java { .api }

492

enum ProducerCryptoFailureAction {

493

/** Fail the send operation */

494

FAIL,

495

/** Send message unencrypted */

496

SEND

497

}

498

499

enum ConsumerCryptoFailureAction {

500

/** Fail the receive operation */

501

FAIL,

502

/** Discard the message */

503

DISCARD,

504

/** Consume message as-is */

505

CONSUME

506

}

507

508

interface DummyCryptoKeyReaderImpl extends CryptoKeyReader {

509

/** Dummy implementation that returns null keys */

510

static CryptoKeyReader INSTANCE = new DummyCryptoKeyReaderImpl();

511

}

512

513

class KeyStoreParams {

514

String keyStorePath;

515

String keyStorePassword;

516

String keyStoreType;

517

}

518

```