or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.mdclient-operations.mdconfiguration-management.mdindex.mdmessage-protocol.mdsecurity-authentication.mdserver-operations.mdshuffle-database.mdtransport-context.md

client-operations.mddocs/

0

# Client Operations

1

2

The client operations API provides high-performance networking capabilities for Spark's distributed communication needs. The `TransportClient` class is the main interface for client-side operations, offering thread-safe methods for chunk fetching, RPC communication, and streaming data transfer.

3

4

## Capabilities

5

6

### TransportClient

7

8

Main client class for network operations, providing thread-safe access to chunk fetching, RPC calls, and streaming functionality.

9

10

```java { .api }

11

/**

12

* Create a transport client for network communication

13

* @param channel - Netty channel for network communication

14

* @param handler - Response handler for managing responses and callbacks

15

*/

16

public TransportClient(Channel channel, TransportResponseHandler handler);

17

```

18

19

### Connection Management

20

21

Methods for managing client connections and retrieving connection information.

22

23

```java { .api }

24

/**

25

* Get the underlying Netty channel

26

* @return Channel instance used for network communication

27

*/

28

public Channel getChannel();

29

30

/**

31

* Check if the client connection is active

32

* @return true if the connection is active, false otherwise

33

*/

34

public boolean isActive();

35

36

/**

37

* Get the remote socket address of the connected server

38

* @return SocketAddress of the remote server

39

*/

40

public SocketAddress getSocketAddress();

41

42

/**

43

* Get the client identifier

44

* @return String representing the client ID, or null if not set

45

*/

46

public String getClientId();

47

48

/**

49

* Set the client identifier

50

* @param id - String identifier for this client

51

*/

52

public void setClientId(String id);

53

```

54

55

### Chunk Fetching

56

57

Asynchronous chunk fetching functionality for retrieving data blocks from streams.

58

59

```java { .api }

60

/**

61

* Fetch a specific chunk from a stream asynchronously

62

* @param streamId - Identifier of the stream containing the chunk

63

* @param chunkIndex - Index of the chunk to fetch within the stream

64

* @param callback - Callback to handle successful chunk reception or failures

65

*/

66

public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);

67

```

68

69

### Streaming Operations

70

71

Methods for handling streaming data transfer with support for bidirectional communication.

72

73

```java { .api }

74

/**

75

* Request to receive data from a named stream

76

* @param streamId - Identifier of the stream to receive data from

77

* @param callback - Callback to handle streaming data events

78

*/

79

public void stream(String streamId, StreamCallback callback);

80

81

/**

82

* Upload a stream of data to the server with metadata

83

* @param meta - Metadata buffer describing the stream contents

84

* @param data - Data buffer containing the stream data

85

* @param callback - Callback to handle the upload response

86

* @return long request ID for tracking the upload operation

87

*/

88

public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);

89

```

90

91

### RPC Communication

92

93

Remote procedure call functionality with both synchronous and asynchronous operation modes.

94

95

```java { .api }

96

/**

97

* Send an RPC message asynchronously

98

* @param message - ByteBuffer containing the RPC message data

99

* @param callback - Callback to handle the RPC response or failure

100

* @return long request ID for tracking the RPC call

101

*/

102

public long sendRpc(ByteBuffer message, RpcResponseCallback callback);

103

104

/**

105

* Send an RPC message synchronously with timeout

106

* @param message - ByteBuffer containing the RPC message data

107

* @param timeoutMs - Timeout in milliseconds for the RPC call

108

* @return ByteBuffer containing the response data

109

* @throws IOException if the RPC call fails or times out

110

*/

111

public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);

112

113

/**

114

* Send a one-way message (no response expected)

115

* @param message - ByteBuffer containing the message data

116

*/

117

public void send(ByteBuffer message);

118

```

119

120

### Merged Block Operations

121

122

Specialized operations for requesting merged block metadata, used in Spark's shuffle optimization.

123

124

```java { .api }

125

/**

126

* Request merged block metadata for shuffle operations

127

* @param appId - Application identifier

128

* @param shuffleId - Shuffle operation identifier

129

* @param shuffleMergeId - Merge operation identifier

130

* @param reduceId - Reducer task identifier

131

* @param callback - Callback to handle the metadata response

132

*/

133

public void sendMergedBlockMetaReq(String appId, int shuffleId, int shuffleMergeId, int reduceId, MergedBlockMetaResponseCallback callback);

134

```

135

136

### Request Management

137

138

Methods for managing active requests and handling timeouts.

139

140

```java { .api }

141

/**

142

* Remove a pending RPC request by its ID

143

* @param requestId - ID of the request to remove

144

*/

145

public void removeRpcRequest(long requestId);

146

147

/**

148

* Mark this client as timed out, triggering cleanup of pending requests

149

*/

150

public void timeOut();

151

```

152

153

### Resource Management

154

155

Proper cleanup and resource management for client connections.

156

157

```java { .api }

158

/**

159

* Close the client connection and clean up all resources

160

* This will cancel all pending requests and close the underlying channel

161

*/

162

public void close();

163

```

164

165

## Callback Interfaces

166

167

### RpcResponseCallback

168

169

Callback interface for handling RPC responses.

170

171

```java { .api }

172

public interface RpcResponseCallback extends BaseResponseCallback {

173

/**

174

* Called when an RPC call completes successfully

175

* @param response - ByteBuffer containing the response data

176

*/

177

void onSuccess(ByteBuffer response);

178

179

/**

180

* Called when an RPC call fails

181

* @param e - Throwable representing the failure cause

182

*/

183

void onFailure(Throwable e);

184

}

185

```

186

187

### ChunkReceivedCallback

188

189

Callback interface for handling chunk fetch operations.

190

191

```java { .api }

192

public interface ChunkReceivedCallback {

193

/**

194

* Called when a chunk is successfully received

195

* @param chunkIndex - Index of the received chunk

196

* @param buffer - ManagedBuffer containing the chunk data

197

*/

198

void onSuccess(int chunkIndex, ManagedBuffer buffer);

199

200

/**

201

* Called when chunk fetching fails

202

* @param chunkIndex - Index of the chunk that failed to be received

203

* @param e - Throwable representing the failure cause

204

*/

205

void onFailure(int chunkIndex, Throwable e);

206

}

207

```

208

209

### StreamCallback

210

211

Callback interface for handling streaming data operations.

212

213

```java { .api }

214

public interface StreamCallback {

215

/**

216

* Called when data is received from the stream

217

* @param streamId - Identifier of the stream

218

* @param buf - ByteBuffer containing the received data

219

* @throws IOException if data processing fails

220

*/

221

void onData(String streamId, ByteBuffer buf) throws IOException;

222

223

/**

224

* Called when the stream is completed successfully

225

* @param streamId - Identifier of the completed stream

226

* @throws IOException if completion processing fails

227

*/

228

void onComplete(String streamId) throws IOException;

229

230

/**

231

* Called when the stream encounters a failure

232

* @param streamId - Identifier of the failed stream

233

* @param cause - Throwable representing the failure cause

234

* @throws IOException if failure processing fails

235

*/

236

void onFailure(String streamId, Throwable cause) throws IOException;

237

}

238

```

239

240

### StreamCallbackWithID

241

242

Extended stream callback interface that includes an identifier.

243

244

```java { .api }

245

public interface StreamCallbackWithID extends StreamCallback {

246

/**

247

* Get the identifier for this stream callback

248

* @return String identifier for the callback

249

*/

250

String getID();

251

}

252

```

253

254

### MergedBlockMetaResponseCallback

255

256

Callback interface for handling merged block metadata responses.

257

258

```java { .api }

259

public interface MergedBlockMetaResponseCallback extends BaseResponseCallback {

260

/**

261

* Called when merged block metadata is successfully received

262

* @param mergedBlockMeta - MergedBlockMetaSuccess containing the metadata

263

*/

264

void onSuccess(MergedBlockMetaSuccess mergedBlockMeta);

265

266

/**

267

* Called when merged block metadata request fails

268

* @param e - Throwable representing the failure cause

269

*/

270

void onFailure(Throwable e);

271

}

272

```

273

274

## Client Factory

275

276

### TransportClientFactory

277

278

Factory class for creating and managing transport clients with connection pooling and lifecycle management.

279

280

```java { .api }

281

public class TransportClientFactory implements Closeable {

282

/**

283

* Create a transport client connected to the specified host and port

284

* @param remoteHost - Hostname or IP address of the remote server

285

* @param remotePort - Port number of the remote server

286

* @return TransportClient connected to the specified endpoint

287

* @throws IOException if connection establishment fails

288

*/

289

public TransportClient createClient(String remoteHost, int remotePort) throws IOException;

290

291

/**

292

* Create a transport client with a specific client ID

293

* @param remoteHost - Hostname or IP address of the remote server

294

* @param remotePort - Port number of the remote server

295

* @param clientId - Identifier for the client

296

* @return TransportClient connected to the specified endpoint

297

* @throws IOException if connection establishment fails

298

*/

299

public TransportClient createClient(String remoteHost, int remotePort, int clientId) throws IOException;

300

301

/**

302

* Close the factory and all associated client connections

303

*/

304

public void close();

305

}

306

```

307

308

## Usage Examples

309

310

### Basic RPC Communication

311

312

```java

313

import org.apache.spark.network.client.TransportClient;

314

import org.apache.spark.network.client.RpcResponseCallback;

315

316

// Create client through factory

317

TransportClient client = clientFactory.createClient("localhost", 9999);

318

319

// Send asynchronous RPC

320

ByteBuffer request = ByteBuffer.wrap("Hello, Server!".getBytes());

321

client.sendRpc(request, new RpcResponseCallback() {

322

@Override

323

public void onSuccess(ByteBuffer response) {

324

String responseStr = new String(response.array());

325

System.out.println("Server responded: " + responseStr);

326

}

327

328

@Override

329

public void onFailure(Throwable e) {

330

System.err.println("RPC failed: " + e.getMessage());

331

}

332

});

333

334

// Send synchronous RPC with timeout

335

try {

336

ByteBuffer syncResponse = client.sendRpcSync(request, 30000); // 30 second timeout

337

System.out.println("Sync response: " + new String(syncResponse.array()));

338

} catch (IOException e) {

339

System.err.println("Sync RPC failed: " + e.getMessage());

340

}

341

```

342

343

### Chunk Fetching

344

345

```java

346

import org.apache.spark.network.client.ChunkReceivedCallback;

347

import org.apache.spark.network.buffer.ManagedBuffer;

348

349

// Fetch chunks from a stream

350

long streamId = 12345L;

351

client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {

352

@Override

353

public void onSuccess(int chunkIndex, ManagedBuffer buffer) {

354

System.out.println("Received chunk " + chunkIndex + " with " + buffer.size() + " bytes");

355

try {

356

// Process chunk data

357

ByteBuffer data = buffer.nioByteBuffer();

358

// ... process data ...

359

} catch (IOException e) {

360

System.err.println("Failed to process chunk: " + e.getMessage());

361

} finally {

362

buffer.release(); // Important: release buffer when done

363

}

364

}

365

366

@Override

367

public void onFailure(int chunkIndex, Throwable e) {

368

System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());

369

}

370

});

371

```

372

373

### Streaming Data

374

375

```java

376

import org.apache.spark.network.client.StreamCallback;

377

378

// Receive streaming data

379

client.stream("data-stream-1", new StreamCallback() {

380

@Override

381

public void onData(String streamId, ByteBuffer buf) throws IOException {

382

System.out.println("Received " + buf.remaining() + " bytes from stream " + streamId);

383

// Process streaming data

384

byte[] data = new byte[buf.remaining()];

385

buf.get(data);

386

// ... process data ...

387

}

388

389

@Override

390

public void onComplete(String streamId) throws IOException {

391

System.out.println("Stream " + streamId + " completed successfully");

392

}

393

394

@Override

395

public void onFailure(String streamId, Throwable cause) throws IOException {

396

System.err.println("Stream " + streamId + " failed: " + cause.getMessage());

397

}

398

});

399

```

400

401

### Connection Management

402

403

```java

404

// Check connection status

405

if (client.isActive()) {

406

System.out.println("Client connected to: " + client.getSocketAddress());

407

System.out.println("Client ID: " + client.getClientId());

408

409

// Set custom client ID

410

client.setClientId("spark-client-" + System.currentTimeMillis());

411

412

// Perform operations...

413

} else {

414

System.out.println("Client connection is not active");

415

}

416

417

// Proper cleanup

418

client.close();

419

```

420

421

## Exception Handling

422

423

### ChunkFetchFailureException

424

425

Exception thrown when chunk fetching operations fail.

426

427

```java { .api }

428

public class ChunkFetchFailureException extends RuntimeException {

429

/**

430

* Create exception with error message and cause

431

* @param errorMsg - Description of the error

432

* @param cause - Underlying cause of the failure

433

*/

434

public ChunkFetchFailureException(String errorMsg, Throwable cause);

435

436

/**

437

* Create exception with error message only

438

* @param errorMsg - Description of the error

439

*/

440

public ChunkFetchFailureException(String errorMsg);

441

}

442

```

443

444

## Bootstrap Integration

445

446

### TransportClientBootstrap

447

448

Interface for customizing client initialization, commonly used for authentication and encryption setup.

449

450

```java { .api }

451

public interface TransportClientBootstrap {

452

/**

453

* Perform bootstrap operations on a newly created client

454

* @param client - TransportClient instance to bootstrap

455

* @param channel - Underlying Netty channel

456

* @throws RuntimeException if bootstrap operations fail

457

*/

458

void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;

459

}

460

```