Microsoft Azure client library for Blob Storage - Azure Blob Storage is Microsoft's object storage solution for the cloud, optimized for storing massive amounts of unstructured data such as text or binary data.
—
This documentation covers advanced I/O operations in the Azure Storage Blob Java SDK, including streaming uploads and downloads, reactive programming patterns, and high-performance data transfer techniques.
Stream blob content directly without loading entire content into memory.
import com.azure.storage.blob.specialized.BlobInputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
// --- Read a blob through a streaming InputStream (constant memory) ---
// Open blob as input stream
try (BlobInputStream blobInputStream = blobClient.openInputStream()) {
// Process stream incrementally
byte[] buffer = new byte[8192];
int bytesRead;
long totalBytesRead = 0;
while ((bytesRead = blobInputStream.read(buffer)) != -1) {
// Process buffer content
processChunk(buffer, bytesRead);
totalBytesRead += bytesRead;
// NOTE(review): this fires only when totalBytesRead is an exact multiple
// of 1 MiB; once a short read occurs the condition may never be true again.
if (totalBytesRead % (1024 * 1024) == 0) { // Log every MB
System.out.println("Processed: " + (totalBytesRead / 1024 / 1024) + " MB");
}
}
System.out.println("Total bytes processed: " + totalBytesRead);
} catch (IOException ex) {
System.err.println("Stream processing failed: " + ex.getMessage());
}
// --- Open the stream with an explicit block size, range, and ETag guard ---
// Open blob input stream with options
BlobInputStreamOptions inputStreamOptions = new BlobInputStreamOptions()
.setBlockSize(1024 * 1024) // 1MB read buffer
.setRange(new BlobRange(0, 10 * 1024 * 1024L)) // Read first 10MB only
// If-Match pins reads to the ETag captured here; a concurrent overwrite
// makes later reads fail instead of silently returning mixed content.
.setRequestConditions(new BlobRequestConditions()
.setIfMatch(blobClient.getProperties().getETag()));
try (BlobInputStream configuredStream = blobClient.openInputStream(inputStreamOptions)) {
// Process configured stream
processConfiguredStream(configuredStream);
}
// Stream text content line by line.
// BufferedReader decodes as UTF-8 and handles lines that span chunk
// boundaries; the whole blob is never held in memory.
try (BlobInputStream blobStream = blobClient.openInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(blobStream, StandardCharsets.UTF_8))) {
String line;
int lineNumber = 0;
while ((line = reader.readLine()) != null) {
lineNumber++;
processTextLine(line, lineNumber);
if (lineNumber % 1000 == 0) { // Log progress every 1000 lines
System.out.println("Processed " + lineNumber + " lines");
}
}
System.out.println("Total lines processed: " + lineNumber);
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
// Async streaming download
BlobAsyncClient asyncClient = blobClient.getAsyncClient();
Flux<ByteBuffer> downloadStream = asyncClient.downloadStream();
// Process stream reactively.
// NOTE(review): downloadStream is subscribed three times below; each
// subscribe() presumably starts an independent download — confirm before
// reusing one Flux like this in production.
downloadStream
.doOnNext(buffer -> {
int bufferSize = buffer.remaining();
System.out.println("Received buffer: " + bufferSize + " bytes");
// Process buffer content. buffer.get(...) advances the position, so this
// doOnNext consumes the buffer — downstream operators would see it empty.
byte[] bytes = new byte[bufferSize];
buffer.get(bytes);
processAsyncChunk(bytes);
})
.doOnError(error -> System.err.println("Download stream error: " + error.getMessage()))
.doOnComplete(() -> System.out.println("Download stream completed"))
.subscribe();
// Download with backpressure control
downloadStream
.buffer(10) // Buffer 10 chunks
.flatMap(buffers -> {
// Process batch of buffers
return Mono.fromRunnable(() -> processBatch(buffers));
}, 2) // Process max 2 batches concurrently
.doOnNext(result -> System.out.println("Batch processed"))
.subscribe();
// Download with retry and error handling: 3 retries with exponential
// backoff starting at 1s, then give up and complete empty.
downloadStream
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.onErrorResume(error -> {
System.err.println("Download failed after retries: " + error.getMessage());
return Flux.empty();
})
.subscribe();
// Stream specific ranges of large blobs
/**
 * Downloads a blob in fixed-size ranged GETs so only one chunk is in memory
 * at a time, reporting progress through an optional callback.
 */
public class RangeBasedStreamer {
private final BlobClient blobClient;
// Bytes requested per ranged download.
private final long chunkSize;
public RangeBasedStreamer(BlobClient blobClient, long chunkSize) {
this.blobClient = blobClient;
this.chunkSize = chunkSize;
}
/**
 * Streams the entire blob as a sequence of range downloads.
 *
 * @param callback optional progress receiver; may be null
 */
public void streamBlobInRanges(ProgressCallback callback) {
try {
BlobProperties properties = blobClient.getProperties();
long blobSize = properties.getBlobSize();
long totalProcessed = 0;
System.out.println("Streaming blob in " + chunkSize + " byte chunks");
System.out.println("Total blob size: " + blobSize + " bytes");
for (long offset = 0; offset < blobSize; offset += chunkSize) {
// The final chunk is clamped to the end of the blob.
long rangeEnd = Math.min(offset + chunkSize - 1, blobSize - 1);
// BlobRange takes (offset, count) — not (start, end).
BlobRange range = new BlobRange(offset, rangeEnd - offset + 1);
// Download this range
BlobDownloadContentResponse rangeResponse = blobClient.downloadContentWithResponse(
new BlobDownloadOptions().setRange(range),
Duration.ofMinutes(2),
Context.NONE
);
BinaryData rangeData = rangeResponse.getValue();
byte[] chunkBytes = rangeData.toBytes();
// Process chunk
processChunkWithRange(chunkBytes, offset, rangeEnd);
totalProcessed += chunkBytes.length;
// Report progress
if (callback != null) {
double progress = (double) totalProcessed / blobSize * 100;
callback.onProgress(totalProcessed, blobSize, progress);
}
}
System.out.println("Range-based streaming completed");
} catch (Exception ex) {
System.err.println("Range streaming failed: " + ex.getMessage());
// Precise rethrow (Java 7+): the SDK calls above throw unchecked
// exceptions, so no checked-exception declaration is required.
throw ex;
}
}
// Progress receiver: processed/total in bytes plus a 0-100 percentage.
@FunctionalInterface
public interface ProgressCallback {
void onProgress(long processed, long total, double percentage);
}
// Logs the chunk boundaries; replace with real per-chunk handling.
private void processChunkWithRange(byte[] chunk, long startOffset, long endOffset) {
System.out.printf("Processing chunk: offset %d-%d (%d bytes)%n",
startOffset, endOffset, chunk.length);
// Implement chunk processing logic
}
}
// Usage
RangeBasedStreamer streamer = new RangeBasedStreamer(blobClient, 5 * 1024 * 1024L); // 5MB chunks
// The lambda implements ProgressCallback.
streamer.streamBlobInRanges((processed, total, percentage) -> {
System.out.printf("Progress: %.1f%% (%d/%d bytes)%n", percentage, processed, total);
});

Stream data directly to blobs without buffering the entire content.
import com.azure.storage.blob.specialized.BlobOutputStream;
// Basic output stream usage
try (BlobOutputStream outputStream = blockBlobClient.getBlobOutputStream()) {
// Write data incrementally
for (int i = 0; i < 1000; i++) {
String data = "Data chunk " + i + "\n";
outputStream.write(data.getBytes(StandardCharsets.UTF_8));
if (i % 100 == 0) {
outputStream.flush(); // Force upload of current blocks
System.out.println("Flushed at chunk " + i);
}
}
// Stream automatically closes and commits all blocks
System.out.println("Stream upload completed");
} catch (IOException ex) {
System.err.println("Stream upload failed: " + ex.getMessage());
}
// Configured output stream with options
ParallelTransferOptions streamTransferOptions = new ParallelTransferOptions()
.setBlockSizeLong(2 * 1024 * 1024L) // 2MB blocks
.setMaxConcurrency(4)
.setProgressListener(bytesTransferred ->
System.out.println("Stream progress: " + bytesTransferred + " bytes"));
BlobHttpHeaders streamHeaders = new BlobHttpHeaders()
.setContentType("application/octet-stream")
// NOTE(review): Content-Encoding: gzip is only correct if the bytes
// written really are gzip-compressed — confirm before copying this.
.setContentEncoding("gzip")
.setCacheControl("private, no-cache");
Map<String, String> streamMetadata = Map.of(
"streaming", "true",
"created-time", OffsetDateTime.now().toString(),
"source", "streaming-application"
);
// If-None-Match: "*" makes the upload fail if the blob already exists.
try (BlobOutputStream configuredStream = blockBlobClient.getBlobOutputStream(
streamTransferOptions,
streamHeaders,
streamMetadata,
AccessTier.HOT,
new BlobRequestConditions().setIfNoneMatch("*"),
Context.NONE)) {
// Write large dataset incrementally
writeDataSetToStream(configuredStream);
System.out.println("Configured stream upload completed");
}
// Upload from Flux of ByteBuffer
// Reactive upload: the SDK pulls ByteBuffers from the Flux and stages them
// as 1MB blocks with up to 6 concurrent block uploads.
Flux<ByteBuffer> dataFlux = createDataFlux();
BlobParallelUploadOptions fluxUploadOptions = new BlobParallelUploadOptions(dataFlux)
.setParallelTransferOptions(new ParallelTransferOptions()
.setBlockSizeLong(1024 * 1024L) // 1MB blocks
.setMaxConcurrency(6)
.setProgressListener(bytesTransferred ->
System.out.println("Reactive upload progress: " + bytesTransferred)))
.setHeaders(new BlobHttpHeaders().setContentType("application/octet-stream"))
.setMetadata(Map.of("source", "reactive-stream"));
BlobAsyncClient asyncClient = blobClient.getAsyncClient();
// Upload reactively; nothing happens until subscribe().
Mono<Response<BlockBlobItem>> uploadMono = asyncClient.uploadWithResponse(fluxUploadOptions);
uploadMono
.doOnSuccess(response -> {
System.out.println("Reactive upload completed:");
System.out.println("ETag: " + response.getValue().getETag());
System.out.println("Status: " + response.getStatusCode());
})
.doOnError(error -> System.err.println("Reactive upload failed: " + error.getMessage()))
.subscribe();
// Builds a demo Flux of ByteBuffers that mimics a live data feed:
// 1000 small UTF-8 chunks, one emitted every 10 ms.
private Flux<ByteBuffer> createDataFlux() {
    return Flux.range(1, 1000)
            .map(chunkNumber -> {
                byte[] payload = ("Reactive data chunk " + chunkNumber + "\n")
                        .getBytes(StandardCharsets.UTF_8);
                return ByteBuffer.wrap(payload);
            })
            .delayElements(Duration.ofMillis(10));
}
// Upload from file with reactive progress.
// DataBufferUtils/DefaultDataBufferFactory come from Spring Core (not the
// Azure SDK): they read the file as a stream of 4KB buffers.
Path filePath = Paths.get("large-file.dat");
Flux<ByteBuffer> fileFlux = DataBufferUtils.read(
filePath,
new DefaultDataBufferFactory(),
4096 // 4KB buffers
).map(DataBuffer::asByteBuffer);
BlobParallelUploadOptions fileFluxOptions = new BlobParallelUploadOptions(fileFlux)
.setParallelTransferOptions(new ParallelTransferOptions()
.setProgressListener(new ReactiveProgressTracker(Files.size(filePath))));
asyncClient.uploadWithResponse(fileFluxOptions)
.doOnSuccess(response -> System.out.println("File stream upload completed"))
.subscribe();
/**
 * Uploads large files, choosing a strategy by size: files up to 256 MB use
 * the SDK's parallel uploadFromFile path; larger files are manually staged
 * block-by-block and committed in a single commitBlockList call.
 */
public class LargeFileManager {
    private final BlockBlobClient blockBlobClient;
    // Upload block size: at most 32 MB, never more than 1/16 of max heap.
    private final long optimalBlockSize;
    // Parallel block uploads: twice the core count, capped at 16.
    private final int maxConcurrency;

    public LargeFileManager(BlockBlobClient blockBlobClient) {
        this.blockBlobClient = blockBlobClient;
        long availableMemory = Runtime.getRuntime().maxMemory();
        this.optimalBlockSize = Math.min(32 * 1024 * 1024L, availableMemory / 16);
        this.maxConcurrency = Math.min(16, Runtime.getRuntime().availableProcessors() * 2);
        System.out.printf("Optimized for block size: %d MB, concurrency: %d%n",
                optimalBlockSize / 1024 / 1024, maxConcurrency);
    }

    /**
     * Uploads the file, reporting progress/completion/errors via the callback.
     *
     * @param filePath file to upload
     * @param callback optional progress receiver; may be null
     * @throws IOException if the file cannot be read or its type probed
     */
    public void uploadLargeFile(Path filePath, LargeFileCallback callback) throws IOException {
        long fileSize = Files.size(filePath);
        if (fileSize <= 256 * 1024 * 1024L) { // <= 256MB, use simple upload
            uploadSmallToMediumFile(filePath, callback);
        } else {
            uploadVeryLargeFile(filePath, callback);
        }
    }

    // SDK-managed parallel upload for files that fit the simple path.
    private void uploadSmallToMediumFile(Path filePath, LargeFileCallback callback) throws IOException {
        // BUG FIX: the original referenced an undefined 'fileSize' variable in
        // this method and re-called Files.size() inside the progress listener on
        // every callback; compute the size once up front instead.
        long fileSize = Files.size(filePath);
        ParallelTransferOptions options = new ParallelTransferOptions()
                .setBlockSizeLong(optimalBlockSize)
                .setMaxConcurrency(maxConcurrency)
                .setProgressListener(bytesTransferred -> {
                    if (callback != null) {
                        callback.onProgress(bytesTransferred, fileSize);
                    }
                });
        // BUG FIX: Files.probeContentType throws IOException, so this method
        // now declares it (the original body did not compile).
        BlobUploadFromFileOptions uploadOptions = new BlobUploadFromFileOptions(filePath.toString())
                .setParallelTransferOptions(options)
                .setHeaders(new BlobHttpHeaders()
                        .setContentType(Files.probeContentType(filePath)))
                .setMetadata(Map.of(
                        "file-name", filePath.getFileName().toString(),
                        "file-size", String.valueOf(fileSize),
                        "upload-method", "parallel-transfer"));
        try {
            Response<BlockBlobItem> response = blockBlobClient.uploadFromFileWithResponse(
                    uploadOptions,
                    Duration.ofHours(2),
                    Context.NONE
            );
            if (callback != null) {
                callback.onComplete(response.getValue());
            }
        } catch (Exception ex) {
            if (callback != null) {
                callback.onError(ex);
            }
            throw ex; // precise rethrow: only unchecked exceptions reach here
        }
    }

    // Manual block staging for very large files: read -> stageBlock -> commit.
    private void uploadVeryLargeFile(Path filePath, LargeFileCallback callback) throws IOException {
        long fileSize = Files.size(filePath);
        List<String> blockIds = new ArrayList<>();
        try (InputStream fileStream = Files.newInputStream(filePath)) {
            long totalUploaded = 0;
            int blockIndex = 0;
            byte[] buffer = new byte[(int) optimalBlockSize];
            while (true) {
                int bytesRead = fileStream.read(buffer);
                if (bytesRead == -1) {
                    break; // End of file
                }
                // Block IDs must be Base64-encoded and equal length for every
                // block; an explicit charset avoids platform-default surprises.
                String blockId = Base64.getEncoder().encodeToString(
                        String.format("block-%06d", blockIndex).getBytes(StandardCharsets.UTF_8));
                blockIds.add(blockId);
                // The final read may be short; trim before staging.
                byte[] blockData = bytesRead == buffer.length ? buffer : Arrays.copyOf(buffer, bytesRead);
                blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(blockData));
                totalUploaded += bytesRead;
                blockIndex++;
                if (callback != null) {
                    callback.onProgress(totalUploaded, fileSize);
                }
                // Periodic log only — every block is committed in one call below.
                if (blockIds.size() % 1000 == 0) {
                    System.out.println("Staged " + blockIds.size() + " blocks...");
                }
            }
            // Commit atomically; until this call the blob content is unchanged.
            System.out.println("Committing " + blockIds.size() + " blocks...");
            BlockBlobItem result = blockBlobClient.commitBlockList(blockIds);
            if (callback != null) {
                callback.onComplete(result);
            }
        } catch (Exception ex) {
            if (callback != null) {
                callback.onError(ex);
            }
            throw ex;
        }
    }

    /** Receives progress, completion, and failure notifications. */
    public interface LargeFileCallback {
        void onProgress(long uploaded, long total);
        void onComplete(BlockBlobItem result);
        void onError(Exception error);
    }
}
// Usage with comprehensive callback.
// NOTE(review): uploadLargeFile declares throws IOException; this snippet
// assumes an enclosing method that declares or handles it.
LargeFileManager manager = new LargeFileManager(blockBlobClient);
manager.uploadLargeFile(Paths.get("very-large-file.zip"), new LargeFileManager.LargeFileCallback() {
// Last progress value logged, in whole MB; throttles output to every 100 MB.
private long lastLoggedMB = 0;
@Override
public void onProgress(long uploaded, long total) {
long uploadedMB = uploaded / 1024 / 1024;
if (uploadedMB >= lastLoggedMB + 100) { // Log every 100MB
double percentage = (double) uploaded / total * 100;
System.out.printf("Large file progress: %.1f%% (%d MB / %d MB)%n",
percentage, uploadedMB, total / 1024 / 1024);
lastLoggedMB = uploadedMB;
}
}
@Override
public void onComplete(BlockBlobItem result) {
System.out.println("Large file upload completed successfully!");
System.out.println("ETag: " + result.getETag());
System.out.println("Last modified: " + result.getLastModified());
}
@Override
public void onError(Exception error) {
System.err.println("Large file upload failed: " + error.getMessage());
error.printStackTrace();
}
});
/**
 * Resumable block upload: stages blocks one at a time and records progress in
 * a JSON checkpoint file (java.io.tmpdir/&lt;blobName&gt;.upload.checkpoint) so an
 * interrupted upload can continue where it left off.
 * NOTE(review): the checkpoint records the file path/size but is not validated
 * against the current file on resume — a source file that changed between runs
 * would commit stale blocks. Confirm inputs are immutable between attempts.
 */
public class ResumableUploadManager {
private final BlockBlobClient blockBlobClient;
// Checkpoint location, derived from the blob name.
private final Path checkpointFile;
public ResumableUploadManager(BlockBlobClient blockBlobClient, String blobName) {
this.blockBlobClient = blockBlobClient;
this.checkpointFile = Paths.get(System.getProperty("java.io.tmpdir"),
blobName + ".upload.checkpoint");
}
/**
 * Uploads the file in fixed-size blocks, skipping blocks already recorded in
 * the checkpoint, then commits the full block list and deletes the checkpoint.
 *
 * @param filePath file to upload
 * @param blockSize block size in bytes; must match any earlier attempt
 */
public void resumableUpload(Path filePath, long blockSize) throws IOException {
long fileSize = Files.size(filePath);
UploadCheckpoint checkpoint = loadCheckpoint();
if (checkpoint == null) {
// Start new upload
checkpoint = new UploadCheckpoint();
checkpoint.filePath = filePath.toString();
checkpoint.fileSize = fileSize;
checkpoint.blockSize = blockSize;
checkpoint.uploadedBlocks = new HashSet<>();
}
System.out.println("Starting resumable upload...");
System.out.printf("File: %s (%d bytes)%n", filePath, fileSize);
System.out.printf("Previously uploaded blocks: %d%n", checkpoint.uploadedBlocks.size());
try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath.toFile(), "r")) {
// Ceiling division: number of blocks needed to cover the file.
long totalBlocks = (fileSize + blockSize - 1) / blockSize;
for (int blockIndex = 0; blockIndex < totalBlocks; blockIndex++) {
String blockId = generateBlockId(blockIndex);
// Skip if already uploaded
if (checkpoint.uploadedBlocks.contains(blockId)) {
continue;
}
// Read block data; the last block may be shorter than blockSize.
long offset = (long) blockIndex * blockSize;
long currentBlockSize = Math.min(blockSize, fileSize - offset);
byte[] blockData = new byte[(int) currentBlockSize];
randomAccessFile.seek(offset);
randomAccessFile.readFully(blockData);
// Upload block with retry
uploadBlockWithRetry(blockId, blockData, 3);
// Checkpoint after every successful block so a crash loses at most
// one block of progress.
checkpoint.uploadedBlocks.add(blockId);
saveCheckpoint(checkpoint);
// Progress update
double progress = (double) checkpoint.uploadedBlocks.size() / totalBlocks * 100;
System.out.printf("Progress: %.1f%% (%d/%d blocks)%n",
progress, checkpoint.uploadedBlocks.size(), totalBlocks);
}
// Commit all blocks
List<String> allBlockIds = generateAllBlockIds(totalBlocks);
BlockBlobItem result = blockBlobClient.commitBlockList(allBlockIds);
// Clean up checkpoint
Files.deleteIfExists(checkpointFile);
System.out.println("Resumable upload completed successfully!");
System.out.println("ETag: " + result.getETag());
} catch (Exception ex) {
System.err.println("Resumable upload failed: " + ex.getMessage());
throw ex;
}
}
// Stages one block, retrying up to maxRetries times with a linearly growing
// delay; throws RuntimeException once retries are exhausted.
private void uploadBlockWithRetry(String blockId, byte[] data, int maxRetries) {
int attempt = 0;
while (attempt < maxRetries) {
try {
blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(data));
return; // Success
} catch (Exception ex) {
attempt++;
if (attempt >= maxRetries) {
throw new RuntimeException("Failed to upload block after " + maxRetries + " attempts", ex);
}
System.err.printf("Block upload attempt %d failed, retrying: %s%n", attempt, ex.getMessage());
try {
Thread.sleep(1000 * attempt); // Linear backoff (1s, 2s, ...), not exponential as originally labeled
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Upload interrupted", ie);
}
}
}
}
// Deterministic, fixed-length Base64 block ID per index (required by the service).
// NOTE(review): getBytes() uses the platform charset; harmless for this ASCII name.
private String generateBlockId(int blockIndex) {
return Base64.getEncoder().encodeToString(
String.format("block-%06d", blockIndex).getBytes());
}
// Rebuilds the complete, ordered block ID list for the final commit.
private List<String> generateAllBlockIds(long totalBlocks) {
List<String> blockIds = new ArrayList<>();
for (int i = 0; i < totalBlocks; i++) {
blockIds.add(generateBlockId(i));
}
return blockIds;
}
// Returns null when there is no usable checkpoint (fresh start).
private UploadCheckpoint loadCheckpoint() {
if (!Files.exists(checkpointFile)) {
return null;
}
try {
String json = Files.readString(checkpointFile);
return new Gson().fromJson(json, UploadCheckpoint.class);
} catch (Exception ex) {
System.err.println("Failed to load checkpoint: " + ex.getMessage());
return null;
}
}
// Best-effort persistence: a failed save only costs re-uploading blocks later.
private void saveCheckpoint(UploadCheckpoint checkpoint) {
try {
String json = new Gson().toJson(checkpoint);
Files.writeString(checkpointFile, json, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
} catch (Exception ex) {
System.err.println("Failed to save checkpoint: " + ex.getMessage());
}
}
// Serialized to JSON via Gson; fields are package-private for reflection access.
private static class UploadCheckpoint {
String filePath;
long fileSize;
long blockSize;
Set<String> uploadedBlocks = new HashSet<>();
}
}
// Usage
// Checkpoint will live at java.io.tmpdir/huge-file.dat.upload.checkpoint.
ResumableUploadManager resumableManager = new ResumableUploadManager(blockBlobClient, "huge-file.dat");
resumableManager.resumableUpload(Paths.get("huge-file.dat"), 8 * 1024 * 1024L); // 8MB blockspublic class MemoryEfficientStreaming {
// Process very large blobs without loading into memory
public void processLargeBlobIncrementally(BlobClient blobClient, DataProcessor processor) {
try (BlobInputStream inputStream = blobClient.openInputStream()) {
processStreamInChunks(inputStream, processor, 64 * 1024); // 64KB chunks
} catch (IOException ex) {
throw new RuntimeException("Failed to process large blob", ex);
}
}
private void processStreamInChunks(InputStream inputStream, DataProcessor processor, int chunkSize)
throws IOException {
byte[] buffer = new byte[chunkSize];
int bytesRead;
long totalProcessed = 0;
while ((bytesRead = inputStream.read(buffer)) != -1) {
// Process only the bytes actually read
if (bytesRead == buffer.length) {
processor.processChunk(buffer);
} else {
// Last chunk might be smaller
byte[] lastChunk = Arrays.copyOf(buffer, bytesRead);
processor.processChunk(lastChunk);
}
totalProcessed += bytesRead;
// Periodic garbage collection hint for long-running operations
if (totalProcessed % (10 * 1024 * 1024) == 0) { // Every 10MB
System.gc();
}
}
processor.processingComplete(totalProcessed);
}
// Streaming upload from data source without buffering entire content
public void streamUploadFromDataSource(BlockBlobClient blobClient, DataSource dataSource) {
try (BlobOutputStream outputStream = blobClient.getBlobOutputStream()) {
byte[] buffer = new byte[8192]; // Small buffer
while (dataSource.hasMoreData()) {
int bytesGenerated = dataSource.generateData(buffer);
if (bytesGenerated > 0) {
outputStream.write(buffer, 0, bytesGenerated);
}
// Flush periodically to prevent memory buildup
if (dataSource.getBytesGenerated() % (1024 * 1024) == 0) { // Every 1MB
outputStream.flush();
}
}
} catch (IOException ex) {
throw new RuntimeException("Streaming upload failed", ex);
}
}
// Pipeline processing: download -> transform -> upload without intermediate storage
public void pipelineProcessing(BlobClient sourceBlobClient, BlockBlobClient targetBlobClient,
DataTransformer transformer) {
try (BlobInputStream inputStream = sourceBlobClient.openInputStream();
BlobOutputStream outputStream = targetBlobClient.getBlobOutputStream()) {
byte[] inputBuffer = new byte[16384]; // 16KB input buffer
int bytesRead;
while ((bytesRead = inputStream.read(inputBuffer)) != -1) {
// Transform data chunk
byte[] transformedData = transformer.transform(inputBuffer, 0, bytesRead);
// Write transformed data immediately
outputStream.write(transformedData);
// Clear buffers to help GC
Arrays.fill(inputBuffer, (byte) 0);
}
} catch (IOException ex) {
throw new RuntimeException("Pipeline processing failed", ex);
}
}
// Interfaces for extensibility
public interface DataProcessor {
void processChunk(byte[] chunk);
void processingComplete(long totalBytesProcessed);
}
public interface DataSource {
boolean hasMoreData();
int generateData(byte[] buffer);
long getBytesGenerated();
}
public interface DataTransformer {
byte[] transform(byte[] input, int offset, int length);
}
}
// Example implementations
// Buffers chunk text across calls and processes each complete CSV line.
class CSVProcessor implements MemoryEfficientStreaming.DataProcessor {
// Holds the trailing incomplete line between chunks.
private final StringBuilder lineBuffer = new StringBuilder();
private int recordCount = 0;
@Override
public void processChunk(byte[] chunk) {
// NOTE(review): decoding each chunk independently can split a multi-byte
// UTF-8 character at a chunk boundary and corrupt it; safe only for ASCII
// data — consider a CharsetDecoder that carries state across chunks.
String chunkStr = new String(chunk, StandardCharsets.UTF_8);
lineBuffer.append(chunkStr);
// Process complete lines. Limit -1 keeps a trailing empty segment, so an
// incomplete final line (possibly "") is always preserved below.
String[] lines = lineBuffer.toString().split("\n", -1);
// Process all complete lines (all but the last)
for (int i = 0; i < lines.length - 1; i++) {
processCSVLine(lines[i]);
recordCount++;
}
// Keep the incomplete line in buffer
lineBuffer.setLength(0);
lineBuffer.append(lines[lines.length - 1]);
}
@Override
public void processingComplete(long totalBytesProcessed) {
// Process any remaining line
if (lineBuffer.length() > 0) {
processCSVLine(lineBuffer.toString());
recordCount++;
}
System.out.println("CSV processing complete:");
System.out.println("Total records processed: " + recordCount);
System.out.println("Total bytes processed: " + totalBytesProcessed);
}
// NOTE(review): naive split(",") breaks on quoted fields containing commas.
private void processCSVLine(String line) {
// Implement CSV line processing
String[] fields = line.split(",");
// Process fields...
}
}
// Usage
MemoryEfficientStreaming streaming = new MemoryEfficientStreaming();
// Process large CSV blob in 64 KB chunks via the CSVProcessor above.
streaming.processLargeBlobIncrementally(largeCsvBlobClient, new CSVProcessor());
// Stream upload from generated data
streaming.streamUploadFromDataSource(targetBlobClient, new SyntheticDataSource());
/**
 * Collects throughput metrics for one streaming operation: total bytes,
 * elapsed time, and ~1-second throughput samples.
 * recordBytes is synchronized; generateReport is not — call it after the
 * transfer finishes (or from the recording thread).
 */
public class StreamingMetrics {
private final String operationName;
// Wall-clock millis at construction.
private final long startTime;
private long bytesProcessed;
// One sample per ~second, stored as MB/s * 100 (two decimals kept in a long).
private final List<Long> throughputSamples;
private long lastSampleTime;
private long lastSampleBytes;
public StreamingMetrics(String operationName) {
this.operationName = operationName;
this.startTime = System.currentTimeMillis();
this.throughputSamples = new ArrayList<>();
this.lastSampleTime = startTime;
this.lastSampleBytes = 0;
}
// Records bytes moved; samples instantaneous throughput at most once per second.
public synchronized void recordBytes(long bytes) {
this.bytesProcessed += bytes;
long currentTime = System.currentTimeMillis();
// Sample throughput every second
if (currentTime - lastSampleTime >= 1000) {
long intervalBytes = bytesProcessed - lastSampleBytes;
double intervalSeconds = (currentTime - lastSampleTime) / 1000.0;
double throughputMBps = (intervalBytes / 1024.0 / 1024.0) / intervalSeconds;
throughputSamples.add((long) (throughputMBps * 100)); // MB/s scaled by 100 (NOT centibytes as originally labeled)
lastSampleTime = currentTime;
lastSampleBytes = bytesProcessed;
}
}
// Builds a snapshot of metrics collected so far; samples are divided by 100
// to recover MB/s.
public StreamingReport generateReport() {
long totalTime = System.currentTimeMillis() - startTime;
double totalSeconds = totalTime / 1000.0;
double averageThroughput = totalSeconds > 0 ? (bytesProcessed / 1024.0 / 1024.0) / totalSeconds : 0;
double maxThroughput = throughputSamples.stream()
.mapToDouble(sample -> sample / 100.0)
.max()
.orElse(0.0);
double minThroughput = throughputSamples.stream()
.mapToDouble(sample -> sample / 100.0)
.min()
.orElse(0.0);
return new StreamingReport(
operationName,
bytesProcessed,
totalTime,
averageThroughput,
maxThroughput,
minThroughput,
new ArrayList<>(throughputSamples)
);
}
// Immutable summary of one streaming operation's performance.
public static class StreamingReport {
public final String operation;
public final long totalBytes;
public final long totalTimeMs;
public final double averageThroughputMBps;
public final double maxThroughputMBps;
public final double minThroughputMBps;
// Raw samples, still scaled by 100.
public final List<Long> throughputSamples;
StreamingReport(String operation, long totalBytes, long totalTimeMs,
double averageThroughputMBps, double maxThroughputMBps, double minThroughputMBps,
List<Long> throughputSamples) {
this.operation = operation;
this.totalBytes = totalBytes;
this.totalTimeMs = totalTimeMs;
this.averageThroughputMBps = averageThroughputMBps;
this.maxThroughputMBps = maxThroughputMBps;
this.minThroughputMBps = minThroughputMBps;
this.throughputSamples = throughputSamples;
}
public void printReport() {
System.out.println("\n=== Streaming Performance Report ===");
System.out.println("Operation: " + operation);
System.out.printf("Total bytes: %s%n", formatBytes(totalBytes));
System.out.printf("Total time: %.2f seconds%n", totalTimeMs / 1000.0);
System.out.printf("Average throughput: %.2f MB/s%n", averageThroughputMBps);
System.out.printf("Max throughput: %.2f MB/s%n", maxThroughputMBps);
System.out.printf("Min throughput: %.2f MB/s%n", minThroughputMBps);
System.out.printf("Throughput stability: %.2f%%%n", calculateStability());
System.out.println("=====================================");
}
// 100% when min == max (perfectly steady); lower as the spread widens.
private double calculateStability() {
if (maxThroughputMBps == 0) return 100.0;
return (1.0 - (maxThroughputMBps - minThroughputMBps) / maxThroughputMBps) * 100.0;
}
// Human-readable byte count (B / KB / MB / GB, 1024-based).
private String formatBytes(long bytes) {
if (bytes < 1024) return bytes + " B";
if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / 1024.0 / 1024.0);
return String.format("%.2f GB", bytes / 1024.0 / 1024.0 / 1024.0);
}
}
}
// Performance-monitored streaming
// Wraps blob download/upload in a metered copy loop and returns a report.
public class MonitoredStreamingOperations {

    /** Downloads the blob into the given stream, metering throughput. */
    public StreamingMetrics.StreamingReport monitoredStreamDownload(BlobClient blobClient,
            OutputStream outputStream) {
        StreamingMetrics metrics = new StreamingMetrics("Blob Download");
        try (BlobInputStream source = blobClient.openInputStream()) {
            copyWithMetrics(source, outputStream, metrics);
        } catch (IOException ex) {
            throw new RuntimeException("Monitored download failed", ex);
        }
        return metrics.generateReport();
    }

    /** Uploads the given stream into the block blob, metering throughput. */
    public StreamingMetrics.StreamingReport monitoredStreamUpload(BlockBlobClient blobClient,
            InputStream inputStream) {
        StreamingMetrics metrics = new StreamingMetrics("Blob Upload");
        try (BlobOutputStream sink = blobClient.getBlobOutputStream()) {
            copyWithMetrics(inputStream, sink, metrics);
        } catch (IOException ex) {
            throw new RuntimeException("Monitored upload failed", ex);
        }
        return metrics.generateReport();
    }

    // Shared copy loop: move data in 64 KB chunks, recording each chunk.
    private static void copyWithMetrics(InputStream in, OutputStream out, StreamingMetrics metrics)
            throws IOException {
        byte[] chunk = new byte[64 * 1024];
        int count;
        while ((count = in.read(chunk)) != -1) {
            out.write(chunk, 0, count);
            metrics.recordBytes(count);
        }
    }
}
// Usage with performance monitoring
MonitoredStreamingOperations monitoredOps = new MonitoredStreamingOperations();
// Monitor download performance; try-with-resources closes the local file.
try (FileOutputStream fileOutput = new FileOutputStream("downloaded-large-file.dat")) {
StreamingMetrics.StreamingReport downloadReport = monitoredOps.monitoredStreamDownload(
blobClient, fileOutput);
downloadReport.printReport();
}
// Monitor upload performance
try (FileInputStream fileInput = new FileInputStream("upload-large-file.dat")) {
StreamingMetrics.StreamingReport uploadReport = monitoredOps.monitoredStreamUpload(
blockBlobClient, fileInput);
uploadReport.printReport();
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
 * Reactive (Project Reactor) patterns for blob streaming: bounded-concurrency
 * processing, transform pipelines, batched work with backpressure, and
 * streaming size aggregation.
 */
public class ReactiveStreamingPatterns {

    /**
     * Processes several blobs with at most {@code concurrency} in flight and
     * collects the per-blob results into one list.
     */
    public Mono<List<ProcessingResult>> processMultipleBlobsConcurrently(
            List<BlobAsyncClient> blobClients,
            int concurrency) {
        return Flux.fromIterable(blobClients)
                .flatMap(blobClient ->
                        processBlobReactively(blobClient)
                                .subscribeOn(Schedulers.boundedElastic()),
                        concurrency)
                .collectList();
    }

    // Counts the bytes of one blob's download stream. On failure the result
    // carries -1 instead of propagating the error, so one bad blob does not
    // cancel the whole batch.
    private Mono<ProcessingResult> processBlobReactively(BlobAsyncClient blobClient) {
        return blobClient.downloadStream()
                .reduce(0L, (total, buffer) -> total + buffer.remaining())
                .map(totalBytes -> new ProcessingResult(blobClient.getBlobName(), totalBytes))
                .doOnSuccess(result ->
                        System.out.println("Processed " + result.blobName + ": " + result.totalBytes + " bytes"))
                .onErrorResume(error -> {
                    System.err.println("Failed to process " + blobClient.getBlobName() + ": " + error.getMessage());
                    return Mono.just(new ProcessingResult(blobClient.getBlobName(), -1));
                });
    }

    /** Downloads, transforms each ByteBuffer, and uploads the result. */
    public Mono<Void> transformAndUpload(BlobAsyncClient sourceClient,
                                         BlobAsyncClient targetClient,
                                         Function<ByteBuffer, ByteBuffer> transformer) {
        Flux<ByteBuffer> transformedStream = sourceClient.downloadStream()
                .map(transformer)
                .onErrorResume(error -> {
                    // NOTE(review): swallowing the error here silently truncates
                    // the upload — the target blob simply ends early.
                    System.err.println("Transformation error: " + error.getMessage());
                    return Flux.empty();
                });
        BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(transformedStream)
                .setParallelTransferOptions(new ParallelTransferOptions()
                        .setBlockSizeLong(1024 * 1024L)
                        .setMaxConcurrency(4));
        return targetClient.uploadWithResponse(uploadOptions)
                .doOnSuccess(response ->
                        System.out.println("Transform and upload completed: " + response.getStatusCode()))
                .then();
    }

    /** Groups incoming clients into batches (by size OR timeout) and processes them with bounded concurrency. */
    public Flux<BatchResult> batchProcessWithBackpressure(Flux<BlobAsyncClient> blobClients,
                                                          int batchSize,
                                                          Duration batchTimeout) {
        return blobClients
                .buffer(batchSize, batchTimeout)
                .flatMap(this::processBatch, 2) // Max 2 concurrent batches
                .onBackpressureBuffer(100);     // Buffer up to 100 results
    }

    // Sums the sizes of a batch of blobs; a failed getProperties counts as 0.
    private Mono<BatchResult> processBatch(List<BlobAsyncClient> batch) {
        return Flux.fromIterable(batch)
                .flatMap(blobClient ->
                        blobClient.getProperties()
                                .map(props -> props.getBlobSize())
                                .onErrorReturn(0L))
                .reduce(Long::sum)
                .map(totalSize -> new BatchResult(batch.size(), totalSize))
                .doOnSuccess(result ->
                        System.out.println("Batch processed: " + result.blobCount +
                                " blobs, " + result.totalSize + " bytes"));
    }

    /** Streams every blob once and folds the per-blob sizes into one aggregate. */
    public Mono<AggregationResult> streamingAggregation(List<BlobAsyncClient> blobClients) {
        return Flux.fromIterable(blobClients)
                .flatMap(blobClient ->
                        // BUG FIX: the original mapped each buffer to an Integer
                        // and then called .cast(Long.class), which throws
                        // ClassCastException at runtime. Accumulate the size as a
                        // long directly instead.
                        blobClient.downloadStream()
                                .reduce(0L, (size, buffer) -> size + buffer.remaining())
                                .map(size -> new BlobStats(blobClient.getBlobName(), size)))
                .reduce(new AggregationResult(), (agg, stats) -> {
                    agg.totalBlobs++;
                    agg.totalBytes += stats.size;
                    agg.maxBlobSize = Math.max(agg.maxBlobSize, stats.size);
                    // minBlobSize == 0 means "unset"; a genuinely empty blob also
                    // leaves the minimum at 0, which is still correct.
                    agg.minBlobSize = agg.minBlobSize == 0 ? stats.size : Math.min(agg.minBlobSize, stats.size);
                    return agg;
                })
                .doOnSuccess(result -> {
                    result.avgBlobSize = result.totalBlobs > 0 ? result.totalBytes / result.totalBlobs : 0;
                    System.out.println("Aggregation complete: " + result);
                });
    }

    // ----- Result data holders -----

    public static class ProcessingResult {
        public final String blobName;
        // -1 indicates the blob failed to process.
        public final long totalBytes;
        ProcessingResult(String blobName, long totalBytes) {
            this.blobName = blobName;
            this.totalBytes = totalBytes;
        }
    }

    public static class BatchResult {
        public final int blobCount;
        public final long totalSize;
        BatchResult(int blobCount, long totalSize) {
            this.blobCount = blobCount;
            this.totalSize = totalSize;
        }
    }

    public static class BlobStats {
        public final String name;
        public final long size;
        BlobStats(String name, long size) {
            this.name = name;
            this.size = size;
        }
    }

    public static class AggregationResult {
        public int totalBlobs = 0;
        public long totalBytes = 0;
        public long maxBlobSize = 0;
        public long minBlobSize = 0;
        // Filled in after the fold completes.
        public long avgBlobSize = 0;
        @Override
        public String toString() {
            return String.format("AggregationResult{blobs=%d, totalBytes=%d, avgSize=%d, minSize=%d, maxSize=%d}",
                    totalBlobs, totalBytes, avgBlobSize, minBlobSize, maxBlobSize);
        }
    }
}
// Usage examples
ReactiveStreamingPatterns patterns = new ReactiveStreamingPatterns();
// Process multiple blobs concurrently
List<BlobAsyncClient> asyncClients = Arrays.asList(
blobClient1.getAsyncClient(),
blobClient2.getAsyncClient(),
blobClient3.getAsyncClient()
);
patterns.processMultipleBlobsConcurrently(asyncClients, 3)
.doOnSuccess(results ->
System.out.println("Processed " + results.size() + " blobs concurrently"))
.subscribe();
// Transform and upload pipeline
patterns.transformAndUpload(
sourceBlobClient.getAsyncClient(),
targetBlobClient.getAsyncClient(),
buffer -> {
// Example transformation: convert to uppercase text.
// NOTE(review): decoding each ByteBuffer separately can split multi-byte
// UTF-8 sequences at chunk boundaries and corrupt text — confirm inputs
// are ASCII, or decode with a stateful CharsetDecoder across buffers.
String text = StandardCharsets.UTF_8.decode(buffer).toString().toUpperCase();
return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
}
).subscribe();

Install with the Tessl CLI:
npx tessl i tessl/maven-com-azure--azure-storage-blob