# Stream Management

Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead and zero-copy I/O optimizations.

## Capabilities

### Stream Manager Abstract Base

Core abstraction for managing streams that can be fetched by clients, providing lifecycle management and authorization controls.

```java { .api }
/**
12
* Abstract base class for managing streams that can be fetched by clients
13
* Provides stream lifecycle management and authorization
14
*/
15
public abstract class StreamManager {
16
/**
17
* Get a specific chunk from a stream by ID and index
18
* @param streamId Numeric identifier for the stream
19
* @param chunkIndex Index of the chunk within the stream (0-based)
20
* @return ManagedBuffer containing the chunk data
21
* @throws IllegalArgumentException if stream or chunk doesn't exist
22
*/
23
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
24
25
/**
26
* Open a stream for reading by string identifier
27
* @param streamId String identifier for the stream
28
* @return ManagedBuffer containing the full stream data
29
* @throws IllegalArgumentException if stream doesn't exist
30
*/
31
public abstract ManagedBuffer openStream(String streamId);
32
33
/**
34
* Called when a connection terminates to clean up associated streams
35
* @param channel Netty channel that terminated
36
*/
37
public void connectionTerminated(Channel channel);
38
39
/**
40
* Check if client is authorized to access the specified stream
41
* @param client Transport client requesting access
42
* @param streamId Numeric stream identifier
43
* @throws SecurityException if client is not authorized
44
*/
45
public void checkAuthorization(TransportClient client, long streamId);
46
}
```

**Usage Examples:**

```java
// Implementing a custom StreamManager
53
public class MyStreamManager extends StreamManager {
54
private final Map<Long, List<ManagedBuffer>> streams = new ConcurrentHashMap<>();
55
private final Map<String, ManagedBuffer> namedStreams = new ConcurrentHashMap<>();
56
57
@Override
58
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
59
List<ManagedBuffer> chunks = streams.get(streamId);
60
if (chunks == null || chunkIndex >= chunks.size()) {
61
throw new IllegalArgumentException("Invalid stream or chunk: " +
62
streamId + "/" + chunkIndex);
63
}
64
return chunks.get(chunkIndex).retain();
65
}
66
67
@Override
68
public ManagedBuffer openStream(String streamId) {
69
ManagedBuffer buffer = namedStreams.get(streamId);
70
if (buffer == null) {
71
throw new IllegalArgumentException("Stream not found: " + streamId);
72
}
73
return buffer.retain();
74
}
75
76
@Override
77
public void checkAuthorization(TransportClient client, long streamId) {
78
String clientId = client.getClientId();
79
if (!isAuthorized(clientId, streamId)) {
80
throw new SecurityException("Client " + clientId +
81
" not authorized for stream " + streamId);
82
}
83
}
84
85
private boolean isAuthorized(String clientId, long streamId) {
86
// Custom authorization logic
87
return true;
88
}
89
}
```

### One-For-One Stream Manager

Concrete StreamManager implementation where each chunk corresponds to one buffer, providing simple stream registration and management.

```java { .api }
/**
98
* StreamManager where each chunk corresponds to one buffer
99
* Provides simple stream registration with automatic cleanup
100
*/
101
public class OneForOneStreamManager extends StreamManager {
102
/**
103
* Create a new OneForOneStreamManager with default settings
104
*/
105
public OneForOneStreamManager();
106
107
/**
108
* Register a new stream with the manager
109
* @param appId Application identifier for authorization
110
* @param buffers Iterator of ManagedBuffer instances representing chunks
111
* @param channel Netty channel associated with this stream (for cleanup)
112
* @return Numeric stream ID that can be used to fetch chunks
113
*/
114
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);
115
116
/**
117
* Get a specific chunk from a registered stream
118
* @param streamId Stream identifier returned from registerStream
119
* @param chunkIndex Index of the chunk to retrieve (0-based)
120
* @return ManagedBuffer containing the chunk data
121
* @throws IllegalArgumentException if stream or chunk doesn't exist
122
*/
123
public ManagedBuffer getChunk(long streamId, int chunkIndex);
124
125
/**
126
* Open a stream by string identifier (not supported in OneForOneStreamManager)
127
* @param streamId String stream identifier
128
* @return Never returns normally
129
* @throws UnsupportedOperationException always thrown
130
*/
131
public ManagedBuffer openStream(String streamId);
132
133
/**
134
* Clean up streams associated with terminated connection
135
* @param channel Netty channel that terminated
136
*/
137
public void connectionTerminated(Channel channel);
138
}
```

**Usage Examples:**

```java
import java.util.Arrays;
145
import java.util.Iterator;
146
147
// Create stream manager
148
OneForOneStreamManager streamManager = new OneForOneStreamManager();
149
150
// Prepare data chunks
151
List<ManagedBuffer> chunks = Arrays.asList(
152
new NioManagedBuffer(ByteBuffer.wrap("chunk0".getBytes())),
153
new NioManagedBuffer(ByteBuffer.wrap("chunk1".getBytes())),
154
new NioManagedBuffer(ByteBuffer.wrap("chunk2".getBytes()))
155
);
156
157
// Register stream
158
long streamId = streamManager.registerStream("myapp", chunks.iterator(), channel);
159
System.out.println("Registered stream with ID: " + streamId);
160
161
// Clients can now fetch chunks
162
ManagedBuffer chunk0 = streamManager.getChunk(streamId, 0);
163
ManagedBuffer chunk1 = streamManager.getChunk(streamId, 1);
164
165
// File-based chunks for large data
166
List<ManagedBuffer> fileChunks = Arrays.asList(
167
new FileSegmentManagedBuffer(conf, file, 0, 1024*1024), // First 1MB
168
new FileSegmentManagedBuffer(conf, file, 1024*1024, 1024*1024), // Second 1MB
169
new FileSegmentManagedBuffer(conf, file, 2*1024*1024, 512*1024) // Last 512KB
170
);
171
172
long fileStreamId = streamManager.registerStream("fileapp", fileChunks.iterator(), channel);
```

### Stream Callback Interfaces

Callback interfaces for handling streaming data reception with different levels of functionality.

```java { .api }
/**
181
* Callback interface for handling streaming data reception
182
*/
183
public interface StreamCallback {
184
/**
185
* Called when data is received for a stream
186
* @param streamId String identifier of the stream
187
* @param buf ByteBuffer containing the received data
188
* @throws IOException if data processing fails
189
*/
190
void onData(String streamId, ByteBuffer buf) throws IOException;
191
192
/**
193
* Called when stream transfer is complete
194
* @param streamId String identifier of the stream
195
* @throws IOException if completion processing fails
196
*/
197
void onComplete(String streamId) throws IOException;
198
199
/**
200
* Called when stream transfer fails
201
* @param streamId String identifier of the stream
202
* @param cause Exception that caused the failure
203
* @throws IOException if failure processing fails
204
*/
205
void onFailure(String streamId, Throwable cause) throws IOException;
206
}
207
208
/**
209
* Extended StreamCallback that provides access to stream ID
210
* Useful when the same callback handles multiple streams
211
*/
212
public interface StreamCallbackWithID extends StreamCallback {
213
/**
214
* Get the stream identifier this callback is associated with
215
* @return Stream ID string
216
*/
217
String getID();
218
}
```

**Usage Examples:**

```java
// Simple stream callback that accumulates all received data in memory and
// processes it once the stream completes.
StreamCallback callback = new StreamCallback() {
  private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

  @Override
  public void onData(String streamId, ByteBuffer buf) throws IOException {
    byte[] data = new byte[buf.remaining()];
    buf.get(data);
    buffer.write(data);
    System.out.println("Received " + data.length + " bytes for stream " + streamId);
  }

  @Override
  public void onComplete(String streamId) throws IOException {
    byte[] fullData = buffer.toByteArray();
    System.out.println("Stream " + streamId + " complete, total: " + fullData.length + " bytes");
    processStreamData(fullData);
  }

  @Override
  public void onFailure(String streamId, Throwable cause) throws IOException {
    System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
    cleanup();
  }
};

// Use callback with client
client.stream("my-data-stream", callback);
// Callback with ID for multi-stream handling
254
public class MultiStreamCallback implements StreamCallbackWithID {
255
private final String streamId;
256
private final Map<String, ByteArrayOutputStream> buffers = new ConcurrentHashMap<>();
257
258
public MultiStreamCallback(String streamId) {
259
this.streamId = streamId;
260
buffers.put(streamId, new ByteArrayOutputStream());
261
}
262
263
@Override
264
public String getID() {
265
return streamId;
266
}
267
268
@Override
269
public void onData(String streamId, ByteBuffer buf) throws IOException {
270
ByteArrayOutputStream buffer = buffers.get(streamId);
271
if (buffer != null) {
272
byte[] data = new byte[buf.remaining()];
273
buf.get(data);
274
buffer.write(data);
275
}
276
}
277
278
@Override
279
public void onComplete(String streamId) throws IOException {
280
ByteArrayOutputStream buffer = buffers.remove(streamId);
281
if (buffer != null) {
282
processCompletedStream(streamId, buffer.toByteArray());
283
}
284
}
285
286
@Override
287
public void onFailure(String streamId, Throwable cause) throws IOException {
288
buffers.remove(streamId);
289
handleStreamFailure(streamId, cause);
290
}
291
}
```

### Chunk Reception Callback

Callback interface specifically for handling chunk-based data reception with precise chunk indexing.

```java { .api }
/**
300
* Callback interface for handling received chunks from stream fetching
301
*/
302
public interface ChunkReceivedCallback {
303
/**
304
* Called when a chunk is successfully received
305
* @param chunkIndex Index of the received chunk (0-based)
306
* @param buffer ManagedBuffer containing the chunk data
307
*/
308
void onSuccess(int chunkIndex, ManagedBuffer buffer);
309
310
/**
311
* Called when chunk fetching fails
312
* @param chunkIndex Index of the chunk that failed to fetch
313
* @param e Exception that caused the failure
314
*/
315
void onFailure(int chunkIndex, Throwable e);
316
}
```

**Usage Examples:**

```java
// Chunk fetching with callback.
// An anonymous class cannot declare a constructor, so instead of the broken
// "public ChunkReceivedCallback(int totalChunks)" the callback captures the
// enclosing effectively-final totalChunks variable (the same one the fetch
// loop below uses).
ChunkReceivedCallback chunkCallback = new ChunkReceivedCallback() {
  private final Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();

  @Override
  public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
    System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());
    receivedChunks.put(chunkIndex, buffer.retain());

    // Check if all chunks received
    if (receivedChunks.size() == totalChunks) {
      assembleCompleteData();
    }
  }

  @Override
  public void onFailure(int chunkIndex, Throwable e) {
    System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
    // Retry or handle failure
    retryChunk(chunkIndex);
  }

  // Concatenates all chunks in index order and hands off the full payload.
  private void assembleCompleteData() {
    ByteArrayOutputStream result = new ByteArrayOutputStream();
    for (int i = 0; i < totalChunks; i++) {
      ManagedBuffer chunk = receivedChunks.get(i);
      if (chunk != null) {
        try (InputStream is = chunk.createInputStream()) {
          // InputStream.read may return fewer bytes than requested, so loop
          // until the chunk is fully consumed (the original single read
          // could silently drop data).
          byte[] data = new byte[(int) chunk.size()];
          int off = 0;
          while (off < data.length) {
            int n = is.read(data, off, data.length - off);
            if (n < 0) {
              break;
            }
            off += n;
          }
          result.write(data, 0, off);
        } catch (IOException e) {
          System.err.println("Error reading chunk " + i + ": " + e.getMessage());
        } finally {
          chunk.release();
        }
      }
    }

    processAssembledData(result.toByteArray());
  }
};

// Fetch chunks
for (int i = 0; i < totalChunks; i++) {
  client.fetchChunk(streamId, i, chunkCallback);
}
375
```
376
377
## Stream Usage Patterns
378
379
### Large File Transfer
380
381
```java
382
// Server-side: Register file as stream
File largeFile = new File("/path/to/large/file.dat");
long fileSize = largeFile.length();
int chunkSize = 1024 * 1024; // 1MB chunks
List<ManagedBuffer> chunks = new ArrayList<>();

for (long offset = 0; offset < fileSize; offset += chunkSize) {
  long length = Math.min(chunkSize, fileSize - offset);
  chunks.add(new FileSegmentManagedBuffer(conf, largeFile, offset, length));
}

OneForOneStreamManager streamManager = new OneForOneStreamManager();
long streamId = streamManager.registerStream("file-transfer", chunks.iterator(), channel);

// Client-side: Fetch file in chunks
int totalChunks = (int) Math.ceil((double) fileSize / chunkSize);
Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();

// The anonymous callback captures totalChunks and receivedChunks from the
// enclosing scope; both are effectively final.
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
  @Override
  public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
    receivedChunks.put(chunkIndex, buffer);
    if (receivedChunks.size() == totalChunks) {
      reconstructFile();
    }
  }

  @Override
  public void onFailure(int chunkIndex, Throwable e) {
    System.err.println("Chunk " + chunkIndex + " failed: " + e.getMessage());
  }
};

// Fetch all chunks
for (int i = 0; i < totalChunks; i++) {
  client.fetchChunk(streamId, i, callback);
}
```

### Streaming Data Processing

```java
// Server-side: Stream processing results
425
public class ProcessingStreamManager extends StreamManager {
426
private final Map<String, DataProcessor> processors = new ConcurrentHashMap<>();
427
428
public void startProcessing(String streamId, DataProcessor processor) {
429
processors.put(streamId, processor);
430
}
431
432
@Override
433
public ManagedBuffer openStream(String streamId) {
434
DataProcessor processor = processors.get(streamId);
435
if (processor == null) {
436
throw new IllegalArgumentException("No processor for stream: " + streamId);
437
}
438
439
// Return processed data as stream
440
byte[] processedData = processor.getResults();
441
return new NioManagedBuffer(ByteBuffer.wrap(processedData));
442
}
443
444
@Override
445
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
446
throw new UnsupportedOperationException("Use openStream for processed data");
447
}
448
}
449
450
// Client-side: Receive processed data
StreamCallback streamCallback = new StreamCallback() {
  private final ByteArrayOutputStream results = new ByteArrayOutputStream();

  @Override
  public void onData(String streamId, ByteBuffer buf) throws IOException {
    byte[] data = new byte[buf.remaining()];
    buf.get(data);
    results.write(data);
  }

  @Override
  public void onComplete(String streamId) throws IOException {
    byte[] finalResults = results.toByteArray();
    handleProcessingResults(finalResults);
  }

  @Override
  public void onFailure(String streamId, Throwable cause) throws IOException {
    System.err.println("Processing stream failed: " + cause.getMessage());
  }
};

client.stream("processing-results", streamCallback);
```

### Zero-Copy Stream Transfer

```java
// Efficient zero-copy transfer using FileSegmentManagedBuffer
480
public class ZeroCopyStreamManager extends StreamManager {
481
private final Map<Long, FileInfo> streamFiles = new ConcurrentHashMap<>();
482
483
public long registerFileStream(File file, Channel channel) {
484
long streamId = generateStreamId();
485
streamFiles.put(streamId, new FileInfo(file, channel));
486
return streamId;
487
}
488
489
@Override
490
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
491
FileInfo fileInfo = streamFiles.get(streamId);
492
if (fileInfo == null) {
493
throw new IllegalArgumentException("Stream not found: " + streamId);
494
}
495
496
long chunkSize = 64 * 1024; // 64KB chunks
497
long offset = chunkIndex * chunkSize;
498
long length = Math.min(chunkSize, fileInfo.file.length() - offset);
499
500
if (offset >= fileInfo.file.length()) {
501
throw new IllegalArgumentException("Chunk index out of range: " + chunkIndex);
502
}
503
504
// Zero-copy file segment
505
return new FileSegmentManagedBuffer(conf, fileInfo.file, offset, length);
506
}
507
508
@Override
509
public ManagedBuffer openStream(String streamId) {
510
throw new UnsupportedOperationException("Use getChunk for file streams");
511
}
512
513
private static class FileInfo {
514
final File file;
515
final Channel channel;
516
517
FileInfo(File file, Channel channel) {
518
this.file = file;
519
this.channel = channel;
520
}
521
}
522
}
```