or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-connection.md · connection-handling.md · ensemble-providers.md · index.md · path-utilities.md · retry-policies.md · tracing-metrics.md

docs/connection-handling.md

0

# Connection Handling

1

2

Advanced connection handling policies and thread-local retry management for fine-grained control over ZooKeeper connection behavior in Apache Curator. These components provide sophisticated connection management beyond basic retry policies.

3

4

## Capabilities

5

6

### ConnectionHandlingPolicy Interface

7

8

Interface for implementing custom connection timeout and handling behavior policies.

9

10

```java { .api }

11

/**

12

* Interface for connection handling policies

13

*/

14

public interface ConnectionHandlingPolicy {

15

/**

16

* Check connection timeouts and return status information

17

* @return CheckTimeoutsResult with timeout status details

18

* @throws Exception if timeout check fails

19

*/

20

CheckTimeoutsResult checkTimeouts() throws Exception;

21

22

/**

23

* Execute callable with retry logic specific to this connection policy

24

* @param client CuratorZookeeperClient instance

25

* @param proc Callable to execute with connection handling

26

* @return Result of successful callable execution

27

* @throws Exception if operation fails after policy handling

28

*/

29

<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception;

30

}

31

```

32

33

### StandardConnectionHandlingPolicy Class

34

35

Default implementation of ConnectionHandlingPolicy providing standard connection management behavior.

36

37

```java { .api }

38

/**

39

* Standard implementation of connection handling policy

40

*/

41

public class StandardConnectionHandlingPolicy implements ConnectionHandlingPolicy {

42

/**

43

* Create standard connection handling policy with default settings

44

*/

45

public StandardConnectionHandlingPolicy();

46

47

/**

48

* Create standard connection handling policy with custom timeout

49

* @param timeoutMs Custom timeout in milliseconds

50

*/

51

public StandardConnectionHandlingPolicy(int timeoutMs);

52

}

53

```

54

55

**Usage Examples:**

56

57

```java

58

import org.apache.curator.connection.StandardConnectionHandlingPolicy;

59

import org.apache.curator.connection.ConnectionHandlingPolicy;

60

61

// Default standard policy

62

ConnectionHandlingPolicy defaultPolicy = new StandardConnectionHandlingPolicy();

63

64

// Custom timeout policy

65

ConnectionHandlingPolicy customPolicy = new StandardConnectionHandlingPolicy(10000); // 10 second timeout

66

67

// Use with operations

68

try {

69

String result = customPolicy.callWithRetry(client, () -> {

70

return new String(client.getZooKeeper().getData("/config/setting", false, null));

71

});

72

} catch (Exception e) {

73

// Handle connection policy failure

74

System.err.println("Connection policy failed: " + e.getMessage());

75

}

76

```

77

78

### ThreadLocalRetryLoop Class

79

80

Thread-local retry loop management for handling retry state per thread in multi-threaded applications.

81

82

```java { .api }

83

/**

84

* Thread-local retry loop management

85

*/

86

public class ThreadLocalRetryLoop {

87

/**

88

* Get or create retry loop for current thread

89

* @param client CuratorZookeeperClient instance

90

* @return RetryLoop instance for current thread

91

*/

92

public static RetryLoop getRetryLoop(CuratorZookeeperClient client);

93

94

/**

95

* Clear retry loop for current thread

96

* Use this to clean up thread-local state when thread processing is complete

97

*/

98

public static void clearRetryLoop();

99

100

/**

101

* Check if current thread has an active retry loop

102

* @return true if current thread has retry loop, false otherwise

103

*/

104

public static boolean hasRetryLoop();

105

}

106

```

107

108

**Usage Examples:**

109

110

```java

111

import org.apache.curator.connection.ThreadLocalRetryLoop;

112

113

// Multi-threaded retry handling

114

ExecutorService executor = Executors.newFixedThreadPool(10);

115

116

executor.submit(() -> {

117

try {

118

// Get thread-local retry loop

119

RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);

120

121

while (retryLoop.shouldContinue()) {

122

try {

123

// ZooKeeper operations specific to this thread

124

client.getZooKeeper().create("/thread/" + Thread.currentThread().getId(),

125

"data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

126

retryLoop.markComplete();

127

} catch (Exception e) {

128

retryLoop.takeException(e);

129

}

130

}

131

} finally {

132

// Clean up thread-local state

133

ThreadLocalRetryLoop.clearRetryLoop();

134

}

135

});

136

```

137

138

### CheckTimeoutsResult Class

139

140

Result object returned by connection handling policy timeout checks.

141

142

```java { .api }

143

/**

144

* Result of connection timeout checks

145

*/

146

public class CheckTimeoutsResult {

147

/**

148

* Check if connection has timed out

149

* @return true if connection is timed out

150

*/

151

public boolean isTimedOut();

152

153

/**

154

* Get timeout duration in milliseconds

155

* @return Timeout duration, or -1 if not timed out

156

*/

157

public long getTimeoutMs();

158

159

/**

160

* Get descriptive message about timeout status

161

* @return Human-readable timeout status message

162

*/

163

public String getMessage();

164

}

165

```

166

167

## Advanced Connection Handling Patterns

168

169

### Custom Connection Policy Implementation

170

171

```java

172

import org.apache.curator.connection.ConnectionHandlingPolicy;

173

import java.util.concurrent.Callable;

174

175

/**

176

* Custom connection policy with adaptive timeout based on operation history

177

*/

178

public class AdaptiveConnectionHandlingPolicy implements ConnectionHandlingPolicy {

179

private final AtomicLong avgOperationTime = new AtomicLong(1000);

180

private final int baseTimeoutMs;

181

182

public AdaptiveConnectionHandlingPolicy(int baseTimeoutMs) {

183

this.baseTimeoutMs = baseTimeoutMs;

184

}

185

186

@Override

187

public CheckTimeoutsResult checkTimeouts() throws Exception {

188

// Calculate adaptive timeout based on recent operation performance

189

long adaptiveTimeout = Math.max(baseTimeoutMs, avgOperationTime.get() * 3);

190

191

// Custom timeout logic

192

boolean timedOut = /* your timeout detection logic */;

193

194

return new CheckTimeoutsResult() {

195

@Override

196

public boolean isTimedOut() { return timedOut; }

197

198

@Override

199

public long getTimeoutMs() { return adaptiveTimeout; }

200

201

@Override

202

public String getMessage() {

203

return "Adaptive timeout: " + adaptiveTimeout + "ms";

204

}

205

};

206

}

207

208

@Override

209

public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {

210

long startTime = System.currentTimeMillis();

211

212

try {

213

T result = proc.call();

214

215

// Update average operation time for adaptive behavior

216

long operationTime = System.currentTimeMillis() - startTime;

217

avgOperationTime.set((avgOperationTime.get() + operationTime) / 2);

218

219

return result;

220

} catch (Exception e) {

221

// Custom retry logic based on exception type and timeout status

222

CheckTimeoutsResult timeoutResult = checkTimeouts();

223

if (timeoutResult.isTimedOut()) {

224

throw new RuntimeException("Operation timed out: " + timeoutResult.getMessage(), e);

225

}

226

throw e;

227

}

228

}

229

}

230

```

231

232

### Circuit Breaker Connection Policy

233

234

```java

235

/**

236

* Connection policy with circuit breaker pattern for failing connections

237

*/

238

public class CircuitBreakerConnectionPolicy implements ConnectionHandlingPolicy {

239

private enum State { CLOSED, OPEN, HALF_OPEN }

240

241

private volatile State state = State.CLOSED;

242

private final AtomicInteger failureCount = new AtomicInteger(0);

243

private final AtomicLong lastFailureTime = new AtomicLong(0);

244

private final int failureThreshold;

245

private final long recoveryTimeoutMs;

246

247

public CircuitBreakerConnectionPolicy(int failureThreshold, long recoveryTimeoutMs) {

248

this.failureThreshold = failureThreshold;

249

this.recoveryTimeoutMs = recoveryTimeoutMs;

250

}

251

252

@Override

253

public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception {

254

if (state == State.OPEN) {

255

if (System.currentTimeMillis() - lastFailureTime.get() > recoveryTimeoutMs) {

256

state = State.HALF_OPEN;

257

} else {

258

throw new RuntimeException("Circuit breaker is OPEN - failing fast");

259

}

260

}

261

262

try {

263

T result = proc.call();

264

265

// Success - reset circuit breaker

266

if (state == State.HALF_OPEN) {

267

state = State.CLOSED;

268

failureCount.set(0);

269

}

270

271

return result;

272

} catch (Exception e) {

273

// Failure - update circuit breaker state

274

int failures = failureCount.incrementAndGet();

275

lastFailureTime.set(System.currentTimeMillis());

276

277

if (failures >= failureThreshold) {

278

state = State.OPEN;

279

}

280

281

throw e;

282

}

283

}

284

285

@Override

286

public CheckTimeoutsResult checkTimeouts() throws Exception {

287

return new CheckTimeoutsResult() {

288

@Override

289

public boolean isTimedOut() { return state == State.OPEN; }

290

291

@Override

292

public long getTimeoutMs() { return recoveryTimeoutMs; }

293

294

@Override

295

public String getMessage() {

296

return "Circuit breaker state: " + state + ", failures: " + failureCount.get();

297

}

298

};

299

}

300

}

301

```

302

303

### Multi-threaded Connection Management

304

305

```java

306

/**

307

* Connection manager for multi-threaded applications with thread-local retry handling

308

*/

309

public class MultiThreadedConnectionManager {

310

private final CuratorZookeeperClient client;

311

private final ConnectionHandlingPolicy policy;

312

private final ExecutorService executorService;

313

314

public MultiThreadedConnectionManager(CuratorZookeeperClient client,

315

ConnectionHandlingPolicy policy,

316

int threadPoolSize) {

317

this.client = client;

318

this.policy = policy;

319

this.executorService = Executors.newFixedThreadPool(threadPoolSize);

320

}

321

322

public <T> CompletableFuture<T> executeAsync(Callable<T> operation) {

323

return CompletableFuture.supplyAsync(() -> {

324

try {

325

// Use thread-local retry loop

326

if (!ThreadLocalRetryLoop.hasRetryLoop()) {

327

RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);

328

}

329

330

// Execute with connection policy

331

return policy.callWithRetry(client, operation);

332

333

} catch (Exception e) {

334

throw new RuntimeException("Operation failed", e);

335

} finally {

336

// Clean up thread-local state

337

ThreadLocalRetryLoop.clearRetryLoop();

338

}

339

}, executorService);

340

}

341

342

public void shutdown() {

343

executorService.shutdown();

344

try {

345

if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {

346

executorService.shutdownNow();

347

}

348

} catch (InterruptedException e) {

349

executorService.shutdownNow();

350

Thread.currentThread().interrupt();

351

}

352

}

353

}

354

355

// Usage example ("client" is an already-started CuratorZookeeperClient)
MultiThreadedConnectionManager connectionManager = new MultiThreadedConnectionManager(
    client, new StandardConnectionHandlingPolicy(), 10);

// Execute operations across threads
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    final int index = i;
    CompletableFuture<String> future = connectionManager.executeAsync(() -> {
        byte[] raw = client.getZooKeeper().getData("/data/" + index, false, null);
        // Explicit charset instead of the platform default.
        // NOTE(review): assumes the znodes store UTF-8 text - confirm against the writer.
        return new String(raw, java.nio.charset.StandardCharsets.UTF_8);
    });
    futures.add(future);
}

// Wait for all operations to complete; join() rethrows if any of them failed
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .thenRun(() -> System.out.println("All operations completed"))
    .join();

373

```

374

375

### Connection Health Monitoring

376

377

```java

378

/**

379

* Connection health monitor using connection handling policies

380

*/

381

public class ConnectionHealthMonitor {

382

private final ConnectionHandlingPolicy policy;

383

private final ScheduledExecutorService scheduler;

384

private final AtomicBoolean isHealthy = new AtomicBoolean(true);

385

386

public ConnectionHealthMonitor(ConnectionHandlingPolicy policy) {

387

this.policy = policy;

388

this.scheduler = Executors.newScheduledThreadPool(1);

389

390

// Schedule periodic health checks

391

scheduler.scheduleAtFixedRate(this::checkHealth, 0, 30, TimeUnit.SECONDS);

392

}

393

394

private void checkHealth() {

395

try {

396

CheckTimeoutsResult result = policy.checkTimeouts();

397

boolean healthy = !result.isTimedOut();

398

399

if (healthy != isHealthy.get()) {

400

isHealthy.set(healthy);

401

if (healthy) {

402

System.out.println("Connection health restored");

403

} else {

404

System.err.println("Connection health degraded: " + result.getMessage());

405

}

406

}

407

} catch (Exception e) {

408

isHealthy.set(false);

409

System.err.println("Health check failed: " + e.getMessage());

410

}

411

}

412

413

public boolean isHealthy() {

414

return isHealthy.get();

415

}

416

417

public void shutdown() {

418

scheduler.shutdown();

419

}

420

}

421

```

422

423

## Connection Handling Best Practices

424

425

### Policy Selection Guidelines

426

427

**StandardConnectionHandlingPolicy**:

428

- Use for most production applications

429

- Provides reliable timeout handling and retry behavior

430

- Good balance of performance and reliability

431

432

**Custom Policies**:

433

- Implement for specialized requirements (circuit breakers, adaptive timeouts)

434

- Consider maintenance overhead and complexity

435

- Test thoroughly under failure conditions

436

437

### Thread Safety Considerations

438

439

```java

440

// Always clean up thread-local state in multi-threaded applications

441

try {

442

RetryLoop retryLoop = ThreadLocalRetryLoop.getRetryLoop(client);

443

// ... use retry loop ...

444

} finally {

445

ThreadLocalRetryLoop.clearRetryLoop(); // Prevent memory leaks

446

}

447

448

// Use connection policies that are thread-safe

449

ConnectionHandlingPolicy threadSafePolicy = new StandardConnectionHandlingPolicy();

450

// Multiple threads can safely use the same policy instance

451

```

452

453

### Error Handling Patterns

454

455

```java

456

// Comprehensive error handling with connection policies
// (fragment of a method that returns the raw znode data as byte[])
try {
    // getData(...) yields the znode content as bytes, so the result is declared as
    // byte[] rather than an undeclared type variable.
    byte[] result = connectionPolicy.callWithRetry(client, () -> {
        // ZooKeeper operation
        return client.getZooKeeper().getData(path, false, null);
    });
    return result;
} catch (Exception e) {
    // Check if failure was due to connection handling policy
    try {
        CheckTimeoutsResult timeoutResult = connectionPolicy.checkTimeouts();
        if (timeoutResult.isTimedOut()) {
            // Handle timeout-specific failure
            handleTimeoutFailure(timeoutResult);
        } else {
            // Handle other types of failures
            handleGeneralFailure(e);
        }
    } catch (Exception timeoutCheckException) {
        // Handle failure to check timeouts
        handleCriticalFailure(timeoutCheckException);
    }
}

479

```