or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md core.md index.md metadata.md protocols.md schemes.md servers.md transports.md utilities.md

docs/async.md

0

# Asynchronous Operations

1

2

Non-blocking client and server implementations for high-performance applications. The async framework enables handling many concurrent operations without blocking threads, making it ideal for high-throughput scenarios.

3

4

## Capabilities

5

6

### Async Client Framework

7

8

Base classes and interfaces for asynchronous client implementations.

9

10

```java { .api }

11

/**

12

* Abstract base class for asynchronous client implementations

13

*/

14

public abstract class TAsyncClient {

15

/** Create async client with protocol factory, manager, and transport */

16

public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport);

17

18

/** Create async client with timeout */

19

public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout);

20

21

/** Get the protocol factory used by this client */

22

public TProtocolFactory getProtocolFactory();

23

24

/** Get the current timeout value in milliseconds */

25

public long getTimeout();

26

27

/** Set timeout for operations in milliseconds */

28

public void setTimeout(long timeout);

29

30

/** Check if client has an error */

31

public boolean hasError();

32

33

/** Get the current error (if any) */

34

public Exception getError();

35

36

/** Check if last operation timed out */

37

public boolean hasTimeout();

38

39

/** Called when async operation completes successfully */

40

protected void onComplete();

41

42

/** Called when async operation encounters an error */

43

protected void onError(Exception exception);

44

}

45

```

46

47

### Async Client Manager

48

49

Manager for handling asynchronous client operations and selector threads.

50

51

```java { .api }

52

/**

53

* Manages asynchronous client connections and operations

54

*/

55

public class TAsyncClientManager {

56

/** Create client manager with default settings */

57

public TAsyncClientManager() throws IOException;

58

59

/** Create client manager with custom configuration */

60

public TAsyncClientManager(int selectThreadCount, int selectorThreadPoolSize, long timeoutCheckInterval) throws IOException;

61

62

/** Execute an asynchronous method call */

63

public void call(TAsyncMethodCall method) throws TException;

64

65

/** Stop the client manager and all selector threads */

66

public void stop();

67

68

/** Check if client manager is stopped */

69

public boolean isStopped();

70

}

71

```

72

73

### Async Method Calls

74

75

Classes representing asynchronous method invocations.

76

77

```java { .api }

78

/**

79

* Abstract base class representing an asynchronous method call

80

*/

81

public abstract class TAsyncMethodCall<T> {

82

/** Start the async method call */

83

public void start(Selector sel) throws IOException;

84

85

/** Check if the method call has finished */

86

public boolean isFinished();

87

88

/** Get the result of the method call (blocks until complete) */

89

public T getResult() throws Exception;

90

91

/** Check if method call has an error */

92

public boolean hasError();

93

94

/** Get the error (if any) */

95

public Exception getError();

96

97

/** Check if method call timed out */

98

public boolean hasTimeout();

99

100

/** Get the client that made this call */

101

public TAsyncClient getClient();

102

103

/** Get the transport for this call */

104

protected TNonblockingTransport getTransport();

105

106

/** Called when operation completes successfully */

107

protected abstract void onComplete();

108

109

/** Called when operation encounters an error */

110

protected abstract void onError(Exception exception);

111

112

/** Clean up resources and fire callback */

113

protected void cleanUpAndFireCallback(SelectionKey key);

114

}

115

```

116

117

### Async Callbacks

118

119

Callback interfaces for handling asynchronous operation results.

120

121

```java { .api }

122

/**

123

* Callback interface for asynchronous method calls

124

* @param <T> The type of the result

125

*/

126

public interface AsyncMethodCallback<T> {

127

/** Called when the asynchronous operation completes successfully */

128

public void onComplete(T response);

129

130

/** Called when the asynchronous operation encounters an error */

131

public void onError(Exception exception);

132

}

133

```

134

135

**Usage Examples:**

136

137

```java

138

import org.apache.thrift.async.AsyncMethodCallback;

139

import org.apache.thrift.async.TAsyncClientManager;

140

import org.apache.thrift.transport.TNonblockingSocket;

141

import org.apache.thrift.protocol.TBinaryProtocol;

142

143

// Create async client manager
TAsyncClientManager clientManager = new TAsyncClientManager();
TNonblockingSocket transport = null;
try {
    // Create non-blocking transport
    transport = new TNonblockingSocket("localhost", 9090);

    // Create async client
    MyService.AsyncClient asyncClient = new MyService.AsyncClient(
        new TBinaryProtocol.Factory(),
        clientManager,
        transport
    );

    // Define callback for async method
    AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {
        @Override
        public void onComplete(String response) {
            System.out.println("Received response: " + response);
        }

        @Override
        public void onError(Exception exception) {
            System.err.println("Error occurred: " + exception.getMessage());
        }
    };

    // Make async method call
    asyncClient.myAsyncMethod("parameter", callback);

    // Continue with other work while call executes...
} finally {
    // Cleanup when done. Running this in finally guarantees the socket is closed and the
    // manager is stopped even when creating the transport or issuing the call throws.
    // Only reach this point once outstanding calls have completed.
    if (transport != null) {
        transport.close();
    }
    clientManager.stop();
}

174

```

175

176

### Async Server Framework

177

178

Base classes for asynchronous server-side processing.

179

180

```java { .api }

181

/**

182

* Interface for asynchronous request processing

183

*/

184

public interface TAsyncProcessor {

185

/** Process an asynchronous request */

186

public void process(AsyncProcessFunction<?, ?, ?> function, TProtocol iproto, TProtocol oproto, AsyncMethodCallback callback) throws TException;

187

}

188

189

/**

190

* Base implementation for asynchronous processors

191

*/

192

public class TBaseAsyncProcessor implements TAsyncProcessor {

193

/** Create base async processor */

194

public TBaseAsyncProcessor();

195

196

/** Process async request using registered process functions */

197

public void process(AsyncProcessFunction<?, ?, ?> function, TProtocol iproto, TProtocol oproto, AsyncMethodCallback callback) throws TException;

198

199

/** Register an async process function for a method name */

200

protected void registerProcessor(String methodName, AsyncProcessFunction<?, ?, ?> fn);

201

202

/** Get registered process function by name */

203

protected AsyncProcessFunction<?, ?, ?> getProcessFunction(String methodName);

204

}

205

```

206

207

### Async Process Functions

208

209

Base classes for implementing asynchronous service methods.

210

211

```java { .api }

212

/**

213

* Abstract base class for asynchronous processing functions

214

* @param <I> The service interface type

215

* @param <T> The arguments type

216

* @param <R> The result type

217

*/

218

public abstract class AsyncProcessFunction<I, T extends TBase, R> {

219

/** Create async process function with method name */

220

public AsyncProcessFunction(String methodName);

221

222

/** Get the method name */

223

public String getMethodName();

224

225

/** Start processing the async request */

226

public abstract void start(I iface, long seqid, TProtocol iprot, TProtocol oprot, AsyncMethodCallback resultHandler) throws TException;

227

228

/** Check if method is oneway (no response expected) */

229

public abstract boolean isOneway();

230

231

/** Create an empty arguments instance (to be populated from the input protocol) */

232

protected abstract T getEmptyArgsInstance();

233

234

/** Create an empty result instance */

235

protected abstract TBase getEmptyResultInstance();

236

}

237

```

238

239

**Usage Examples for Async Server:**

240

241

```java

242

import org.apache.thrift.async.AsyncMethodCallback;

243

import org.apache.thrift.async.AsyncProcessFunction;

244

import org.apache.thrift.protocol.TProtocol;

245

246

// Example async service implementation

247

public class MyAsyncServiceHandler implements MyService.AsyncIface {

248

249

public void myAsyncMethod(String param, AsyncMethodCallback<String> callback) {

250

// Simulate async processing (e.g., database call, external service)

251

CompletableFuture.supplyAsync(() -> {

252

try {

253

// Do async work...

254

Thread.sleep(100); // Simulate work

255

return "Processed: " + param;

256

} catch (Exception e) {

257

throw new RuntimeException(e);

258

}

259

}).whenComplete((result, exception) -> {

260

if (exception != null) {

261

callback.onError(exception);

262

} else {

263

callback.onComplete(result);

264

}

265

});

266

}

267

}

268

269

// Custom async process function

270

public class MyAsyncProcessFunction extends AsyncProcessFunction<MyService.AsyncIface, MyMethod_args, String> {

271

272

public MyAsyncProcessFunction() {

273

super("myAsyncMethod");

274

}

275

276

public void start(MyService.AsyncIface iface, long seqid, TProtocol iprot, TProtocol oprot, AsyncMethodCallback<String> resultHandler) throws TException {

277

MyMethod_args args = new MyMethod_args();

278

args.read(iprot);

279

iprot.readMessageEnd();

280

281

// Create callback that writes response

282

AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {

283

public void onComplete(String response) {

284

try {

285

MyMethod_result result = new MyMethod_result();

286

result.success = response;

287

288

oprot.writeMessageBegin(new TMessage("myAsyncMethod", TMessageType.REPLY, seqid));

289

result.write(oprot);

290

oprot.writeMessageEnd();

291

oprot.getTransport().flush();

292

293

resultHandler.onComplete(response);

294

} catch (Exception e) {

295

onError(e);

296

}

297

}

298

299

public void onError(Exception exception) {

300

try {

301

TApplicationException appEx = new TApplicationException(TApplicationException.INTERNAL_ERROR, exception.getMessage());

302

oprot.writeMessageBegin(new TMessage("myAsyncMethod", TMessageType.EXCEPTION, seqid));

303

appEx.write(oprot);

304

oprot.writeMessageEnd();

305

oprot.getTransport().flush();

306

} catch (Exception e) {

307

// Log error

308

}

309

resultHandler.onError(exception);

310

}

311

};

312

313

// Call async method

314

iface.myAsyncMethod(args.param, callback);

315

}

316

317

public boolean isOneway() {

318

return false;

319

}

320

321

protected MyMethod_args getEmptyArgsInstance() {

322

return new MyMethod_args();

323

}

324

325

protected MyMethod_result getEmptyResultInstance() {

326

return new MyMethod_result();

327

}

328

}

329

```

330

331

### Async Client Example with Connection Pool

332

333

```java

334

import org.apache.thrift.async.TAsyncClientManager;

335

import org.apache.thrift.async.AsyncMethodCallback;

336

import org.apache.thrift.transport.TNonblockingSocket;

337

import org.apache.thrift.protocol.TBinaryProtocol;

338

import java.util.concurrent.CountDownLatch;

339

import java.util.concurrent.atomic.AtomicInteger;

340

341

public class AsyncClientPool {

342

private final TAsyncClientManager clientManager;

343

private final String host;

344

private final int port;

345

346

public AsyncClientPool(String host, int port) throws IOException {

347

this.clientManager = new TAsyncClientManager();

348

this.host = host;

349

this.port = port;

350

}

351

352

public void executeMultipleAsync(int numCalls) throws Exception {

353

CountDownLatch latch = new CountDownLatch(numCalls);

354

AtomicInteger successCount = new AtomicInteger(0);

355

AtomicInteger errorCount = new AtomicInteger(0);

356

357

for (int i = 0; i < numCalls; i++) {

358

// Create new transport for each call

359

TNonblockingSocket transport = new TNonblockingSocket(host, port);

360

361

// Create async client

362

MyService.AsyncClient client = new MyService.AsyncClient(

363

new TBinaryProtocol.Factory(),

364

clientManager,

365

transport

366

);

367

368

// Create callback

369

final int callId = i;

370

AsyncMethodCallback<String> callback = new AsyncMethodCallback<String>() {

371

public void onComplete(String response) {

372

System.out.println("Call " + callId + " completed: " + response);

373

successCount.incrementAndGet();

374

latch.countDown();

375

}

376

377

public void onError(Exception exception) {

378

System.err.println("Call " + callId + " failed: " + exception.getMessage());

379

errorCount.incrementAndGet();

380

latch.countDown();

381

}

382

};

383

384

// Make async call

385

client.myAsyncMethod("Request " + i, callback);

386

}

387

388

// Wait for all calls to complete

389

latch.await();

390

391

System.out.println("Completed: " + successCount.get() + " successful, " + errorCount.get() + " errors");

392

}

393

394

public void shutdown() {

395

clientManager.stop();

396

}

397

}

398

```

399

400

### Async Server Integration

401

402

```java

403

import org.apache.thrift.server.TNonblockingServer;

404

import org.apache.thrift.transport.TNonblockingServerSocket;

405

import org.apache.thrift.protocol.TBinaryProtocol;

406

407

public class AsyncServerExample {

408

public static void main(String[] args) throws Exception {

409

// Create async processor

410

MyService.AsyncProcessor<MyAsyncServiceHandler> processor =

411

new MyService.AsyncProcessor<>(new MyAsyncServiceHandler());

412

413

// Create non-blocking server transport

414

TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(9090);

415

416

// Create non-blocking server

417

TNonblockingServer server = new TNonblockingServer(

418

new TNonblockingServer.Args(serverTransport)

419

.processor(processor)

420

.protocolFactory(new TBinaryProtocol.Factory())

421

);

422

423

System.out.println("Starting async server on port 9090...");

424

server.serve();

425

}

426

}

427

```

428

429

### Error Handling in Async Operations

430

431

```java

432

import org.apache.thrift.async.AsyncMethodCallback;

433

import org.apache.thrift.TException;

434

435

// Robust async callback with error handling

436

public abstract class RobustAsyncCallback<T> implements AsyncMethodCallback<T> {

437

private final int maxRetries;

438

private int retryCount = 0;

439

440

public RobustAsyncCallback(int maxRetries) {

441

this.maxRetries = maxRetries;

442

}

443

444

public final void onError(Exception exception) {

445

if (shouldRetry(exception) && retryCount < maxRetries) {

446

retryCount++;

447

System.out.println("Retrying operation (attempt " + retryCount + "/" + maxRetries + ")");

448

retry();

449

} else {

450

handleFinalError(exception);

451

}

452

}

453

454

protected boolean shouldRetry(Exception exception) {

455

// Retry on timeout or connection errors, but not on application errors

456

return !(exception instanceof TApplicationException) && retryCount < maxRetries;

457

}

458

459

protected abstract void retry();

460

protected abstract void handleFinalError(Exception exception);

461

}

462

463

// Usage example

464

RobustAsyncCallback<String> robustCallback = new RobustAsyncCallback<String>(3) {

465

@Override

466

public void onComplete(String response) {

467

System.out.println("Success: " + response);

468

}

469

470

@Override

471

protected void retry() {

472

// Re-execute the async method call

473

client.myAsyncMethod("parameter", this);

474

}

475

476

@Override

477

protected void handleFinalError(Exception exception) {

478

System.err.println("Final failure after retries: " + exception.getMessage());

479

}

480

};

481

```