or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-factory.md · core-operations.md · events-state.md · index.md · schema-validation.md · transactions.md · watchers.md

docs/events-state.md

0

# Event and State Management

1

2

Event handling and connection state management: register listeners for background operation results, connection state changes, and unhandled errors to monitor the client and react to failures.

3

4

## Capabilities

5

6

### Event Handling

7

8

Core event interfaces for handling ZooKeeper events and background operation results.

9

10

```java { .api }

11

/**

12

* Returns the listenable interface for events

13

* @return listenable

14

*/

15

Listenable<CuratorListener> getCuratorListenable();

16

17

/**

18

* Returns the listenable interface for unhandled errors

19

* @return listenable

20

*/

21

Listenable<UnhandledErrorListener> getUnhandledErrorListenable();

22

23

/**

24

* Represents ZooKeeper events and background operation results

25

*/

26

public interface CuratorEvent {

27

/**

28

* Get the event type

29

* @return event type

30

*/

31

CuratorEventType getType();

32

33

/**

34

* Get the ZooKeeper result code

35

* @return result code (0 = success)

36

*/

37

int getResultCode();

38

39

/**

40

* Get the path associated with this event

41

* @return path or null

42

*/

43

String getPath();

44

45

/**

46

* Get the background context object

47

* @return context or null

48

*/

49

Object getContext();

50

51

/**

52

* Get stat information

53

* @return stat or null

54

*/

55

Stat getStat();

56

57

/**

58

* Get data associated with the event

59

* @return data bytes or null

60

*/

61

byte[] getData();

62

63

/**

64

* Get the name (for create operations with sequential modes)

65

* @return name or null

66

*/

67

String getName();

68

69

/**

70

* Get children list (for getChildren operations)

71

* @return children list or null

72

*/

73

List<String> getChildren();

74

75

/**

76

* Get ACL list (for getACL/setACL operations)

77

* @return ACL list or null

78

*/

79

List<ACL> getACLList();

80

81

/**

82

* Get transaction results (for transaction operations)

83

* @return transaction results or null

84

*/

85

List<CuratorTransactionResult> getOpResults();

86

87

/**

88

* Get the watched event (for watcher callbacks)

89

* @return watched event or null

90

*/

91

WatchedEvent getWatchedEvent();

92

}

93

94

/**

95

* Types of curator events

96

*/

97

public enum CuratorEventType {

98

CREATE, DELETE, EXISTS, GET_DATA, SET_DATA, CHILDREN,

99

SYNC, GET_ACL, SET_ACL, TRANSACTION, GET_CONFIG, RECONFIG,

100

WATCHED, REMOVE_WATCHES, ADD_WATCH, CLOSING

101

}

102

103

/**

104

* Listener for background events and errors

105

*/

106

public interface CuratorListener {

107

/**

108

* Called when a background event occurs

109

* @param client the client

110

* @param event the event

111

* @throws Exception errors

112

*/

113

void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;

114

}

115

116

/**

117

* Listener for unhandled errors

118

*/

119

public interface UnhandledErrorListener {

120

/**

121

* Called when an unhandled error occurs

122

* @param message error message

123

* @param e the exception

124

*/

125

void unhandledError(String message, Throwable e);

126

}

127

128

/**

129

* Callback for background operations

130

*/

131

public interface BackgroundCallback {

132

/**

133

* Called when a background operation completes

134

* @param client the client

135

* @param event the event

136

* @throws Exception errors

137

*/

138

void processResult(CuratorFramework client, CuratorEvent event) throws Exception;

139

}

140

```

141

142

**Usage Examples:**

143

144

```java

145

// Add general curator listener

146

client.getCuratorListenable().addListener(new CuratorListener() {

147

@Override

148

public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {

149

System.out.println("Event: " + event.getType() + " Path: " + event.getPath());

150

151

// Handle different event types

152

switch (event.getType()) {

153

case CREATE:

154

System.out.println("Node created: " + event.getPath());

155

break;

156

case DELETE:

157

System.out.println("Node deleted: " + event.getPath());

158

break;

159

case GET_DATA:

160

byte[] data = event.getData();

161

System.out.println("Got data: " + new String(data));

162

break;

163

case CHILDREN:

164

List<String> children = event.getChildren();

165

System.out.println("Children: " + children);

166

break;

167

case TRANSACTION:

168

List<CuratorTransactionResult> results = event.getOpResults();

169

System.out.println("Transaction completed with " + results.size() + " operations");

170

break;

171

}

172

}

173

});

174

175

// Add unhandled error listener

176

client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {

177

@Override

178

public void unhandledError(String message, Throwable e) {

179

System.err.println("Unhandled error: " + message);

180

e.printStackTrace();

181

}

182

});

183

184

// Use background callback for specific operations

185

client.create()

186

.creatingParentsIfNeeded()

187

.inBackground(new BackgroundCallback() {

188

@Override

189

public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {

190

if (event.getResultCode() == 0) {

191

System.out.println("Successfully created: " + event.getPath());

192

} else {

193

System.err.println("Failed to create: " + event.getResultCode());

194

}

195

}

196

})

197

.forPath("/background/path", "data".getBytes());

198

199

// Lambda-style callback

200

client.getData()

201

.inBackground((curatorFramework, curatorEvent) -> {

202

if (curatorEvent.getResultCode() == 0) {

203

byte[] data = curatorEvent.getData();

204

System.out.println("Data retrieved: " + new String(data));

205

}

206

})

207

.forPath("/some/path");

208

```

209

210

### Connection State Management

211

212

Connection state tracking and error handling for robust distributed application development.

213

214

```java { .api }

215

/**

216

* Returns the listenable interface for connection state changes

217

* @return listenable

218

*/

219

Listenable<ConnectionStateListener> getConnectionStateListenable();

220

221

/**

222

* Return the configured error policy

223

* @return error policy

224

*/

225

ConnectionStateErrorPolicy getConnectionStateErrorPolicy();

226

227

/**

228

* Represents connection states to ZooKeeper

229

*/

230

public enum ConnectionState {

231

/**

232

* Sent for the first successful connection to the server

233

*/

234

CONNECTED,

235

236

/**

237

* There has been a loss of connection. Leaders, locks, etc. should suspend

238

* until the connection is re-established

239

*/

240

SUSPENDED,

241

242

/**

243

* A suspended or lost connection has been re-established

244

*/

245

RECONNECTED,

246

247

/**

248

* The connection is confirmed to be lost. Close any locks, leaders, etc. and

249

* attempt to re-create them

250

*/

251

LOST,

252

253

/**

254

* The connection has gone into read-only mode. This can only happen if you pass true

255

* for canBeReadOnly() in the builder

256

*/

257

READ_ONLY;

258

259

/**

260

* Check if this state represents a connection

261

* @return true if connected

262

*/

263

public boolean isConnected() {

264

return (this == CONNECTED) || (this == RECONNECTED) || (this == READ_ONLY);

265

}

266

}

267

268

/**

269

* Listener for connection state changes

270

*/

271

public interface ConnectionStateListener {

272

/**

273

* Called when the connection state changes

274

* @param client the client

275

* @param newState the new state

276

*/

277

void stateChanged(CuratorFramework client, ConnectionState newState);

278

}

279

280

/**

281

* Policy for handling connection errors

282

*/

283

public interface ConnectionStateErrorPolicy {

284

/**

285

* Return true if the given state represents an error

286

* @param state the state

287

* @return true/false

288

*/

289

boolean isErrorState(ConnectionState state);

290

291

/**

292

* Return the timeout to use when checking error states

293

* @return timeout in milliseconds

294

*/

295

int getErrorThresholdMs();

296

}

297

298

/**

299

* Default error policy implementation

300

*/

301

public class StandardConnectionStateErrorPolicy implements ConnectionStateErrorPolicy {

302

// Default implementation considers SUSPENDED and LOST as error states

303

}

304

305

/**

306

* Session-based error policy implementation

307

*/

308

public class SessionConnectionStateErrorPolicy implements ConnectionStateErrorPolicy {

309

// Session-based implementation considers only LOST (session expiration) as an error state

310

}

311

```

312

313

**Usage Examples:**

314

315

```java

316

// Add connection state listener

317

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {

318

@Override

319

public void stateChanged(CuratorFramework client, ConnectionState newState) {

320

System.out.println("Connection state changed to: " + newState);

321

322

switch (newState) {

323

case CONNECTED:

324

System.out.println("Connected to ZooKeeper");

325

// Initialize application state

326

break;

327

328

case SUSPENDED:

329

System.out.println("Connection suspended - pausing operations");

330

// Pause non-critical operations

331

break;

332

333

case RECONNECTED:

334

System.out.println("Reconnected to ZooKeeper");

335

// Resume operations, refresh state if needed

336

break;

337

338

case LOST:

339

System.out.println("Connection lost - cleaning up");

340

// Clean up ephemeral nodes, release locks, etc.

341

break;

342

343

case READ_ONLY:

344

System.out.println("In read-only mode");

345

// Only perform read operations

346

break;

347

}

348

}

349

});

350

351

// Use custom executor for connection state callbacks

352

Executor customExecutor = Executors.newSingleThreadExecutor(r -> {

353

Thread t = new Thread(r, "ConnectionStateHandler");

354

t.setDaemon(true);

355

return t;

356

});

357

358

client.getConnectionStateListenable().addListener(connectionStateListener, customExecutor);

359

360

// Check current connection state

361

ZooKeeper.States currentState = client.getZookeeperClient().getZooKeeper().getState();

362

System.out.println("Current ZK state: " + currentState);

363

364

// Wait for connection

365

if (!client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {

366

throw new RuntimeException("Failed to connect to ZooKeeper");

367

}

368

```

369

370

### Advanced Connection State Management

371

372

Advanced connection state management with circuit breaking and custom listener factories.

373

374

```java { .api }

375

/**

376

* Factory for creating connection state listener managers

377

*/

378

public class ConnectionStateListenerManagerFactory {

379

/**

380

* Standard factory instance

381

*/

382

public static final ConnectionStateListenerManagerFactory standard;

383

384

/**

385

* Create a circuit breaking factory

386

* @param retryPolicy retry policy for circuit breaking

387

* @return circuit breaking factory

388

*/

389

public static ConnectionStateListenerManagerFactory circuitBreaking(RetryPolicy retryPolicy);

390

}

391

392

/**

393

* Circuit breaker for connection management

394

*/

395

public interface CircuitBreaker {

396

/**

397

* Check if the circuit is open (failing)

398

* @return true if circuit is open

399

*/

400

boolean isOpen();

401

402

/**

403

* Get the current retry count

404

* @return retry count

405

*/

406

int getRetryCount();

407

}

408

409

/**

410

* Connection state listener with circuit breaking capability

411

*/

412

public class CircuitBreakingConnectionStateListener implements ConnectionStateListener {

413

/**

414

* Get the circuit breaker instance

415

* @return circuit breaker

416

*/

417

public CircuitBreaker getCircuitBreaker();

418

}

419

420

/**

421

* Manager for circuit breaking functionality

422

*/

423

public class CircuitBreakingManager {

424

/**

425

* Check if circuit breaking is active

426

* @return true if active

427

*/

428

public boolean isActive();

429

430

/**

431

* Get the current circuit breaker

432

* @return circuit breaker or null

433

*/

434

public CircuitBreaker getCircuitBreaker();

435

}

436

```

437

438

**Usage Examples:**

439

440

```java

441

// Configure client with circuit breaking connection state management

442

CuratorFramework clientWithCircuitBreaker = CuratorFrameworkFactory.builder()

443

.connectString("localhost:2181")

444

.retryPolicy(new ExponentialBackoffRetry(1000, 3))

445

.connectionStateListenerManagerFactory(

446

ConnectionStateListenerManagerFactory.circuitBreaking(

447

new ExponentialBackoffRetry(1000, 5)

448

)

449

)

450

.build();

451

452

// Custom error policy

453

ConnectionStateErrorPolicy customPolicy = new ConnectionStateErrorPolicy() {

454

@Override

455

public boolean isErrorState(ConnectionState state) {

456

return state == ConnectionState.LOST || state == ConnectionState.SUSPENDED;

457

}

458

459

@Override

460

public int getErrorThresholdMs() {

461

return 30000; // 30 seconds

462

}

463

};

464

465

CuratorFramework clientWithCustomPolicy = CuratorFrameworkFactory.builder()

466

.connectString("localhost:2181")

467

.retryPolicy(new ExponentialBackoffRetry(1000, 3))

468

.connectionStateErrorPolicy(customPolicy)

469

.build();

470

```

471

472

### Listener Management

473

474

Generic interface for managing event listeners with custom executors.

475

476

```java { .api }

477

/**

478

* Generic interface for managing listeners

479

*/

480

public interface Listenable<T> {

481

/**

482

* Add a listener with the default executor

483

* @param listener listener to add

484

*/

485

void addListener(T listener);

486

487

/**

488

* Add a listener with custom executor

489

* @param listener listener to add

490

* @param executor executor for listener callbacks

491

*/

492

void addListener(T listener, Executor executor);

493

494

/**

495

* Remove a listener

496

* @param listener listener to remove

497

*/

498

void removeListener(T listener);

499

}

500

501

/**

502

* Container for listener and its executor

503

*/

504

public class ListenerEntry<T> {

505

/**

506

* Get the listener

507

* @return listener

508

*/

509

public T getListener();

510

511

/**

512

* Get the executor

513

* @return executor

514

*/

515

public Executor getExecutor();

516

}

517

```

518

519

**Usage Examples:**

520

521

```java

522

// Add listeners with custom executors

523

ExecutorService backgroundExecutor = Executors.newCachedThreadPool();

524

ExecutorService priorityExecutor = Executors.newSingleThreadExecutor();

525

526

// High priority connection state listener

527

client.getConnectionStateListenable().addListener(criticalConnectionListener, priorityExecutor);

528

529

// Background event processing

530

client.getCuratorListenable().addListener(backgroundEventListener, backgroundExecutor);

531

532

// Remove listeners when done

533

client.getConnectionStateListenable().removeListener(criticalConnectionListener);

534

client.getCuratorListenable().removeListener(backgroundEventListener);

535

536

// Clean up executors

537

backgroundExecutor.shutdown();

538

priorityExecutor.shutdown();

539

```