or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.mdcore-rpc.mdindex.mdplugins.mdprotocols.mdstats.mdtransports.md

async.mddocs/

0

# Asynchronous Operations

1

2

Apache Avro IPC provides comprehensive support for non-blocking RPC operations through callback-based and Future-based patterns, enabling high-performance concurrent applications.

3

4

## Capabilities

5

6

### Callback Interface

7

8

The fundamental interface for asynchronous operations, providing success and error handling methods.

9

10

```java { .api }

11

public interface Callback<T> {

12

// Handle successful result

13

void handleResult(T result);

14

15

// Handle error condition

16

void handleError(Throwable error);

17

}

18

```

19

20

#### Usage Examples

21

22

```java

23

// Simple callback implementation

24

Callback<String> callback = new Callback<String>() {

25

@Override

26

public void handleResult(String result) {

27

System.out.println("RPC completed successfully: " + result);

28

// Process result on callback thread

29

}

30

31

@Override

32

public void handleError(Throwable error) {

33

System.err.println("RPC failed: " + error.getMessage());

34

// Handle error condition

35

}

36

};

37

38

// Make asynchronous RPC call

39

requestor.request("getData", requestParams, callback);

40

41

// Lambda-based callback (Java 8+)

42

requestor.request("processData", data, new Callback<ProcessingResult>() {

43

@Override

44

public void handleResult(ProcessingResult result) {

45

result.getItems().forEach(item -> processItem(item));

46

}

47

48

@Override

49

public void handleError(Throwable error) {

50

logger.error("Processing failed", error);

51

scheduleRetry();

52

}

53

});

54

```

55

56

### Future-Based Operations

57

58

`CallFuture` provides a Future implementation that also acts as a Callback, enabling both blocking and non-blocking usage patterns.

59

60

```java { .api }

61

public class CallFuture<T> implements Future<T>, Callback<T> {

62

// Constructors

63

public CallFuture();

64

public CallFuture(Callback<T> chainedCallback);

65

66

// Result access methods

67

public T getResult() throws Exception;

68

public Throwable getError();

69

70

// Blocking wait methods

71

public void await() throws InterruptedException;

72

public void await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;

73

74

// Future interface methods

75

public boolean cancel(boolean mayInterruptIfRunning);

76

public boolean isCancelled();

77

public boolean isDone();

78

public T get() throws InterruptedException, ExecutionException;

79

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

80

81

// Callback interface methods

82

public void handleResult(T result);

83

public void handleError(Throwable error);

84

}

85

```

86

87

#### Usage Examples

88

89

```java

90

// Basic Future usage

91

CallFuture<String> future = new CallFuture<>();

92

requestor.request("getData", request, future);

93

94

try {

95

// Block until result is available

96

String result = future.get();

97

System.out.println("Result: " + result);

98

} catch (ExecutionException e) {

99

System.err.println("RPC failed: " + e.getCause().getMessage());

100

}

101

102

// Future with timeout

103

CallFuture<ProcessingResult> future = new CallFuture<>();

104

requestor.request("longRunningTask", params, future);

105

106

try {

107

ProcessingResult result = future.get(30, TimeUnit.SECONDS);

108

System.out.println("Task completed: " + result.getStatus());

109

} catch (TimeoutException e) {

110

System.err.println("Task timed out");

111

future.cancel(true);

112

}

113

114

// Chained callback with Future

115

CallFuture<String> future = new CallFuture<>(new Callback<String>() {

116

@Override

117

public void handleResult(String result) {

118

// This callback is invoked in addition to Future completion

119

notifyListeners(result);

120

}

121

122

@Override

123

public void handleError(Throwable error) {

124

logError(error);

125

}

126

});

127

128

requestor.request("getData", request, future);

129

130

// Can still use Future methods

131

if (future.isDone()) {

132

String result = future.getResult(); // Non-blocking if done

133

}

134

```

135

136

### Asynchronous Requestor Operations

137

138

All requestor implementations support asynchronous operations through the callback-based request method.

139

140

```java { .api }

141

// From Requestor base class

142

public abstract class Requestor {

143

// Asynchronous RPC call

144

public <T> void request(String messageName, Object request, Callback<T> callback) throws Exception;

145

146

// Synchronous RPC call (for comparison)

147

public Object request(String messageName, Object request) throws Exception;

148

}

149

```

150

151

#### Usage Examples

152

153

```java

154

// Generic requestor async usage

155

GenericRequestor requestor = new GenericRequestor(protocol, transceiver);

156

GenericData.Record request = createRequest();

157

158

requestor.request("processData", request, new Callback<Object>() {

159

@Override

160

public void handleResult(Object result) {

161

if (result instanceof GenericData.Record) {

162

GenericData.Record record = (GenericData.Record) result;

163

System.out.println("Status: " + record.get("status"));

164

}

165

}

166

167

@Override

168

public void handleError(Throwable error) {

169

System.err.println("Generic RPC failed: " + error.getMessage());

170

}

171

});

172

173

// Specific requestor async usage

174

SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);

175

176

requestor.request("getUserData", userId, new Callback<UserData>() {

177

@Override

178

public void handleResult(UserData userData) {

179

updateUserInterface(userData);

180

}

181

182

@Override

183

public void handleError(Throwable error) {

184

showErrorMessage("Failed to load user data: " + error.getMessage());

185

}

186

});

187

```

188

189

### Asynchronous Transport Operations

190

191

Transport implementations provide asynchronous communication at the transport layer.

192

193

```java { .api }

194

// From Transceiver base class

195

public abstract class Transceiver {

196

// Asynchronous transport operation

197

public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;

198

199

// Synchronous transport operation (for comparison)

200

public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;

201

}

202

```

203

204

#### Usage Examples

205

206

```java

207

// Direct transport-level async usage

208

HttpTransceiver transceiver = new HttpTransceiver(serverUrl);

209

List<ByteBuffer> requestBuffers = serializeRequest(request);

210

211

transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {

212

@Override

213

public void handleResult(List<ByteBuffer> responseBuffers) {

214

Object response = deserializeResponse(responseBuffers);

215

handleResponse(response);

216

}

217

218

@Override

219

public void handleError(Throwable error) {

220

System.err.println("Transport error: " + error.getMessage());

221

}

222

});

223

```

224

225

## Advanced Asynchronous Patterns

226

227

### Concurrent RPC Calls

228

229

Execute multiple RPC calls concurrently and wait for all to complete:

230

231

```java

232

public class ConcurrentRPCExample {

233

public void fetchMultipleDataSources() {

234

List<CallFuture<String>> futures = new ArrayList<>();

235

236

// Start multiple async calls

237

for (String dataSource : dataSources) {

238

CallFuture<String> future = new CallFuture<>();

239

requestor.request("fetchData", dataSource, future);

240

futures.add(future);

241

}

242

243

// Wait for all to complete

244

List<String> results = new ArrayList<>();

245

for (CallFuture<String> future : futures) {

246

try {

247

results.add(future.get(10, TimeUnit.SECONDS));

248

} catch (Exception e) {

249

System.err.println("Failed to fetch data: " + e.getMessage());

250

}

251

}

252

253

processResults(results);

254

}

255

}

256

```

257

258

### Callback Chaining

259

260

Chain multiple asynchronous operations:

261

262

```java

263

public class CallbackChaining {

264

public void processUserWorkflow(long userId) {

265

// Step 1: Get user data

266

requestor.request("getUser", userId, new Callback<UserData>() {

267

@Override

268

public void handleResult(UserData user) {

269

// Step 2: Get user preferences

270

requestor.request("getPreferences", user.getId(), new Callback<Preferences>() {

271

@Override

272

public void handleResult(Preferences prefs) {

273

// Step 3: Customize content

274

requestor.request("customizeContent",

275

new CustomizationRequest(user, prefs),

276

new Callback<Content>() {

277

@Override

278

public void handleResult(Content content) {

279

displayContent(content);

280

}

281

282

@Override

283

public void handleError(Throwable error) {

284

showDefaultContent();

285

}

286

});

287

}

288

289

@Override

290

public void handleError(Throwable error) {

291

// Use default preferences

292

showDefaultContent();

293

}

294

});

295

}

296

297

@Override

298

public void handleError(Throwable error) {

299

showError("Failed to load user: " + error.getMessage());

300

}

301

});

302

}

303

}

304

```

305

306

### Custom Future Implementation

307

308

Create specialized Future implementations for complex scenarios:

309

310

```java

311

public class TimeoutCallFuture<T> extends CallFuture<T> {

312

private final ScheduledExecutorService scheduler;

313

private ScheduledFuture<?> timeoutTask;

314

315

public TimeoutCallFuture(long timeout, TimeUnit unit) {

316

super();

317

this.scheduler = Executors.newScheduledThreadPool(1);

318

this.timeoutTask = scheduler.schedule(() -> {

319

if (!isDone()) {

320

handleError(new TimeoutException("RPC call timed out"));

321

}

322

}, timeout, unit);

323

}

324

325

@Override

326

public void handleResult(T result) {

327

timeoutTask.cancel(false);

328

scheduler.shutdown();

329

super.handleResult(result);

330

}

331

332

@Override

333

public void handleError(Throwable error) {

334

timeoutTask.cancel(false);

335

scheduler.shutdown();

336

super.handleError(error);

337

}

338

}

339

340

// Usage

341

TimeoutCallFuture<String> future = new TimeoutCallFuture<>(5, TimeUnit.SECONDS);

342

requestor.request("slowOperation", request, future);

343

```

344

345

### Error Recovery Patterns

346

347

Implement retry logic with exponential backoff:

348

349

```java

350

public class RetryCallback<T> implements Callback<T> {

351

private final Requestor requestor;

352

private final String messageName;

353

private final Object request;

354

private final Callback<T> finalCallback;

355

private final int maxRetries;

356

private int currentTry = 0;

357

358

public RetryCallback(Requestor requestor, String messageName, Object request,

359

Callback<T> finalCallback, int maxRetries) {

360

this.requestor = requestor;

361

this.messageName = messageName;

362

this.request = request;

363

this.finalCallback = finalCallback;

364

this.maxRetries = maxRetries;

365

}

366

367

@Override

368

public void handleResult(T result) {

369

finalCallback.handleResult(result);

370

}

371

372

@Override

373

public void handleError(Throwable error) {

374

if (currentTry < maxRetries && isRetryable(error)) {

375

currentTry++;

376

long delay = Math.min(1000 * (1L << currentTry), 30000); // Exponential backoff

377

378

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

379

scheduler.schedule(() -> {

380

try {

381

requestor.request(messageName, request, this);

382

} catch (Exception e) {

383

finalCallback.handleError(e);

384

}

385

scheduler.shutdown();

386

}, delay, TimeUnit.MILLISECONDS);

387

} else {

388

finalCallback.handleError(error);

389

}

390

}

391

392

private boolean isRetryable(Throwable error) {

393

return error instanceof IOException ||

394

error instanceof SocketTimeoutException ||

395

error instanceof ConnectException;

396

}

397

}

398

399

// Usage

400

Callback<String> retryCallback = new RetryCallback<>(requestor, "getData", request,

401

new Callback<String>() {

402

@Override

403

public void handleResult(String result) {

404

System.out.println("Success: " + result);

405

}

406

407

@Override

408

public void handleError(Throwable error) {

409

System.err.println("Final failure: " + error.getMessage());

410

}

411

}, 3); // Max 3 retries

412

413

requestor.request("getData", request, retryCallback);

414

```

415

416

## Thread Safety and Concurrency

417

418

### Thread Safety Guarantees

419

420

- `CallFuture` instances are thread-safe for concurrent access

421

- Callback methods are invoked on I/O threads - avoid blocking operations

422

- Multiple concurrent async calls can be made with the same Requestor instance

423

- Transport implementations handle concurrent async operations safely

424

425

### Best Practices

426

427

```java

428

// Good: Non-blocking callback processing

429

requestor.request("getData", request, new Callback<String>() {

430

@Override

431

public void handleResult(String result) {

432

// Quick processing on callback thread

433

resultQueue.offer(result);

434

resultProcessor.signal(); // Wake up processing thread

435

}

436

437

@Override

438

public void handleError(Throwable error) {

439

errorLogger.logAsync(error); // Non-blocking logging

440

}

441

});

442

443

// Bad: Blocking operations in callback

444

requestor.request("getData", request, new Callback<String>() {

445

@Override

446

public void handleResult(String result) {

447

// This blocks the I/O thread!

448

database.saveResult(result); // Blocking database call

449

Thread.sleep(1000); // Very bad!

450

}

451

452

@Override

453

public void handleError(Throwable error) {

454

// Also bad - blocking I/O

455

System.out.println("Error: " + error.getMessage());

456

}

457

});

458

```

459

460

## Performance Considerations

461

462

### Async vs Sync Performance

463

464

- Asynchronous calls provide better throughput for concurrent operations

465

- Reduced thread usage compared to synchronous calls with thread pools

466

- Lower memory overhead per outstanding request

467

- Better resource utilization in high-concurrency scenarios

468

469

### Optimization Tips

470

471

```java

472

// Reuse CallFuture instances when possible

473

private final Queue<CallFuture<String>> futurePool = new ConcurrentLinkedQueue<>();

474

475

public CallFuture<String> getFuture() {

476

CallFuture<String> future = futurePool.poll();

477

return future != null ? future : new CallFuture<>();

478

}

479

480

public void returnFuture(CallFuture<String> future) {

481

if (future.isDone()) {

482

future.reset(); // Hypothetical reset method

483

futurePool.offer(future);

484

}

485

}

486

487

// Use appropriate timeout values

488

CallFuture<String> future = new CallFuture<>();

489

requestor.request("quickOperation", request, future);

490

String result = future.get(100, TimeUnit.MILLISECONDS); // Short timeout for quick ops

491

492

// Batch related operations

493

List<CallFuture<String>> batch = new ArrayList<>();

494

for (String item : items) {

495

CallFuture<String> future = new CallFuture<>();

496

requestor.request("processItem", item, future);

497

batch.add(future);

498

}

499

// Process results in batch

500

```