or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md, buffers.md, configuration.md, index.md, protocol.md, streaming.md, transport.md

docs/streaming.md

0

# Stream Management

1

2

Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead and zero-copy I/O optimizations.

3

4

## Capabilities

5

6

### Stream Manager Abstract Base

7

8

Core abstraction for managing streams that can be fetched by clients, providing lifecycle management and authorization controls.

9

10

```java { .api }

11

/**

12

* Abstract base class for managing streams that can be fetched by clients

13

* Provides stream lifecycle management and authorization

14

*/

15

public abstract class StreamManager {

16

/**

17

* Get a specific chunk from a stream by ID and index

18

* @param streamId Numeric identifier for the stream

19

* @param chunkIndex Index of the chunk within the stream (0-based)

20

* @return ManagedBuffer containing the chunk data

21

* @throws IllegalArgumentException if stream or chunk doesn't exist

22

*/

23

public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

24

25

/**

26

* Open a stream for reading by string identifier

27

* @param streamId String identifier for the stream

28

* @return ManagedBuffer containing the full stream data

29

* @throws IllegalArgumentException if stream doesn't exist

30

*/

31

public abstract ManagedBuffer openStream(String streamId);

32

33

/**

34

* Called when a connection terminates to clean up associated streams

35

* @param channel Netty channel that terminated

36

*/

37

public void connectionTerminated(Channel channel);

38

39

/**

40

* Check if client is authorized to access the specified stream

41

* @param client Transport client requesting access

42

* @param streamId Numeric stream identifier

43

* @throws SecurityException if client is not authorized

44

*/

45

public void checkAuthorization(TransportClient client, long streamId);

46

}

47

```

48

49

**Usage Examples:**

50

51

```java

52

// Implementing a custom StreamManager

53

public class MyStreamManager extends StreamManager {

54

private final Map<Long, List<ManagedBuffer>> streams = new ConcurrentHashMap<>();

55

private final Map<String, ManagedBuffer> namedStreams = new ConcurrentHashMap<>();

56

57

@Override

58

public ManagedBuffer getChunk(long streamId, int chunkIndex) {

59

List<ManagedBuffer> chunks = streams.get(streamId);

60

if (chunks == null || chunkIndex >= chunks.size()) {

61

throw new IllegalArgumentException("Invalid stream or chunk: " +

62

streamId + "/" + chunkIndex);

63

}

64

return chunks.get(chunkIndex).retain();

65

}

66

67

@Override

68

public ManagedBuffer openStream(String streamId) {

69

ManagedBuffer buffer = namedStreams.get(streamId);

70

if (buffer == null) {

71

throw new IllegalArgumentException("Stream not found: " + streamId);

72

}

73

return buffer.retain();

74

}

75

76

@Override

77

public void checkAuthorization(TransportClient client, long streamId) {

78

String clientId = client.getClientId();

79

if (!isAuthorized(clientId, streamId)) {

80

throw new SecurityException("Client " + clientId +

81

" not authorized for stream " + streamId);

82

}

83

}

84

85

private boolean isAuthorized(String clientId, long streamId) {

86

// Custom authorization logic

87

return true;

88

}

89

}

90

```

91

92

### One-For-One Stream Manager

93

94

Concrete StreamManager implementation where each chunk corresponds to one buffer, providing simple stream registration and management.

95

96

```java { .api }

97

/**

98

* StreamManager where each chunk corresponds to one buffer

99

* Provides simple stream registration with automatic cleanup

100

*/

101

public class OneForOneStreamManager extends StreamManager {

102

/**

103

* Create a new OneForOneStreamManager with default settings

104

*/

105

public OneForOneStreamManager();

106

107

/**

108

* Register a new stream with the manager

109

* @param appId Application identifier for authorization

110

* @param buffers Iterator of ManagedBuffer instances representing chunks

111

* @param channel Netty channel associated with this stream (for cleanup)

112

* @return Numeric stream ID that can be used to fetch chunks

113

*/

114

public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);

115

116

/**

117

* Get a specific chunk from a registered stream

118

* @param streamId Stream identifier returned from registerStream

119

* @param chunkIndex Index of the chunk to retrieve (0-based)

120

* @return ManagedBuffer containing the chunk data

121

* @throws IllegalArgumentException if stream or chunk doesn't exist

122

*/

123

public ManagedBuffer getChunk(long streamId, int chunkIndex);

124

125

/**

126

* Open a stream by string identifier (not supported in OneForOneStreamManager)

127

* @param streamId String stream identifier

128

* @return Never returns normally

129

* @throws UnsupportedOperationException always thrown

130

*/

131

public ManagedBuffer openStream(String streamId);

132

133

/**

134

* Clean up streams associated with terminated connection

135

* @param channel Netty channel that terminated

136

*/

137

public void connectionTerminated(Channel channel);

138

}

139

```

140

141

**Usage Examples:**

142

143

```java

144

import java.util.Arrays;

145

import java.util.Iterator;

146

147

// Create stream manager

148

OneForOneStreamManager streamManager = new OneForOneStreamManager();

149

150

// Prepare data chunks

151

List<ManagedBuffer> chunks = Arrays.asList(

152

new NioManagedBuffer(ByteBuffer.wrap("chunk0".getBytes())),

153

new NioManagedBuffer(ByteBuffer.wrap("chunk1".getBytes())),

154

new NioManagedBuffer(ByteBuffer.wrap("chunk2".getBytes()))

155

);

156

157

// Register stream

158

long streamId = streamManager.registerStream("myapp", chunks.iterator(), channel);

159

System.out.println("Registered stream with ID: " + streamId);

160

161

// Clients can now fetch chunks

162

ManagedBuffer chunk0 = streamManager.getChunk(streamId, 0);

163

ManagedBuffer chunk1 = streamManager.getChunk(streamId, 1);

164

165

// File-based chunks for large data

166

List<ManagedBuffer> fileChunks = Arrays.asList(

167

new FileSegmentManagedBuffer(conf, file, 0, 1024*1024), // First 1MB

168

new FileSegmentManagedBuffer(conf, file, 1024*1024, 1024*1024), // Second 1MB

169

new FileSegmentManagedBuffer(conf, file, 2*1024*1024, 512*1024) // Last 512KB

170

);

171

172

long fileStreamId = streamManager.registerStream("fileapp", fileChunks.iterator(), channel);

173

```

174

175

### Stream Callback Interfaces

176

177

Callback interfaces for handling streaming data reception with different levels of functionality.

178

179

```java { .api }

180

/**

181

* Callback interface for handling streaming data reception

182

*/

183

public interface StreamCallback {

184

/**

185

* Called when data is received for a stream

186

* @param streamId String identifier of the stream

187

* @param buf ByteBuffer containing the received data

188

* @throws IOException if data processing fails

189

*/

190

void onData(String streamId, ByteBuffer buf) throws IOException;

191

192

/**

193

* Called when stream transfer is complete

194

* @param streamId String identifier of the stream

195

* @throws IOException if completion processing fails

196

*/

197

void onComplete(String streamId) throws IOException;

198

199

/**

200

* Called when stream transfer fails

201

* @param streamId String identifier of the stream

202

* @param cause Exception that caused the failure

203

* @throws IOException if failure processing fails

204

*/

205

void onFailure(String streamId, Throwable cause) throws IOException;

206

}

207

208

/**

209

* Extended StreamCallback that provides access to stream ID

210

* Useful when the same callback handles multiple streams

211

*/

212

public interface StreamCallbackWithID extends StreamCallback {

213

/**

214

* Get the stream identifier this callback is associated with

215

* @return Stream ID string

216

*/

217

String getID();

218

}

219

```

220

221

**Usage Examples:**

222

223

```java

224

// Simple stream callback

225

StreamCallback callback = new StreamCallback() {

226

private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

227

228

@Override

229

public void onData(String streamId, ByteBuffer buf) throws IOException {

230

byte[] data = new byte[buf.remaining()];

231

buf.get(data);

232

buffer.write(data);

233

System.out.println("Received " + data.length + " bytes for stream " + streamId);

234

}

235

236

@Override

237

public void onComplete(String streamId) throws IOException {

238

byte[] fullData = buffer.toByteArray();

239

System.out.println("Stream " + streamId + " complete, total: " + fullData.length + " bytes");

240

processStreamData(fullData);

241

}

242

243

@Override

244

public void onFailure(String streamId, Throwable cause) throws IOException {

245

System.err.println("Stream " + streamId + " failed: " + cause.getMessage());

246

cleanup();

247

}

248

};

249

250

// Use callback with client

251

client.stream("my-data-stream", callback);

252

253

// Callback with ID for multi-stream handling

254

public class MultiStreamCallback implements StreamCallbackWithID {

255

private final String streamId;

256

private final Map<String, ByteArrayOutputStream> buffers = new ConcurrentHashMap<>();

257

258

public MultiStreamCallback(String streamId) {

259

this.streamId = streamId;

260

buffers.put(streamId, new ByteArrayOutputStream());

261

}

262

263

@Override

264

public String getID() {

265

return streamId;

266

}

267

268

@Override

269

public void onData(String streamId, ByteBuffer buf) throws IOException {

270

ByteArrayOutputStream buffer = buffers.get(streamId);

271

if (buffer != null) {

272

byte[] data = new byte[buf.remaining()];

273

buf.get(data);

274

buffer.write(data);

275

}

276

}

277

278

@Override

279

public void onComplete(String streamId) throws IOException {

280

ByteArrayOutputStream buffer = buffers.remove(streamId);

281

if (buffer != null) {

282

processCompletedStream(streamId, buffer.toByteArray());

283

}

284

}

285

286

@Override

287

public void onFailure(String streamId, Throwable cause) throws IOException {

288

buffers.remove(streamId);

289

handleStreamFailure(streamId, cause);

290

}

291

}

292

```

293

294

### Chunk Reception Callback

295

296

Callback interface specifically for handling chunk-based data reception with precise chunk indexing.

297

298

```java { .api }

299

/**

300

* Callback interface for handling received chunks from stream fetching

301

*/

302

public interface ChunkReceivedCallback {

303

/**

304

* Called when a chunk is successfully received

305

* @param chunkIndex Index of the received chunk (0-based)

306

* @param buffer ManagedBuffer containing the chunk data

307

*/

308

void onSuccess(int chunkIndex, ManagedBuffer buffer);

309

310

/**

311

* Called when chunk fetching fails

312

* @param chunkIndex Index of the chunk that failed to fetch

313

* @param e Exception that caused the failure

314

*/

315

void onFailure(int chunkIndex, Throwable e);

316

}

317

```

318

319

**Usage Examples:**

320

321

```java

322

// Chunk fetching with callback

323

ChunkReceivedCallback chunkCallback = new ChunkReceivedCallback() {

324

private final Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();

325

private final int totalChunks;

326

327

public ChunkReceivedCallback(int totalChunks) {

328

this.totalChunks = totalChunks;

329

}

330

331

@Override

332

public void onSuccess(int chunkIndex, ManagedBuffer buffer) {

333

System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());

334

receivedChunks.put(chunkIndex, buffer.retain());

335

336

// Check if all chunks received

337

if (receivedChunks.size() == totalChunks) {

338

assembleCompleteData();

339

}

340

}

341

342

@Override

343

public void onFailure(int chunkIndex, Throwable e) {

344

System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());

345

// Retry or handle failure

346

retryChunk(chunkIndex);

347

}

348

349

private void assembleCompleteData() {

350

// Assemble chunks in order

351

ByteArrayOutputStream result = new ByteArrayOutputStream();

352

for (int i = 0; i < totalChunks; i++) {

353

ManagedBuffer chunk = receivedChunks.get(i);

354

if (chunk != null) {

355

try (InputStream is = chunk.createInputStream()) {

356

byte[] data = new byte[(int) chunk.size()];

357

is.read(data);

358

result.write(data);

359

} catch (IOException e) {

360

System.err.println("Error reading chunk " + i + ": " + e.getMessage());

361

} finally {

362

chunk.release();

363

}

364

}

365

}

366

367

processAssembledData(result.toByteArray());

368

}

369

};

370

371

// Fetch chunks

372

for (int i = 0; i < totalChunks; i++) {

373

client.fetchChunk(streamId, i, chunkCallback);

374

}

375

```

376

377

## Stream Usage Patterns

378

379

### Large File Transfer

380

381

```java

382

// Server-side: Register file as stream

383

File largeFile = new File("/path/to/large/file.dat");

384

long fileSize = largeFile.length();

385

int chunkSize = 1024 * 1024; // 1MB chunks

386

List<ManagedBuffer> chunks = new ArrayList<>();

387

388

for (long offset = 0; offset < fileSize; offset += chunkSize) {

389

long length = Math.min(chunkSize, fileSize - offset);

390

chunks.add(new FileSegmentManagedBuffer(conf, largeFile, offset, length));

391

}

392

393

OneForOneStreamManager streamManager = new OneForOneStreamManager();

394

long streamId = streamManager.registerStream("file-transfer", chunks.iterator(), channel);

395

396

// Client-side: Fetch file in chunks

397

int totalChunks = (int) Math.ceil((double) fileSize / chunkSize);

398

Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();

399

400

ChunkReceivedCallback callback = new ChunkReceivedCallback() {

401

@Override

402

public void onSuccess(int chunkIndex, ManagedBuffer buffer) {

403

receivedChunks.put(chunkIndex, buffer);

404

if (receivedChunks.size() == totalChunks) {

405

reconstructFile();

406

}

407

}

408

409

@Override

410

public void onFailure(int chunkIndex, Throwable e) {

411

System.err.println("Chunk " + chunkIndex + " failed: " + e.getMessage());

412

}

413

};

414

415

// Fetch all chunks

416

for (int i = 0; i < totalChunks; i++) {

417

client.fetchChunk(streamId, i, callback);

418

}

419

```

420

421

### Streaming Data Processing

422

423

```java

424

// Server-side: Stream processing results

425

public class ProcessingStreamManager extends StreamManager {

426

private final Map<String, DataProcessor> processors = new ConcurrentHashMap<>();

427

428

public void startProcessing(String streamId, DataProcessor processor) {

429

processors.put(streamId, processor);

430

}

431

432

@Override

433

public ManagedBuffer openStream(String streamId) {

434

DataProcessor processor = processors.get(streamId);

435

if (processor == null) {

436

throw new IllegalArgumentException("No processor for stream: " + streamId);

437

}

438

439

// Return processed data as stream

440

byte[] processedData = processor.getResults();

441

return new NioManagedBuffer(ByteBuffer.wrap(processedData));

442

}

443

444

@Override

445

public ManagedBuffer getChunk(long streamId, int chunkIndex) {

446

throw new UnsupportedOperationException("Use openStream for processed data");

447

}

448

}

449

450

// Client-side: Receive processed data

451

StreamCallback streamCallback = new StreamCallback() {

452

private final ByteArrayOutputStream results = new ByteArrayOutputStream();

453

454

@Override

455

public void onData(String streamId, ByteBuffer buf) throws IOException {

456

byte[] data = new byte[buf.remaining()];

457

buf.get(data);

458

results.write(data);

459

}

460

461

@Override

462

public void onComplete(String streamId) throws IOException {

463

byte[] finalResults = results.toByteArray();

464

handleProcessingResults(finalResults);

465

}

466

467

@Override

468

public void onFailure(String streamId, Throwable cause) throws IOException {

469

System.err.println("Processing stream failed: " + cause.getMessage());

470

}

471

};

472

473

client.stream("processing-results", streamCallback);

474

```

475

476

### Zero-Copy Stream Transfer

477

478

```java

479

// Efficient zero-copy transfer using FileSegmentManagedBuffer

480

public class ZeroCopyStreamManager extends StreamManager {

481

private final Map<Long, FileInfo> streamFiles = new ConcurrentHashMap<>();

482

483

public long registerFileStream(File file, Channel channel) {

484

long streamId = generateStreamId();

485

streamFiles.put(streamId, new FileInfo(file, channel));

486

return streamId;

487

}

488

489

@Override

490

public ManagedBuffer getChunk(long streamId, int chunkIndex) {

491

FileInfo fileInfo = streamFiles.get(streamId);

492

if (fileInfo == null) {

493

throw new IllegalArgumentException("Stream not found: " + streamId);

494

}

495

496

long chunkSize = 64 * 1024; // 64KB chunks

497

long offset = chunkIndex * chunkSize;

498

long length = Math.min(chunkSize, fileInfo.file.length() - offset);

499

500

if (offset >= fileInfo.file.length()) {

501

throw new IllegalArgumentException("Chunk index out of range: " + chunkIndex);

502

}

503

504

// Zero-copy file segment

505

return new FileSegmentManagedBuffer(conf, fileInfo.file, offset, length);

506

}

507

508

@Override

509

public ManagedBuffer openStream(String streamId) {

510

throw new UnsupportedOperationException("Use getChunk for file streams");

511

}

512

513

private static class FileInfo {

514

final File file;

515

final Channel channel;

516

517

FileInfo(File file, Channel channel) {

518

this.file = file;

519

this.channel = channel;

520

}

521

}

522

}

523

```