# Streaming & Advanced I/O

This documentation covers advanced I/O operations in the Azure Storage Blob Java SDK, including streaming uploads and downloads, reactive programming patterns, and high-performance data transfer techniques.

## Input Streams and Downloads

### BlobInputStream

Stream blob content directly without loading the entire blob into memory.

```java
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.specialized.BlobInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Open blob as input stream
try (BlobInputStream blobInputStream = blobClient.openInputStream()) {
    // Process stream incrementally
    byte[] buffer = new byte[8192];
    int bytesRead;
    long totalBytesRead = 0;

    while ((bytesRead = blobInputStream.read(buffer)) != -1) {
        // Process buffer content
        processChunk(buffer, bytesRead);
        totalBytesRead += bytesRead;

        if (totalBytesRead % (1024 * 1024) == 0) { // Log every MB
            System.out.println("Processed: " + (totalBytesRead / 1024 / 1024) + " MB");
        }
    }

    System.out.println("Total bytes processed: " + totalBytesRead);

} catch (IOException ex) {
    System.err.println("Stream processing failed: " + ex.getMessage());
}

// Open blob input stream with options
BlobInputStreamOptions inputStreamOptions = new BlobInputStreamOptions()
    .setBlockSize(1024 * 1024) // 1MB read buffer
    .setRange(new BlobRange(0, 10 * 1024 * 1024L)) // Read first 10MB only
    .setRequestConditions(new BlobRequestConditions()
        .setIfMatch(blobClient.getProperties().getETag()));

try (BlobInputStream configuredStream = blobClient.openInputStream(inputStreamOptions)) {
    // Process configured stream
    processConfiguredStream(configuredStream);
} catch (IOException ex) {
    System.err.println("Configured stream processing failed: " + ex.getMessage());
}

// Stream text content line by line
try (BlobInputStream blobStream = blobClient.openInputStream();
     BufferedReader reader = new BufferedReader(new InputStreamReader(blobStream, StandardCharsets.UTF_8))) {

    String line;
    int lineNumber = 0;

    while ((line = reader.readLine()) != null) {
        lineNumber++;
        processTextLine(line, lineNumber);

        if (lineNumber % 1000 == 0) { // Log progress every 1000 lines
            System.out.println("Processed " + lineNumber + " lines");
        }
    }

    System.out.println("Total lines processed: " + lineNumber);
} catch (IOException ex) {
    System.err.println("Line-by-line processing failed: " + ex.getMessage());
}
```
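
The `processChunk`, `processTextLine`, and `processConfiguredStream` calls above are application-defined hooks, not SDK methods; a minimal sketch of their shape:

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical processing hooks referenced in the snippets above
private void processChunk(byte[] buffer, int length) {
    // e.g., feed the first `length` bytes into a parser, digest, or downstream consumer
}

private void processTextLine(String line, int lineNumber) {
    // e.g., parse one record per line
}

private void processConfiguredStream(InputStream stream) throws IOException {
    // e.g., copy the ranged stream to a local file or parser
}
```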

### Streaming Downloads with Reactive Programming
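
The async client exposes downloads as a `Flux<ByteBuffer>`, so chunks can be processed as they arrive and combined with standard Reactor operators for batching, backpressure, and retries: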

```java
import com.azure.storage.blob.BlobAsyncClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.nio.ByteBuffer;
import java.time.Duration;

// Async streaming download. Note: BlobClient has no getAsyncClient(); build the async
// client separately, e.g. via BlobClientBuilder.buildAsyncClient() or, as assumed here,
// from a BlobContainerAsyncClient in scope.
BlobAsyncClient asyncClient = containerAsyncClient.getBlobAsyncClient(blobClient.getBlobName());

Flux<ByteBuffer> downloadStream = asyncClient.downloadStream();

// Process stream reactively (each subscribe() below starts an independent download)
downloadStream
    .doOnNext(buffer -> {
        int bufferSize = buffer.remaining();
        System.out.println("Received buffer: " + bufferSize + " bytes");

        // Process buffer content
        byte[] bytes = new byte[bufferSize];
        buffer.get(bytes);
        processAsyncChunk(bytes);
    })
    .doOnError(error -> System.err.println("Download stream error: " + error.getMessage()))
    .doOnComplete(() -> System.out.println("Download stream completed"))
    .subscribe();

// Download with backpressure control
downloadStream
    .buffer(10) // Group 10 chunks per batch
    .flatMap(buffers ->
        Mono.fromCallable(() -> {
            processBatch(buffers);
            return buffers.size(); // Emit a value so downstream operators fire
        }), 2) // Process at most 2 batches concurrently
    .doOnNext(batchSize -> System.out.println("Batch processed: " + batchSize + " chunks"))
    .subscribe();

// Download with retry and error handling (a retry restarts the download from the beginning)
downloadStream
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
    .onErrorResume(error -> {
        System.err.println("Download failed after retries: " + error.getMessage());
        return Flux.empty();
    })
    .subscribe();
```

### Range-Based Streaming
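
For very large blobs, downloading fixed-size ranges keeps memory usage bounded and makes progress reporting straightforward: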

```java
import com.azure.core.util.Context;
import java.io.ByteArrayOutputStream;
import java.time.Duration;

// Stream specific ranges of large blobs
public class RangeBasedStreamer {
    private final BlobClient blobClient;
    private final long chunkSize;

    public RangeBasedStreamer(BlobClient blobClient, long chunkSize) {
        this.blobClient = blobClient;
        this.chunkSize = chunkSize;
    }

    public void streamBlobInRanges(ProgressCallback callback) {
        try {
            BlobProperties properties = blobClient.getProperties();
            long blobSize = properties.getBlobSize();
            long totalProcessed = 0;

            System.out.println("Streaming blob in " + chunkSize + " byte chunks");
            System.out.println("Total blob size: " + blobSize + " bytes");

            for (long offset = 0; offset < blobSize; offset += chunkSize) {
                long rangeEnd = Math.min(offset + chunkSize - 1, blobSize - 1);
                BlobRange range = new BlobRange(offset, rangeEnd - offset + 1);

                // Download this range into an in-memory buffer
                // (downloadContentWithResponse does not accept a range; use downloadStreamWithResponse)
                ByteArrayOutputStream rangeBuffer = new ByteArrayOutputStream();
                blobClient.downloadStreamWithResponse(
                    rangeBuffer,
                    range,
                    null,  // DownloadRetryOptions (defaults)
                    null,  // BlobRequestConditions (none)
                    false, // Do not request per-range content MD5
                    Duration.ofMinutes(2),
                    Context.NONE
                );

                byte[] chunkBytes = rangeBuffer.toByteArray();

                // Process chunk
                processChunkWithRange(chunkBytes, offset, rangeEnd);

                totalProcessed += chunkBytes.length;

                // Report progress
                if (callback != null) {
                    double progress = (double) totalProcessed / blobSize * 100;
                    callback.onProgress(totalProcessed, blobSize, progress);
                }
            }

            System.out.println("Range-based streaming completed");

        } catch (Exception ex) {
            System.err.println("Range streaming failed: " + ex.getMessage());
            throw ex;
        }
    }

    @FunctionalInterface
    public interface ProgressCallback {
        void onProgress(long processed, long total, double percentage);
    }

    private void processChunkWithRange(byte[] chunk, long startOffset, long endOffset) {
        System.out.printf("Processing chunk: offset %d-%d (%d bytes)%n",
            startOffset, endOffset, chunk.length);
        // Implement chunk processing logic
    }
}

// Usage
RangeBasedStreamer streamer = new RangeBasedStreamer(blobClient, 5 * 1024 * 1024L); // 5MB chunks

streamer.streamBlobInRanges((processed, total, percentage) -> {
    System.out.printf("Progress: %.1f%% (%d/%d bytes)%n", percentage, processed, total);
});
```

## Output Streams and Uploads

### BlobOutputStream

Stream data directly to blobs without buffering the entire content in memory.

```java
import com.azure.storage.blob.specialized.BlobOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Map;

// Basic output stream usage
try (BlobOutputStream outputStream = blockBlobClient.getBlobOutputStream()) {
    // Write data incrementally
    for (int i = 0; i < 1000; i++) {
        String data = "Data chunk " + i + "\n";
        outputStream.write(data.getBytes(StandardCharsets.UTF_8));

        if (i % 100 == 0) {
            outputStream.flush(); // Flush buffered data; blocks are only committed on close
            System.out.println("Flushed at chunk " + i);
        }
    }

    // Closing the stream stages any remaining data and commits all blocks
    System.out.println("Stream upload completed");

} catch (IOException ex) {
    System.err.println("Stream upload failed: " + ex.getMessage());
}

// Configured output stream with options
ParallelTransferOptions streamTransferOptions = new ParallelTransferOptions()
    .setBlockSizeLong(2 * 1024 * 1024L) // 2MB blocks
    .setMaxConcurrency(4)
    .setProgressListener(bytesTransferred ->
        System.out.println("Stream progress: " + bytesTransferred + " bytes"));

BlobHttpHeaders streamHeaders = new BlobHttpHeaders()
    .setContentType("application/octet-stream")
    .setContentEncoding("gzip")
    .setCacheControl("private, no-cache");

Map<String, String> streamMetadata = Map.of(
    "streaming", "true",
    "created-time", OffsetDateTime.now().toString(),
    "source", "streaming-application"
);

// Note: this overload takes no Context parameter
try (BlobOutputStream configuredStream = blockBlobClient.getBlobOutputStream(
        streamTransferOptions,
        streamHeaders,
        streamMetadata,
        AccessTier.HOT,
        new BlobRequestConditions().setIfNoneMatch("*"))) {

    // Write large dataset incrementally
    writeDataSetToStream(configuredStream);

    System.out.println("Configured stream upload completed");
} catch (IOException ex) {
    System.err.println("Configured stream upload failed: " + ex.getMessage());
}
```
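
`writeDataSetToStream` is an application-defined helper; a minimal sketch, assuming newline-delimited records from an application-supplied `loadRecords()`:

```java
import com.azure.storage.blob.specialized.BlobOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: writes newline-delimited records to the open stream
private void writeDataSetToStream(BlobOutputStream outputStream) throws IOException {
    for (String record : loadRecords()) { // loadRecords() is an assumed application data source
        outputStream.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }
}
```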

### Reactive Streaming Uploads
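
Uploads can also be driven from a `Flux<ByteBuffer>`, letting the SDK consume data as it is produced rather than requiring the full payload up front: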

```java
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;

// Upload from Flux of ByteBuffer
Flux<ByteBuffer> dataFlux = createDataFlux();

BlobParallelUploadOptions fluxUploadOptions = new BlobParallelUploadOptions(dataFlux)
    .setParallelTransferOptions(new ParallelTransferOptions()
        .setBlockSizeLong(1024 * 1024L) // 1MB blocks
        .setMaxConcurrency(6)
        .setProgressListener(bytesTransferred ->
            System.out.println("Reactive upload progress: " + bytesTransferred)))
    .setHeaders(new BlobHttpHeaders().setContentType("application/octet-stream"))
    .setMetadata(Map.of("source", "reactive-stream"));

// BlobClient has no getAsyncClient(); build the async client separately
// (here via an assumed containerAsyncClient in scope)
BlobAsyncClient asyncClient = containerAsyncClient.getBlobAsyncClient(blobClient.getBlobName());

// Upload reactively
Mono<Response<BlockBlobItem>> uploadMono = asyncClient.uploadWithResponse(fluxUploadOptions);

uploadMono
    .doOnSuccess(response -> {
        System.out.println("Reactive upload completed:");
        System.out.println("ETag: " + response.getValue().getETag());
        System.out.println("Status: " + response.getStatusCode());
    })
    .doOnError(error -> System.err.println("Reactive upload failed: " + error.getMessage()))
    .subscribe();

// Create data flux from various sources
private Flux<ByteBuffer> createDataFlux() {
    return Flux.range(1, 1000)
        .map(i -> {
            String data = "Reactive data chunk " + i + "\n";
            return ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
        })
        .delayElements(Duration.ofMillis(10)); // Simulate streaming data
}

// Upload from file with reactive progress (uses Spring Core's DataBufferUtils)
Path filePath = Paths.get("large-file.dat");
Flux<ByteBuffer> fileFlux = DataBufferUtils.read(
    filePath,
    new DefaultDataBufferFactory(),
    4096 // 4KB buffers
).map(DataBuffer::asByteBuffer);

// ReactiveProgressTracker is an application-defined progress listener (sketched below)
BlobParallelUploadOptions fileFluxOptions = new BlobParallelUploadOptions(fileFlux)
    .setParallelTransferOptions(new ParallelTransferOptions()
        .setProgressListener(new ReactiveProgressTracker(Files.size(filePath))));

asyncClient.uploadWithResponse(fileFluxOptions)
    .doOnSuccess(response -> System.out.println("File stream upload completed"))
    .subscribe();
```
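
`ReactiveProgressTracker` is not an SDK type. A minimal sketch, assuming the `com.azure.core.util.ProgressListener` interface accepted by `setProgressListener` in recent SDK versions:

```java
import com.azure.core.util.ProgressListener;

// Hypothetical progress tracker; reports percentage against a known total size
public class ReactiveProgressTracker implements ProgressListener {
    private final long totalBytes;

    public ReactiveProgressTracker(long totalBytes) {
        this.totalBytes = totalBytes;
    }

    @Override
    public void handleProgress(long bytesTransferred) {
        // bytesTransferred is cumulative for the whole transfer
        double percentage = totalBytes > 0 ? (double) bytesTransferred / totalBytes * 100 : 0.0;
        System.out.printf("Upload progress: %.1f%% (%d/%d bytes)%n",
            percentage, bytesTransferred, totalBytes);
    }
}
```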

## Large File Handling

### Optimized Large File Operations
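
The helper below derives a block size and concurrency level from the JVM's heap size and CPU count, then chooses between a single parallel file upload and manual block staging based on file size: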

```java
import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

public class LargeFileManager {
    private final BlobClient blobClient;
    private final BlockBlobClient blockBlobClient;
    private final long optimalBlockSize;
    private final int maxConcurrency;

    // uploadFromFile lives on BlobClient; stageBlock/commitBlockList live on BlockBlobClient
    public LargeFileManager(BlobClient blobClient) {
        this.blobClient = blobClient;
        this.blockBlobClient = blobClient.getBlockBlobClient();

        // Optimize based on available memory and CPU count
        long availableMemory = Runtime.getRuntime().maxMemory();
        this.optimalBlockSize = Math.min(32 * 1024 * 1024L, availableMemory / 16); // Max 32MB or 1/16 of heap
        this.maxConcurrency = Math.min(16, Runtime.getRuntime().availableProcessors() * 2);

        System.out.printf("Optimized for block size: %d MB, concurrency: %d%n",
            optimalBlockSize / 1024 / 1024, maxConcurrency);
    }

    public void uploadLargeFile(Path filePath, LargeFileCallback callback) throws IOException {
        long fileSize = Files.size(filePath);

        if (fileSize <= 256 * 1024 * 1024L) { // <= 256MB, use parallel upload from file
            uploadSmallToMediumFile(filePath, fileSize, callback);
        } else {
            uploadVeryLargeFile(filePath, callback);
        }
    }

    private void uploadSmallToMediumFile(Path filePath, long fileSize, LargeFileCallback callback)
            throws IOException {
        ParallelTransferOptions options = new ParallelTransferOptions()
            .setBlockSizeLong(optimalBlockSize)
            .setMaxConcurrency(maxConcurrency)
            .setProgressListener(bytesTransferred -> {
                if (callback != null) {
                    callback.onProgress(bytesTransferred, fileSize);
                }
            });

        BlobUploadFromFileOptions uploadOptions = new BlobUploadFromFileOptions(filePath.toString())
            .setParallelTransferOptions(options)
            .setHeaders(new BlobHttpHeaders()
                .setContentType(Files.probeContentType(filePath)))
            .setMetadata(Map.of(
                "file-name", filePath.getFileName().toString(),
                "file-size", String.valueOf(fileSize),
                "upload-method", "parallel-transfer"));

        try {
            Response<BlockBlobItem> response = blobClient.uploadFromFileWithResponse(
                uploadOptions,
                Duration.ofHours(2),
                Context.NONE
            );

            if (callback != null) {
                callback.onComplete(response.getValue());
            }

        } catch (Exception ex) {
            if (callback != null) {
                callback.onError(ex);
            }
            throw ex;
        }
    }

    private void uploadVeryLargeFile(Path filePath, LargeFileCallback callback) throws IOException {
        long fileSize = Files.size(filePath);
        List<String> blockIds = new ArrayList<>();

        try (FileInputStream fileStream = new FileInputStream(filePath.toFile())) {
            long totalUploaded = 0;
            int blockIndex = 0;
            byte[] buffer = new byte[(int) optimalBlockSize];

            while (true) {
                int bytesRead = fileStream.read(buffer);
                if (bytesRead == -1) break; // End of file

                // Generate unique block ID
                String blockId = Base64.getEncoder().encodeToString(
                    String.format("block-%06d", blockIndex).getBytes(StandardCharsets.UTF_8));
                blockIds.add(blockId);

                // Upload this block
                byte[] blockData = bytesRead == buffer.length ? buffer : Arrays.copyOf(buffer, bytesRead);
                blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(blockData));

                totalUploaded += bytesRead;
                blockIndex++;

                // Progress callback
                if (callback != null) {
                    callback.onProgress(totalUploaded, fileSize);
                }

                // Log staging progress periodically (blocks are committed once, at the end)
                if (blockIds.size() % 1000 == 0) {
                    System.out.println("Staged " + blockIds.size() + " blocks...");
                }
            }

            // Commit all blocks
            System.out.println("Committing " + blockIds.size() + " blocks...");
            BlockBlobItem result = blockBlobClient.commitBlockList(blockIds);

            if (callback != null) {
                callback.onComplete(result);
            }

        } catch (Exception ex) {
            if (callback != null) {
                callback.onError(ex);
            }
            throw ex;
        }
    }

    public interface LargeFileCallback {
        void onProgress(long uploaded, long total);
        void onComplete(BlockBlobItem result);
        void onError(Exception error);
    }
}

// Usage with comprehensive callback
LargeFileManager manager = new LargeFileManager(blobClient);

manager.uploadLargeFile(Paths.get("very-large-file.zip"), new LargeFileManager.LargeFileCallback() {
    private long lastLoggedMB = 0;

    @Override
    public void onProgress(long uploaded, long total) {
        long uploadedMB = uploaded / 1024 / 1024;

        if (uploadedMB >= lastLoggedMB + 100) { // Log every 100MB
            double percentage = (double) uploaded / total * 100;
            System.out.printf("Large file progress: %.1f%% (%d MB / %d MB)%n",
                percentage, uploadedMB, total / 1024 / 1024);
            lastLoggedMB = uploadedMB;
        }
    }

    @Override
    public void onComplete(BlockBlobItem result) {
        System.out.println("Large file upload completed successfully!");
        System.out.println("ETag: " + result.getETag());
        System.out.println("Last modified: " + result.getLastModified());
    }

    @Override
    public void onError(Exception error) {
        System.err.println("Large file upload failed: " + error.getMessage());
        error.printStackTrace();
    }
});
```

### Resume Interrupted Uploads
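
Persisting the set of already-staged block IDs to a checkpoint file lets an interrupted upload resume without re-staging completed blocks: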

```java
import com.azure.core.util.BinaryData;
import com.google.gson.Gson; // Checkpoint serialization uses Gson (requires the gson dependency)
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ResumableUploadManager {
    private final BlockBlobClient blockBlobClient;
    private final Path checkpointFile;

    public ResumableUploadManager(BlockBlobClient blockBlobClient, String blobName) {
        this.blockBlobClient = blockBlobClient;
        this.checkpointFile = Paths.get(System.getProperty("java.io.tmpdir"),
            blobName + ".upload.checkpoint");
    }

    public void resumableUpload(Path filePath, long blockSize) throws IOException {
        long fileSize = Files.size(filePath);
        UploadCheckpoint checkpoint = loadCheckpoint();

        // Start a new upload if there is no checkpoint or it does not match this file
        if (checkpoint == null
                || !filePath.toString().equals(checkpoint.filePath)
                || checkpoint.fileSize != fileSize
                || checkpoint.blockSize != blockSize) {
            checkpoint = new UploadCheckpoint();
            checkpoint.filePath = filePath.toString();
            checkpoint.fileSize = fileSize;
            checkpoint.blockSize = blockSize;
            checkpoint.uploadedBlocks = new HashSet<>();
        }

        System.out.println("Starting resumable upload...");
        System.out.printf("File: %s (%d bytes)%n", filePath, fileSize);
        System.out.printf("Previously uploaded blocks: %d%n", checkpoint.uploadedBlocks.size());

        try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath.toFile(), "r")) {
            long totalBlocks = (fileSize + blockSize - 1) / blockSize;

            for (int blockIndex = 0; blockIndex < totalBlocks; blockIndex++) {
                String blockId = generateBlockId(blockIndex);

                // Skip if already uploaded
                if (checkpoint.uploadedBlocks.contains(blockId)) {
                    continue;
                }

                // Read block data
                long offset = (long) blockIndex * blockSize;
                long currentBlockSize = Math.min(blockSize, fileSize - offset);

                byte[] blockData = new byte[(int) currentBlockSize];
                randomAccessFile.seek(offset);
                randomAccessFile.readFully(blockData);

                // Upload block with retry
                uploadBlockWithRetry(blockId, blockData, 3);

                // Update checkpoint
                checkpoint.uploadedBlocks.add(blockId);
                saveCheckpoint(checkpoint);

                // Progress update
                double progress = (double) checkpoint.uploadedBlocks.size() / totalBlocks * 100;
                System.out.printf("Progress: %.1f%% (%d/%d blocks)%n",
                    progress, checkpoint.uploadedBlocks.size(), totalBlocks);
            }

            // Commit all blocks
            List<String> allBlockIds = generateAllBlockIds(totalBlocks);
            BlockBlobItem result = blockBlobClient.commitBlockList(allBlockIds);

            // Clean up checkpoint
            Files.deleteIfExists(checkpointFile);

            System.out.println("Resumable upload completed successfully!");
            System.out.println("ETag: " + result.getETag());

        } catch (Exception ex) {
            System.err.println("Resumable upload failed: " + ex.getMessage());
            throw ex;
        }
    }

    private void uploadBlockWithRetry(String blockId, byte[] data, int maxRetries) {
        int attempt = 0;
        while (attempt < maxRetries) {
            try {
                blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(data));
                return; // Success
            } catch (Exception ex) {
                attempt++;
                if (attempt >= maxRetries) {
                    throw new RuntimeException("Failed to upload block after " + maxRetries + " attempts", ex);
                }

                System.err.printf("Block upload attempt %d failed, retrying: %s%n", attempt, ex.getMessage());

                try {
                    Thread.sleep(1000L * attempt); // Linearly increasing backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Upload interrupted", ie);
                }
            }
        }
    }

    private String generateBlockId(int blockIndex) {
        return Base64.getEncoder().encodeToString(
            String.format("block-%06d", blockIndex).getBytes(StandardCharsets.UTF_8));
    }

    private List<String> generateAllBlockIds(long totalBlocks) {
        List<String> blockIds = new ArrayList<>();
        for (int i = 0; i < totalBlocks; i++) {
            blockIds.add(generateBlockId(i));
        }
        return blockIds;
    }

    private UploadCheckpoint loadCheckpoint() {
        if (!Files.exists(checkpointFile)) {
            return null;
        }

        try {
            String json = Files.readString(checkpointFile);
            return new Gson().fromJson(json, UploadCheckpoint.class);
        } catch (Exception ex) {
            System.err.println("Failed to load checkpoint: " + ex.getMessage());
            return null;
        }
    }

    private void saveCheckpoint(UploadCheckpoint checkpoint) {
        try {
            String json = new Gson().toJson(checkpoint);
            Files.writeString(checkpointFile, json, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (Exception ex) {
            System.err.println("Failed to save checkpoint: " + ex.getMessage());
        }
    }

    private static class UploadCheckpoint {
        String filePath;
        long fileSize;
        long blockSize;
        Set<String> uploadedBlocks = new HashSet<>();
    }
}

// Usage
ResumableUploadManager resumableManager = new ResumableUploadManager(blockBlobClient, "huge-file.dat");
resumableManager.resumableUpload(Paths.get("huge-file.dat"), 8 * 1024 * 1024L); // 8MB blocks
```

## Memory-Efficient Streaming

### Low-Memory Streaming Patterns
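
These patterns bound memory usage by processing fixed-size chunks and never materializing a full blob in memory: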

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class MemoryEfficientStreaming {

    // Process very large blobs without loading them into memory
    public void processLargeBlobIncrementally(BlobClient blobClient, DataProcessor processor) {
        try (BlobInputStream inputStream = blobClient.openInputStream()) {
            processStreamInChunks(inputStream, processor, 64 * 1024); // 64KB chunks
        } catch (IOException ex) {
            throw new RuntimeException("Failed to process large blob", ex);
        }
    }

    private void processStreamInChunks(InputStream inputStream, DataProcessor processor, int chunkSize)
            throws IOException {
        byte[] buffer = new byte[chunkSize];
        int bytesRead;
        long totalProcessed = 0;

        while ((bytesRead = inputStream.read(buffer)) != -1) {
            // Process only the bytes actually read
            if (bytesRead == buffer.length) {
                processor.processChunk(buffer);
            } else {
                // Last chunk might be smaller
                byte[] lastChunk = Arrays.copyOf(buffer, bytesRead);
                processor.processChunk(lastChunk);
            }

            totalProcessed += bytesRead;

            // Log progress periodically; explicit System.gc() calls are unnecessary here
            if (totalProcessed % (10 * 1024 * 1024) == 0) { // Every 10MB
                System.out.println("Processed " + (totalProcessed / 1024 / 1024) + " MB");
            }
        }

        processor.processingComplete(totalProcessed);
    }

    // Streaming upload from a data source without buffering the entire content
    public void streamUploadFromDataSource(BlockBlobClient blobClient, DataSource dataSource) {
        try (BlobOutputStream outputStream = blobClient.getBlobOutputStream()) {
            byte[] buffer = new byte[8192]; // Small buffer

            while (dataSource.hasMoreData()) {
                int bytesGenerated = dataSource.generateData(buffer);
                if (bytesGenerated > 0) {
                    outputStream.write(buffer, 0, bytesGenerated);
                }

                // Flush periodically to bound the amount of buffered data
                if (dataSource.getBytesGenerated() % (1024 * 1024) == 0) { // Every 1MB
                    outputStream.flush();
                }
            }

        } catch (IOException ex) {
            throw new RuntimeException("Streaming upload failed", ex);
        }
    }

    // Pipeline processing: download -> transform -> upload without intermediate storage
    public void pipelineProcessing(BlobClient sourceBlobClient, BlockBlobClient targetBlobClient,
                                   DataTransformer transformer) {
        try (BlobInputStream inputStream = sourceBlobClient.openInputStream();
             BlobOutputStream outputStream = targetBlobClient.getBlobOutputStream()) {

            byte[] inputBuffer = new byte[16384]; // 16KB input buffer
            int bytesRead;

            while ((bytesRead = inputStream.read(inputBuffer)) != -1) {
                // Transform data chunk
                byte[] transformedData = transformer.transform(inputBuffer, 0, bytesRead);

                // Write transformed data immediately; the input buffer is reused for the next read
                outputStream.write(transformedData);
            }

        } catch (IOException ex) {
            throw new RuntimeException("Pipeline processing failed", ex);
        }
    }

    // Interfaces for extensibility
    public interface DataProcessor {
        void processChunk(byte[] chunk);
        void processingComplete(long totalBytesProcessed);
    }

    public interface DataSource {
        boolean hasMoreData();
        int generateData(byte[] buffer);
        long getBytesGenerated();
    }

    public interface DataTransformer {
        byte[] transform(byte[] input, int offset, int length);
    }
}

// Example implementation
class CSVProcessor implements MemoryEfficientStreaming.DataProcessor {
    private final StringBuilder lineBuffer = new StringBuilder();
    private int recordCount = 0;

    @Override
    public void processChunk(byte[] chunk) {
        // Note: assumes chunk boundaries do not split multibyte UTF-8 characters
        String chunkStr = new String(chunk, StandardCharsets.UTF_8);
        lineBuffer.append(chunkStr);

        // Split into lines; the trailing element may be an incomplete line
        String[] lines = lineBuffer.toString().split("\n", -1);

        // Process all complete lines (all but the last)
        for (int i = 0; i < lines.length - 1; i++) {
            processCSVLine(lines[i]);
            recordCount++;
        }

        // Keep the incomplete line in the buffer
        lineBuffer.setLength(0);
        lineBuffer.append(lines[lines.length - 1]);
    }

    @Override
    public void processingComplete(long totalBytesProcessed) {
        // Process any remaining line
        if (lineBuffer.length() > 0) {
            processCSVLine(lineBuffer.toString());
            recordCount++;
        }

        System.out.println("CSV processing complete:");
        System.out.println("Total records processed: " + recordCount);
        System.out.println("Total bytes processed: " + totalBytesProcessed);
    }

    private void processCSVLine(String line) {
        // Implement CSV line processing
        String[] fields = line.split(",");
        // Process fields...
    }
}

// Usage
MemoryEfficientStreaming streaming = new MemoryEfficientStreaming();

// Process large CSV blob
streaming.processLargeBlobIncrementally(largeCsvBlobClient, new CSVProcessor());

// Stream upload from generated data (SyntheticDataSource is sketched below)
streaming.streamUploadFromDataSource(targetBlobClient, new SyntheticDataSource());
```
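
`SyntheticDataSource` in the usage snippet is application-defined; a minimal sketch implementing the `DataSource` interface:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical DataSource emitting a fixed number of synthetic records
class SyntheticDataSource implements MemoryEfficientStreaming.DataSource {
    private static final int MAX_RECORDS = 100_000;
    private long bytesGenerated = 0;
    private int recordsEmitted = 0;

    @Override
    public boolean hasMoreData() {
        return recordsEmitted < MAX_RECORDS;
    }

    @Override
    public int generateData(byte[] buffer) {
        byte[] record = ("record-" + recordsEmitted + "\n").getBytes(StandardCharsets.UTF_8);
        int length = Math.min(record.length, buffer.length);
        System.arraycopy(record, 0, buffer, 0, length);
        recordsEmitted++;
        bytesGenerated += length;
        return length;
    }

    @Override
    public long getBytesGenerated() {
        return bytesGenerated;
    }
}
```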

## Performance Monitoring and Optimization

### Advanced Streaming Metrics
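
The collector below samples throughput once per second and summarizes average, minimum, and maximum transfer rates: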

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

public class StreamingMetrics {
    private final String operationName;
    private final long startTime;
    private long bytesProcessed;
    private final List<Long> throughputSamples;
    private long lastSampleTime;
    private long lastSampleBytes;

    public StreamingMetrics(String operationName) {
        this.operationName = operationName;
        this.startTime = System.currentTimeMillis();
        this.throughputSamples = new ArrayList<>();
        this.lastSampleTime = startTime;
        this.lastSampleBytes = 0;
    }

    public synchronized void recordBytes(long bytes) {
        this.bytesProcessed += bytes;

        long currentTime = System.currentTimeMillis();

        // Sample throughput every second
        if (currentTime - lastSampleTime >= 1000) {
            long intervalBytes = bytesProcessed - lastSampleBytes;
            double intervalSeconds = (currentTime - lastSampleTime) / 1000.0;
            double throughputMBps = (intervalBytes / 1024.0 / 1024.0) / intervalSeconds;

            throughputSamples.add((long) (throughputMBps * 100)); // Store as hundredths of MB/s for precision

            lastSampleTime = currentTime;
            lastSampleBytes = bytesProcessed;
        }
    }

    public StreamingReport generateReport() {
        long totalTime = System.currentTimeMillis() - startTime;
        double totalSeconds = totalTime / 1000.0;
        double averageThroughput = totalSeconds > 0 ? (bytesProcessed / 1024.0 / 1024.0) / totalSeconds : 0;

        double maxThroughput = throughputSamples.stream()
            .mapToDouble(sample -> sample / 100.0)
            .max()
            .orElse(0.0);

        double minThroughput = throughputSamples.stream()
            .mapToDouble(sample -> sample / 100.0)
            .min()
            .orElse(0.0);

        return new StreamingReport(
            operationName,
            bytesProcessed,
            totalTime,
            averageThroughput,
            maxThroughput,
            minThroughput,
            new ArrayList<>(throughputSamples)
        );
    }

    public static class StreamingReport {
        public final String operation;
        public final long totalBytes;
        public final long totalTimeMs;
        public final double averageThroughputMBps;
        public final double maxThroughputMBps;
        public final double minThroughputMBps;
        public final List<Long> throughputSamples;

        StreamingReport(String operation, long totalBytes, long totalTimeMs,
                        double averageThroughputMBps, double maxThroughputMBps, double minThroughputMBps,
                        List<Long> throughputSamples) {
            this.operation = operation;
            this.totalBytes = totalBytes;
            this.totalTimeMs = totalTimeMs;
            this.averageThroughputMBps = averageThroughputMBps;
            this.maxThroughputMBps = maxThroughputMBps;
            this.minThroughputMBps = minThroughputMBps;
            this.throughputSamples = throughputSamples;
        }

        public void printReport() {
            System.out.println("\n=== Streaming Performance Report ===");
            System.out.println("Operation: " + operation);
            System.out.printf("Total bytes: %s%n", formatBytes(totalBytes));
            System.out.printf("Total time: %.2f seconds%n", totalTimeMs / 1000.0);
            System.out.printf("Average throughput: %.2f MB/s%n", averageThroughputMBps);
            System.out.printf("Max throughput: %.2f MB/s%n", maxThroughputMBps);
            System.out.printf("Min throughput: %.2f MB/s%n", minThroughputMBps);
            System.out.printf("Throughput stability: %.2f%%%n", calculateStability());
            System.out.println("=====================================");
        }

        private double calculateStability() {
            if (maxThroughputMBps == 0) return 100.0;
            return (1.0 - (maxThroughputMBps - minThroughputMBps) / maxThroughputMBps) * 100.0;
        }

        private String formatBytes(long bytes) {
            if (bytes < 1024) return bytes + " B";
            if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
            if (bytes < 1024L * 1024 * 1024) return String.format("%.1f MB", bytes / 1024.0 / 1024.0);
            return String.format("%.2f GB", bytes / 1024.0 / 1024.0 / 1024.0);
        }
    }
}

// Performance-monitored streaming
public class MonitoredStreamingOperations {

    public StreamingMetrics.StreamingReport monitoredStreamDownload(BlobClient blobClient,
                                                                    OutputStream outputStream) {
        StreamingMetrics metrics = new StreamingMetrics("Blob Download");

        try (BlobInputStream inputStream = blobClient.openInputStream()) {
            byte[] buffer = new byte[64 * 1024]; // 64KB buffer
            int bytesRead;

            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
                metrics.recordBytes(bytesRead);
            }

        } catch (IOException ex) {
            throw new RuntimeException("Monitored download failed", ex);
        }

        return metrics.generateReport();
    }

    public StreamingMetrics.StreamingReport monitoredStreamUpload(BlockBlobClient blobClient,
                                                                  InputStream inputStream) {
        StreamingMetrics metrics = new StreamingMetrics("Blob Upload");

        try (BlobOutputStream outputStream = blobClient.getBlobOutputStream()) {
            byte[] buffer = new byte[64 * 1024]; // 64KB buffer
            int bytesRead;

            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
                metrics.recordBytes(bytesRead);
            }

        } catch (IOException ex) {
            throw new RuntimeException("Monitored upload failed", ex);
        }

        return metrics.generateReport();
    }
}

// Usage with performance monitoring
MonitoredStreamingOperations monitoredOps = new MonitoredStreamingOperations();

// Monitor download performance
try (FileOutputStream fileOutput = new FileOutputStream("downloaded-large-file.dat")) {
    StreamingMetrics.StreamingReport downloadReport = monitoredOps.monitoredStreamDownload(
        blobClient, fileOutput);
    downloadReport.printReport();
}

// Monitor upload performance
try (FileInputStream fileInput = new FileInputStream("upload-large-file.dat")) {
    StreamingMetrics.StreamingReport uploadReport = monitoredOps.monitoredStreamUpload(
        blockBlobClient, fileInput);
    uploadReport.printReport();
}
```

## Reactive Streaming Patterns

### Advanced Reactive Operations
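
The patterns below combine `BlobAsyncClient` with Reactor operators for concurrent processing, transformation pipelines, batched work with backpressure, and streaming aggregation: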
968
969
```java
970
import reactor.core.publisher.Flux;
971
import reactor.core.publisher.Mono;
972
import reactor.core.scheduler.Schedulers;
973
974
public class ReactiveStreamingPatterns {
975
976
// Parallel processing of multiple blobs
977
public Mono<List<ProcessingResult>> processMultipleBlobsConcurrently(
978
List<BlobAsyncClient> blobClients,
979
int concurrency) {
980
981
return Flux.fromIterable(blobClients)
982
.flatMap(blobClient ->
983
processBlobReactively(blobClient)
984
.subscribeOn(Schedulers.boundedElastic()),
985
concurrency)
986
.collectList();
987
}
988
989
private Mono<ProcessingResult> processBlobReactively(BlobAsyncClient blobClient) {
990
return blobClient.downloadStream()
991
.reduce(0L, (total, buffer) -> total + buffer.remaining())
992
.map(totalBytes -> new ProcessingResult(blobClient.getBlobName(), totalBytes))
993
.doOnSuccess(result ->
994
System.out.println("Processed " + result.blobName + ": " + result.totalBytes + " bytes"))
995
.onErrorResume(error -> {
996
System.err.println("Failed to process " + blobClient.getBlobName() + ": " + error.getMessage());
997
return Mono.just(new ProcessingResult(blobClient.getBlobName(), -1));
998
});
999
}
1000
1001
// Stream transformation pipeline
1002
public Mono<Void> transformAndUpload(BlobAsyncClient sourceClient,
1003
BlobAsyncClient targetClient,
1004
Function<ByteBuffer, ByteBuffer> transformer) {
1005
1006
Flux<ByteBuffer> transformedStream = sourceClient.downloadStream()
1007
.map(transformer)
1008
.onErrorResume(error -> {
1009
System.err.println("Transformation error: " + error.getMessage());
1010
return Flux.empty();
1011
});
1012
1013
BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(transformedStream)
1014
.setParallelTransferOptions(new ParallelTransferOptions()
1015
.setBlockSizeLong(1024 * 1024L)
1016
.setMaxConcurrency(4));
1017
1018
return targetClient.uploadWithResponse(uploadOptions)
1019
.doOnSuccess(response ->
1020
System.out.println("Transform and upload completed: " + response.getStatusCode()))
1021
.then();
1022
}
1023
1024
// Batch processing with backpressure
1025
public Flux<BatchResult> batchProcessWithBackpressure(Flux<BlobAsyncClient> blobClients,
1026
int batchSize,
1027
Duration batchTimeout) {
1028
1029
return blobClients
1030
.buffer(batchSize, batchTimeout)
1031
.flatMap(batch -> processBatch(batch), 2) // Max 2 concurrent batches
1032
.onBackpressureBuffer(100); // Buffer up to 100 results
1033
}
1034
1035
private Mono<BatchResult> processBatch(List<BlobAsyncClient> batch) {
1036
return Flux.fromIterable(batch)
1037
.flatMap(blobClient ->
1038
blobClient.getProperties()
1039
.map(props -> props.getBlobSize())
1040
.onErrorReturn(0L))
1041
.reduce(Long::sum)
1042
.map(totalSize -> new BatchResult(batch.size(), totalSize))
1043
.doOnSuccess(result ->
1044
System.out.println("Batch processed: " + result.blobCount +
1045
" blobs, " + result.totalSize + " bytes"));
1046
}
1047
1048
// Streaming aggregation
1049
public Mono<AggregationResult> streamingAggregation(List<BlobAsyncClient> blobClients) {
1050
return Flux.fromIterable(blobClients)
1051
.flatMap(blobClient ->
1052
blobClient.downloadStream()
1053
.map(ByteBuffer::remaining)
1054
.cast(Long.class)
1055
.reduce(Long::sum)
1056
.map(size -> new BlobStats(blobClient.getBlobName(), size)))
1057
.reduce(new AggregationResult(), (agg, stats) -> {
1058
agg.totalBlobs++;
1059
agg.totalBytes += stats.size;
1060
agg.maxBlobSize = Math.max(agg.maxBlobSize, stats.size);
1061
agg.minBlobSize = agg.minBlobSize == 0 ? stats.size : Math.min(agg.minBlobSize, stats.size);
1062
return agg;
1063
})
1064
.doOnSuccess(result -> {
1065
result.avgBlobSize = result.totalBlobs > 0 ? result.totalBytes / result.totalBlobs : 0;
1066
System.out.println("Aggregation complete: " + result);
1067
});
1068
}
1069
1070
// Data classes for results
1071
public static class ProcessingResult {
1072
public final String blobName;
1073
public final long totalBytes;
1074
1075
ProcessingResult(String blobName, long totalBytes) {
1076
this.blobName = blobName;
1077
this.totalBytes = totalBytes;
1078
}
1079
}
1080
1081
public static class BatchResult {
1082
public final int blobCount;
1083
public final long totalSize;
1084
1085
BatchResult(int blobCount, long totalSize) {
1086
this.blobCount = blobCount;
1087
this.totalSize = totalSize;
1088
}
1089
}
1090
1091
public static class BlobStats {
1092
public final String name;
1093
public final long size;
1094
1095
BlobStats(String name, long size) {
1096
this.name = name;
1097
this.size = size;
1098
}
1099
}
1100
1101
public static class AggregationResult {
1102
public int totalBlobs = 0;
1103
public long totalBytes = 0;
1104
public long maxBlobSize = 0;
1105
public long minBlobSize = 0;
1106
public long avgBlobSize = 0;
1107
1108
@Override
1109
public String toString() {
1110
return String.format("AggregationResult{blobs=%d, totalBytes=%d, avgSize=%d, minSize=%d, maxSize=%d}",
1111
totalBlobs, totalBytes, avgBlobSize, minBlobSize, maxBlobSize);
1112
}
1113
}
1114
}
1115
1116
// Usage examples
1117
ReactiveStreamingPatterns patterns = new ReactiveStreamingPatterns();
1118
1119
// Process multiple blobs concurrently
1120
List<BlobAsyncClient> asyncClients = Arrays.asList(
1121
blobClient1.getAsyncClient(),
1122
blobClient2.getAsyncClient(),
1123
blobClient3.getAsyncClient()
1124
);
1125
1126
patterns.processMultipleBlobsConcurrently(asyncClients, 3)
1127
.doOnSuccess(results ->
1128
System.out.println("Processed " + results.size() + " blobs concurrently"))
1129
.subscribe();
1130
1131
// Transform and upload pipeline
1132
patterns.transformAndUpload(
1133
sourceBlobClient.getAsyncClient(),
1134
targetBlobClient.getAsyncClient(),
1135
buffer -> {
1136
// Example transformation: convert to uppercase text
1137
String text = StandardCharsets.UTF_8.decode(buffer).toString().toUpperCase();
1138
return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
1139
}
1140
).subscribe();
1141
```

## Related Documentation

- [← Back to Overview](index.md)
- [← Security & Authentication](security.md)
- [Specialized Blob Types →](specialized-clients.md)
- [Configuration Options →](options.md)