or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.md client-operations.md configuration-utilities.md index.md message-protocol.md sasl-authentication.md server-operations.md transport-setup.md

docs/client-operations.md

0

# Client Operations

1

2

Client-side networking functionality for establishing connections, sending RPC requests, and fetching data chunks from remote servers. The client API provides both synchronous and asynchronous operations for different use cases.

3

4

## Capabilities

5

6

### TransportClient

7

8

Main client class for network operations including RPC calls, chunk fetching, and streaming.

9

10

```java { .api }

11

/**

12

* TransportClient provides the client-side API for network communication.

13

* It supports RPC requests, chunk fetching, and streaming operations.

14

* All operations are thread-safe and can be called concurrently.

15

*/

16

public class TransportClient implements Closeable {

17

public TransportClient(Channel channel, TransportResponseHandler handler);

18

19

/** Gets the underlying Netty channel */

20

public Channel getChannel();

21

22

/** Checks if the client connection is active */

23

public boolean isActive();

24

25

/** Gets the remote socket address */

26

public SocketAddress getSocketAddress();

27

28

/** Gets the client identifier */

29

public String getClientId();

30

31

/** Sets the client identifier */

32

public void setClientId(String id);

33

34

/**

35

* Fetches a specific chunk from a stream asynchronously.

36

*

37

* @param streamId The stream identifier

38

* @param chunkIndex The index of the chunk to fetch

39

* @param callback Callback to handle the chunk data or failure

40

*/

41

public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);

42

43

/**

44

* Opens a stream for continuous data transfer.

45

*

46

* @param streamId The stream identifier

47

* @param callback Callback to handle stream data, completion, or failure

48

*/

49

public void stream(String streamId, StreamCallback callback);

50

51

/**

52

* Sends an RPC request asynchronously.

53

*

54

* @param message The request message as ByteBuffer

55

* @param callback Callback to handle the response or failure

56

* @return Request ID that can be used to cancel the request

57

*/

58

public long sendRpc(ByteBuffer message, RpcResponseCallback callback);

59

60

/**

61

* Sends an RPC request synchronously with timeout.

62

*

63

* @param message The request message as ByteBuffer

64

* @param timeoutMs Timeout in milliseconds

65

* @return The response message as ByteBuffer

66

* @throws RuntimeException if the request fails or times out

67

*/

68

public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);

69

70

/**

71

* Sends a one-way message (no response expected).

72

*

73

* @param message The message as ByteBuffer

74

*/

75

public void send(ByteBuffer message);

76

77

/**

78

* Removes a pending RPC request (cancels it).

79

*

80

* @param requestId The request ID returned by sendRpc

81

*/

82

public void removeRpcRequest(long requestId);

83

84

/** Forces a timeout on all pending requests */

85

public void timeOut();

86

87

/** Closes the client connection and releases resources */

88

public void close();

89

90

public String toString();

91

}

92

```

93

94

### TransportClientFactory

95

96

Factory for creating and managing TransportClient instances.

97

98

```java { .api }

99

/**

100

* Factory for creating TransportClient instances with connection pooling and management.

101

* Manages connection lifecycle and provides both managed and unmanaged clients.

102

*/

103

public class TransportClientFactory implements Closeable {

104

/**

105

* Creates a managed client connection to the specified host and port.

106

* Managed clients are pooled and reused for efficiency.

107

*

108

* @param remoteHost The remote host to connect to

109

* @param remotePort The remote port to connect to

110

* @return A TransportClient instance

111

* @throws IOException if connection fails

112

*/

113

public TransportClient createClient(String remoteHost, int remotePort) throws IOException;

114

115

/**

116

* Creates an unmanaged client connection to the specified host and port.

117

* Unmanaged clients are not pooled and must be explicitly closed.

118

*

119

* @param remoteHost The remote host to connect to

120

* @param remotePort The remote port to connect to

121

* @return A TransportClient instance

122

* @throws IOException if connection fails

123

*/

124

public TransportClient createUnmanagedClient(String remoteHost, int remotePort) throws IOException;

125

126

/** Closes the factory and all managed client connections */

127

public void close();

128

}

129

```

130

131

### Callback Interfaces

132

133

Callback interfaces for handling asynchronous operations.

134

135

```java { .api }

136

/**

137

* Callback interface for chunk fetch operations.

138

* Implementations handle successful chunk receipt or fetch failures.

139

*/

140

public interface ChunkReceivedCallback {

141

/**

142

* Called when a chunk is successfully received.

143

*

144

* @param chunkIndex The index of the received chunk

145

* @param buffer The chunk data as a ManagedBuffer

146

*/

147

void onSuccess(int chunkIndex, ManagedBuffer buffer);

148

149

/**

150

* Called when chunk fetching fails.

151

*

152

* @param chunkIndex The index of the chunk that failed

153

* @param e The exception that caused the failure

154

*/

155

void onFailure(int chunkIndex, Throwable e);

156

}

157

158

/**

159

* Callback interface for RPC response handling.

160

* Implementations handle successful responses or RPC failures.

161

*/

162

public interface RpcResponseCallback {

163

/**

164

* Called when an RPC response is successfully received.

165

*

166

* @param response The response message as ByteBuffer

167

*/

168

void onSuccess(ByteBuffer response);

169

170

/**

171

* Called when an RPC request fails.

172

*

173

* @param e The exception that caused the failure

174

*/

175

void onFailure(Throwable e);

176

}

177

178

/**

179

* Callback interface for stream operations.

180

* Implementations handle stream data, completion, and failures.

181

*/

182

public interface StreamCallback {

183

/**

184

* Called when stream data is received.

185

*

186

* @param streamId The stream identifier

187

* @param buf The stream data as ByteBuffer

188

*/

189

void onData(String streamId, ByteBuffer buf);

190

191

/**

192

* Called when the stream completes successfully.

193

*

194

* @param streamId The stream identifier

195

*/

196

void onComplete(String streamId);

197

198

/**

199

* Called when the stream fails.

200

*

201

* @param streamId The stream identifier

202

* @param cause The exception that caused the failure

203

*/

204

void onFailure(String streamId, Throwable cause);

205

}

206

```

207

208

## Usage Examples

209

210

### Basic RPC Operations

211

212

```java

213

import org.apache.spark.network.client.TransportClient;

214

import org.apache.spark.network.client.RpcResponseCallback;

215

import java.nio.ByteBuffer;

216

217

// Asynchronous RPC call

218

TransportClient client = clientFactory.createClient("server-host", 8080);

219

220

ByteBuffer request = ByteBuffer.wrap("Hello Server".getBytes());

221

222

client.sendRpc(request, new RpcResponseCallback() {

223

@Override

224

public void onSuccess(ByteBuffer response) {

225

String responseStr = new String(response.array());

226

System.out.println("Received response: " + responseStr);

227

}

228

229

@Override

230

public void onFailure(Throwable e) {

231

System.err.println("RPC failed: " + e.getMessage());

232

}

233

});

234

235

// Synchronous RPC call with timeout

236

try {

237

ByteBuffer response = client.sendRpcSync(request, 5000); // 5 second timeout

238

String responseStr = new String(response.array());

239

System.out.println("Sync response: " + responseStr);

240

} catch (Exception e) {

241

System.err.println("Sync RPC failed: " + e.getMessage());

242

}

243

```

244

245

### Chunk Fetching

246

247

```java

248

import org.apache.spark.network.client.ChunkReceivedCallback;

249

import org.apache.spark.network.buffer.ManagedBuffer;

250

251

// Fetch specific chunks from a stream

252

long streamId = 12345L;

253

254

for (int chunkIndex = 0; chunkIndex < 10; chunkIndex++) {

255

final int currentChunk = chunkIndex;

256

257

client.fetchChunk(streamId, chunkIndex, new ChunkReceivedCallback() {

258

@Override

259

public void onSuccess(int chunkIndex, ManagedBuffer buffer) {

260

System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());

261

262

// Process the chunk data

263

try {

264

ByteBuffer data = buffer.nioByteBuffer();

265

// Process data...

266

} catch (Exception e) {

267

System.err.println("Failed to process chunk: " + e.getMessage());

268

} finally {

269

buffer.release(); // Important: release buffer when done

270

}

271

}

272

273

@Override

274

public void onFailure(int chunkIndex, Throwable e) {

275

System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());

276

}

277

});

278

}

279

```

280

281

### Stream Operations

282

283

```java

284

import org.apache.spark.network.client.StreamCallback;

285

286

// Open a stream for continuous data transfer

287

String streamId = "data-stream-001";

288

289

client.stream(streamId, new StreamCallback() {

290

@Override

291

public void onData(String streamId, ByteBuffer buf) {

292

System.out.println("Received " + buf.remaining() + " bytes from stream " + streamId);

293

294

// Process stream data

295

byte[] data = new byte[buf.remaining()];

296

buf.get(data);

297

// Process data...

298

}

299

300

@Override

301

public void onComplete(String streamId) {

302

System.out.println("Stream " + streamId + " completed successfully");

303

}

304

305

@Override

306

public void onFailure(String streamId, Throwable cause) {

307

System.err.println("Stream " + streamId + " failed: " + cause.getMessage());

308

}

309

});

310

```

311

312

### Connection Management

313

314

```java

315

// Create managed clients (recommended for most use cases)

316

TransportClient managedClient = clientFactory.createClient("server1", 8080);

317

318

// Client is automatically managed by the factory

319

// No need to explicitly close (factory handles cleanup)

320

321

// Create unmanaged clients (for special cases)

322

TransportClient unmanagedClient = clientFactory.createUnmanagedClient("server2", 9090);

323

324

// Important: Must explicitly close unmanaged clients

325

try {

326

// Use client...

327

} finally {

328

unmanagedClient.close();

329

}

330

331

// Check client status

332

if (client.isActive()) {

333

System.out.println("Client connected to: " + client.getSocketAddress());

334

System.out.println("Client ID: " + client.getClientId());

335

} else {

336

System.out.println("Client connection is not active");

337

}

338

```

339

340

### Error Handling and Request Cancellation

341

342

```java

343

// Send RPC with ability to cancel

344

long requestId = client.sendRpc(request, new RpcResponseCallback() {

345

@Override

346

public void onSuccess(ByteBuffer response) {

347

// Handle success

348

}

349

350

@Override

351

public void onFailure(Throwable e) {

352

if (e instanceof TimeoutException) {

353

System.err.println("Request timed out");

354

} else {

355

System.err.println("Request failed: " + e.getMessage());

356

}

357

}

358

});

359

360

// Cancel the request if needed

361

if (shouldCancel) {

362

client.removeRpcRequest(requestId);

363

}

364

365

// Force timeout all pending requests (emergency cleanup)

366

client.timeOut();

367

```

368

369

### One-Way Messages

370

371

```java

372

// Send fire-and-forget messages

373

ByteBuffer notification = ByteBuffer.wrap("Server notification".getBytes());

374

375

client.send(notification); // No response expected or callback needed

376

377

// Useful for notifications, heartbeats, or status updates

378

```