or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.mdcore-rpc.mdindex.mdplugins.mdprotocols.mdstats.mdtransports.md

core-rpc.mddocs/

0

# Core RPC Framework

1

2

The foundation of Apache Avro IPC consists of abstract base classes that provide the client-server RPC communication infrastructure. These classes define the contract for RPC operations while remaining transport and protocol agnostic.

3

4

## Capabilities

5

6

### Client-Side RPC Base Class

7

8

The `Requestor` class provides the client-side foundation for RPC communication, handling protocol negotiation, request serialization, and response deserialization.

9

10

```java { .api }

11

public abstract class Requestor {

12

protected Requestor(Protocol local, Transceiver transceiver) throws IOException;

13

14

// Protocol and transport access

15

public Protocol getLocal();

16

public Transceiver getTransceiver();

17

public Protocol getRemote() throws IOException;

18

19

// Plugin management

20

public void addRPCPlugin(RPCPlugin plugin);

21

22

// Synchronous RPC call

23

public Object request(String messageName, Object request) throws Exception;

24

25

// Asynchronous RPC call with callback

26

public <T> void request(String messageName, Object request, Callback<T> callback)

27

throws AvroRemoteException, IOException;

28

29

// Abstract methods for protocol-specific implementations

30

public abstract void writeRequest(Schema schema, Object request, Encoder out) throws IOException;

31

public abstract Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException;

32

public abstract Exception readError(Schema writer, Schema reader, Decoder in) throws IOException;

33

}

34

```

35

36

#### Usage Examples

37

38

```java

39

// Synchronous RPC call

40

MyRequestor requestor = new MyRequestor(protocol, transceiver);

41

try {

42

Object result = requestor.request("getMessage", requestData);

43

// Handle result

44

} catch (Exception e) {

45

// Handle RPC error

46

}

47

48

// Asynchronous RPC call

49

requestor.request("getMessage", requestData, new Callback<String>() {

50

@Override

51

public void handleResult(String result) {

52

// Handle successful result

53

}

54

55

@Override

56

public void handleError(Throwable error) {

57

// Handle error

58

}

59

});

60

```

61

62

### Server-Side RPC Base Class

63

64

The `Responder` class provides the server-side foundation for RPC communication, handling request deserialization, method dispatch, and response serialization.

65

66

```java { .api }

67

public abstract class Responder {

68

protected Responder(Protocol local);

69

70

// Protocol access

71

public static Protocol getRemote(); // ThreadLocal access to remote protocol

72

public Protocol getLocal();

73

74

// Plugin management

75

public void addRPCPlugin(RPCPlugin plugin);

76

77

// Process RPC request buffers

78

public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException;

79

public List<ByteBuffer> respond(List<ByteBuffer> buffers, Transceiver connection) throws IOException;

80

81

// Abstract methods for protocol-specific implementations

82

public abstract Object respond(Message message, Object request) throws Exception;

83

public abstract Object readRequest(Schema actual, Schema expected, Decoder in) throws IOException;

84

public abstract void writeResponse(Schema schema, Object response, Encoder out) throws IOException;

85

public abstract void writeError(Schema schema, Object error, Encoder out) throws IOException;

86

}

87

```

88

89

#### Usage Examples

90

91

```java

92

// Custom responder implementation

93

public class MyResponder extends GenericResponder {

94

public MyResponder(Protocol protocol) {

95

super(protocol);

96

}

97

98

@Override

99

public Object respond(Message message, Object request) throws Exception {

100

String messageName = message.getName();

101

switch (messageName) {

102

case "getMessage":

103

return handleGetMessage(request);

104

case "setMessage":

105

return handleSetMessage(request);

106

default:

107

throw new AvroRuntimeException("Unknown message: " + messageName);

108

}

109

}

110

111

private Object handleGetMessage(Object request) {

112

// Implementation logic

113

return "Hello from server";

114

}

115

116

private Object handleSetMessage(Object request) {

117

// Implementation logic

118

return null; // void method

119

}

120

}

121

```

122

123

### Transport Abstraction

124

125

The `Transceiver` class provides the transport abstraction layer, handling network I/O operations while remaining protocol agnostic.

126

127

```java { .api }

128

public abstract class Transceiver implements Closeable {

129

// Connection information

130

public abstract String getRemoteName() throws IOException;

131

public boolean isConnected();

132

133

// Protocol management

134

public void setRemote(Protocol protocol);

135

public Protocol getRemote();

136

137

// Channel synchronization

138

public void lockChannel();

139

public void unlockChannel();

140

141

// Synchronous communication

142

public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;

143

144

// Asynchronous communication

145

public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;

146

147

// Abstract I/O methods

148

public abstract List<ByteBuffer> readBuffers() throws IOException;

149

public abstract void writeBuffers(List<ByteBuffer> buffers) throws IOException;

150

151

// Resource management

152

public void close() throws IOException;

153

}

154

```

155

156

#### Usage Examples

157

158

```java

159

// Using transceiver directly for low-level communication

160

List<ByteBuffer> requestBuffers = serializeRequest(request);

161

try {

162

transceiver.lockChannel();

163

List<ByteBuffer> responseBuffers = transceiver.transceive(requestBuffers);

164

Object response = deserializeResponse(responseBuffers);

165

} finally {

166

transceiver.unlockChannel();

167

}

168

169

// Asynchronous transceiver usage

170

transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {

171

@Override

172

public void handleResult(List<ByteBuffer> responseBuffers) {

173

Object response = deserializeResponse(responseBuffers);

174

// Handle response

175

}

176

177

@Override

178

public void handleError(Throwable error) {

179

// Handle communication error

180

}

181

});

182

```

183

184

### Server Interface

185

186

The `Server` interface defines the contract for server lifecycle management, providing standard start/stop semantics.

187

188

```java { .api }

189

public interface Server {

190

// Server information

191

int getPort();

192

193

// Lifecycle management

194

void start();

195

void close();

196

void join() throws InterruptedException;

197

}

198

```

199

200

#### Usage Examples

201

202

```java

203

// Server lifecycle management

204

Server server = new SaslSocketServer(responder, new InetSocketAddress(8080));

205

206

// Start server

207

server.start();

208

System.out.println("Server started on port: " + server.getPort());

209

210

// Wait for server to finish (in separate thread or shutdown hook)

211

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

212

server.close();

213

try {

214

server.join();

215

} catch (InterruptedException e) {

216

Thread.currentThread().interrupt();

217

}

218

}));

219

220

// Wait for server completion

221

server.join();

222

```

223

224

## Advanced Usage

225

226

### Custom Requestor Implementation

227

228

```java

229

public class CustomRequestor extends Requestor {

230

private final SpecificDatumWriter<Object> writer;

231

private final SpecificDatumReader<Object> reader;

232

233

public CustomRequestor(Protocol protocol, Transceiver transceiver) throws IOException {

234

super(protocol, transceiver);

235

this.writer = new SpecificDatumWriter<>(protocol);

236

this.reader = new SpecificDatumReader<>(protocol);

237

}

238

239

@Override

240

public void writeRequest(Schema schema, Object request, Encoder out) throws IOException {

241

writer.setSchema(schema);

242

writer.write(request, out);

243

}

244

245

@Override

246

public Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException {

247

this.reader.setSchema(writer);

248

this.reader.setExpected(reader);

249

return this.reader.read(null, in);

250

}

251

252

@Override

253

public Exception readError(Schema writer, Schema reader, Decoder in) throws IOException {

254

this.reader.setSchema(writer);

255

this.reader.setExpected(reader);

256

Object error = this.reader.read(null, in);

257

if (error instanceof Exception) {

258

return (Exception) error;

259

}

260

return new AvroRemoteException(error);

261

}

262

}

263

```

264

265

### Plugin Integration

266

267

```java

268

// Add monitoring plugin to requestor and responder

269

RPCPlugin monitoringPlugin = new RPCPlugin() {

270

@Override

271

public void clientSendRequest(RPCContext context) {

272

System.out.println("Sending request: " + context.getMessage().getName());

273

}

274

275

@Override

276

public void clientReceiveResponse(RPCContext context) {

277

System.out.println("Received response, error: " + context.isError());

278

}

279

};

280

281

requestor.addRPCPlugin(monitoringPlugin);

282

responder.addRPCPlugin(monitoringPlugin);

283

```

284

285

## Error Handling

286

287

The core RPC framework provides comprehensive error handling through exceptions and the RPC context:

288

289

```java

290

try {

291

Object result = requestor.request("methodName", request);

292

} catch (AvroRemoteException e) {

293

// Remote method threw an exception

294

System.err.println("Remote error: " + e.getMessage());

295

} catch (IOException e) {

296

// Network or serialization error

297

System.err.println("Communication error: " + e.getMessage());

298

} catch (Exception e) {

299

// Other RPC-related errors

300

System.err.println("RPC error: " + e.getMessage());

301

}

302

```

303

304

## Thread Safety

305

306

- `Requestor` instances are thread-safe for concurrent RPC calls

307

- `Responder` instances are thread-safe for concurrent request processing

308

- `Transceiver` instances provide channel locking for thread-safe access

309

- Each RPC call gets its own `RPCContext` for thread-local state management