CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-com-azure--azure-storage-blob

Microsoft Azure client library for Blob Storage - Azure Blob Storage is Microsoft's object storage solution for the cloud, optimized for storing massive amounts of unstructured data such as text or binary data.

Pending
Overview
Eval results
Files

docs/streaming.md

Streaming & Advanced I/O

This documentation covers advanced I/O operations in the Azure Storage Blob Java SDK, including streaming uploads and downloads, reactive programming patterns, and high-performance data transfer techniques.

Input Streams and Downloads

BlobInputStream

Stream blob content incrementally without loading the entire blob into memory.

// NOTE(review): snippet assumes an initialized BlobClient ("blobClient") and helper
// methods (processChunk, processConfiguredStream, processTextLine) defined elsewhere.
import com.azure.storage.blob.specialized.BlobInputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;

// Open blob as input stream
try (BlobInputStream blobInputStream = blobClient.openInputStream()) {
    // Process stream incrementally
    byte[] buffer = new byte[8192];
    int bytesRead;
    long totalBytesRead = 0;
    
    while ((bytesRead = blobInputStream.read(buffer)) != -1) {
        // Process buffer content
        processChunk(buffer, bytesRead);
        totalBytesRead += bytesRead;
        
        // NOTE(review): this only fires when the running total lands exactly on a MB
        // boundary; short reads from the stream can make the log skip milestones.
        if (totalBytesRead % (1024 * 1024) == 0) { // Log every MB
            System.out.println("Processed: " + (totalBytesRead / 1024 / 1024) + " MB");
        }
    }
    
    System.out.println("Total bytes processed: " + totalBytesRead);
    
} catch (IOException ex) {
    System.err.println("Stream processing failed: " + ex.getMessage());
}

// Open blob input stream with options
// NOTE(review): BlobInputStreamOptions, BlobRange and BlobRequestConditions require
// imports (com.azure.storage.blob.options / .models) not shown above.
BlobInputStreamOptions inputStreamOptions = new BlobInputStreamOptions()
    .setBlockSize(1024 * 1024) // 1MB read buffer
    .setRange(new BlobRange(0, 10 * 1024 * 1024L)) // Read first 10MB only
    .setRequestConditions(new BlobRequestConditions()
        .setIfMatch(blobClient.getProperties().getETag())); // fail fast if the blob changes mid-read

// NOTE(review): no catch block here — the enclosing method must declare IOException.
try (BlobInputStream configuredStream = blobClient.openInputStream(inputStreamOptions)) {
    // Process configured stream
    processConfiguredStream(configuredStream);
}

// Stream text content line by line
// NOTE(review): StandardCharsets needs java.nio.charset.StandardCharsets, not imported above.
try (BlobInputStream blobStream = blobClient.openInputStream();
     BufferedReader reader = new BufferedReader(new InputStreamReader(blobStream, StandardCharsets.UTF_8))) {
    
    String line;
    int lineNumber = 0;
    
    while ((line = reader.readLine()) != null) {
        lineNumber++;
        processTextLine(line, lineNumber);
        
        if (lineNumber % 1000 == 0) { // Log progress every 1000 lines
            System.out.println("Processed " + lineNumber + " lines");
        }
    }
    
    System.out.println("Total lines processed: " + lineNumber);
}

Streaming Downloads with Reactive Programming

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;

// Async streaming download
BlobAsyncClient asyncClient = blobClient.getAsyncClient();

Flux<ByteBuffer> downloadStream = asyncClient.downloadStream();

// Process stream reactively
// NOTE(review): the three pipelines below each call .subscribe() on the same Flux;
// presumably each subscription issues its own download request — confirm before
// reusing one Flux like this in production.
downloadStream
    .doOnNext(buffer -> {
        int bufferSize = buffer.remaining();
        System.out.println("Received buffer: " + bufferSize + " bytes");
        
        // Process buffer content.
        // Draining here advances the ByteBuffer's position; any downstream operator
        // sharing the same buffer instance would see it empty.
        byte[] bytes = new byte[bufferSize];
        buffer.get(bytes);
        processAsyncChunk(bytes);
    })
    .doOnError(error -> System.err.println("Download stream error: " + error.getMessage()))
    .doOnComplete(() -> System.out.println("Download stream completed"))
    .subscribe();

// Download with backpressure control
downloadStream
    .buffer(10) // Buffer 10 chunks
    .flatMap(buffers -> {
        // Process batch of buffers.
        // NOTE(review): Mono.fromRunnable completes without emitting a value, so the
        // doOnNext below never receives an element — "Batch processed" will not print.
        return Mono.fromRunnable(() -> processBatch(buffers));
    }, 2) // Process max 2 batches concurrently
    .doOnNext(result -> System.out.println("Batch processed"))
    .subscribe();

// Download with retry and error handling
// NOTE(review): Retry (reactor.util.retry.Retry) and java.time.Duration are not imported above.
downloadStream
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
    .onErrorResume(error -> {
        System.err.println("Download failed after retries: " + error.getMessage());
        return Flux.empty();
    })
    .subscribe();

Range-Based Streaming

// Stream specific ranges of large blobs
public class RangeBasedStreamer {
    private final BlobClient blobClient;
    private final long chunkSize;
    
    public RangeBasedStreamer(BlobClient blobClient, long chunkSize) {
        this.blobClient = blobClient;
        this.chunkSize = chunkSize;
    }
    
    public void streamBlobInRanges(ProgressCallback callback) {
        try {
            BlobProperties properties = blobClient.getProperties();
            long blobSize = properties.getBlobSize();
            long totalProcessed = 0;
            
            System.out.println("Streaming blob in " + chunkSize + " byte chunks");
            System.out.println("Total blob size: " + blobSize + " bytes");
            
            for (long offset = 0; offset < blobSize; offset += chunkSize) {
                long rangeEnd = Math.min(offset + chunkSize - 1, blobSize - 1);
                BlobRange range = new BlobRange(offset, rangeEnd - offset + 1);
                
                // Download this range
                BlobDownloadContentResponse rangeResponse = blobClient.downloadContentWithResponse(
                    new BlobDownloadOptions().setRange(range),
                    Duration.ofMinutes(2),
                    Context.NONE
                );
                
                BinaryData rangeData = rangeResponse.getValue();
                byte[] chunkBytes = rangeData.toBytes();
                
                // Process chunk
                processChunkWithRange(chunkBytes, offset, rangeEnd);
                
                totalProcessed += chunkBytes.length;
                
                // Report progress
                if (callback != null) {
                    double progress = (double) totalProcessed / blobSize * 100;
                    callback.onProgress(totalProcessed, blobSize, progress);
                }
            }
            
            System.out.println("Range-based streaming completed");
            
        } catch (Exception ex) {
            System.err.println("Range streaming failed: " + ex.getMessage());
            throw ex;
        }
    }
    
    @FunctionalInterface
    public interface ProgressCallback {
        void onProgress(long processed, long total, double percentage);
    }
    
    private void processChunkWithRange(byte[] chunk, long startOffset, long endOffset) {
        System.out.printf("Processing chunk: offset %d-%d (%d bytes)%n", 
            startOffset, endOffset, chunk.length);
        // Implement chunk processing logic
    }
}

// Usage
// NOTE(review): assumes blobClient is an initialized BlobClient from earlier sections.
RangeBasedStreamer streamer = new RangeBasedStreamer(blobClient, 5 * 1024 * 1024L); // 5MB chunks

// The lambda implements RangeBasedStreamer.ProgressCallback.
streamer.streamBlobInRanges((processed, total, percentage) -> {
    System.out.printf("Progress: %.1f%% (%d/%d bytes)%n", percentage, processed, total);
});

Output Streams and Uploads

BlobOutputStream

Stream data directly to blobs without buffering entire content.

// NOTE(review): snippet assumes an initialized BlockBlobClient ("blockBlobClient")
// and a writeDataSetToStream helper defined elsewhere.
import com.azure.storage.blob.specialized.BlobOutputStream;

// Basic output stream usage
// NOTE(review): StandardCharsets is used below but not imported here.
try (BlobOutputStream outputStream = blockBlobClient.getBlobOutputStream()) {
    // Write data incrementally
    for (int i = 0; i < 1000; i++) {
        String data = "Data chunk " + i + "\n";
        outputStream.write(data.getBytes(StandardCharsets.UTF_8));
        
        if (i % 100 == 0) {
            outputStream.flush(); // Force upload of current blocks
            System.out.println("Flushed at chunk " + i);
        }
    }
    
    // Stream automatically closes and commits all blocks
    System.out.println("Stream upload completed");
    
} catch (IOException ex) {
    System.err.println("Stream upload failed: " + ex.getMessage());
}

// Configured output stream with options
ParallelTransferOptions streamTransferOptions = new ParallelTransferOptions()
    .setBlockSizeLong(2 * 1024 * 1024L) // 2MB blocks
    .setMaxConcurrency(4)
    .setProgressListener(bytesTransferred -> 
        System.out.println("Stream progress: " + bytesTransferred + " bytes"));

BlobHttpHeaders streamHeaders = new BlobHttpHeaders()
    .setContentType("application/octet-stream")
    .setContentEncoding("gzip")
    .setCacheControl("private, no-cache");

// NOTE(review): Map and OffsetDateTime require java.util / java.time imports not shown.
Map<String, String> streamMetadata = Map.of(
    "streaming", "true",
    "created-time", OffsetDateTime.now().toString(),
    "source", "streaming-application"
);

// setIfNoneMatch("*") makes the upload fail if the blob already exists.
// NOTE(review): verify this overload — getBlobOutputStream variants take either
// (ParallelTransferOptions, BlobHttpHeaders, Map, AccessTier, BlobRequestConditions)
// or a single BlockBlobOutputStreamOptions; a trailing Context parameter may not exist.
try (BlobOutputStream configuredStream = blockBlobClient.getBlobOutputStream(
        streamTransferOptions,
        streamHeaders,
        streamMetadata,
        AccessTier.HOT,
        new BlobRequestConditions().setIfNoneMatch("*"),
        Context.NONE)) {
    
    // Write large dataset incrementally
    writeDataSetToStream(configuredStream);
    
    System.out.println("Configured stream upload completed");
}

Reactive Streaming Uploads

// Upload from Flux of ByteBuffer
Flux<ByteBuffer> dataFlux = createDataFlux();

BlobParallelUploadOptions fluxUploadOptions = new BlobParallelUploadOptions(dataFlux)
    .setParallelTransferOptions(new ParallelTransferOptions()
        .setBlockSizeLong(1024 * 1024L) // 1MB blocks
        .setMaxConcurrency(6)
        .setProgressListener(bytesTransferred -> 
            System.out.println("Reactive upload progress: " + bytesTransferred)))
    .setHeaders(new BlobHttpHeaders().setContentType("application/octet-stream"))
    .setMetadata(Map.of("source", "reactive-stream"));

BlobAsyncClient asyncClient = blobClient.getAsyncClient();

// Upload reactively — nothing happens until subscribe(); the Mono is lazy.
Mono<Response<BlockBlobItem>> uploadMono = asyncClient.uploadWithResponse(fluxUploadOptions);

uploadMono
    .doOnSuccess(response -> {
        System.out.println("Reactive upload completed:");
        System.out.println("ETag: " + response.getValue().getETag());
        System.out.println("Status: " + response.getStatusCode());
    })
    .doOnError(error -> System.err.println("Reactive upload failed: " + error.getMessage()))
    .subscribe();

// Create data flux from various sources
// NOTE(review): the "private" modifier implies this lives in an enclosing class not shown.
private Flux<ByteBuffer> createDataFlux() {
    return Flux.range(1, 1000)
        .map(i -> {
            String data = "Reactive data chunk " + i + "\n";
            return ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
        })
        .delayElements(Duration.ofMillis(10)); // Simulate streaming data
}

// Upload from file with reactive progress
// NOTE(review): DataBufferUtils, DefaultDataBufferFactory and DataBuffer are Spring
// Framework (spring-core) classes, not part of the Azure SDK — this adds a dependency.
Path filePath = Paths.get("large-file.dat");
Flux<ByteBuffer> fileFlux = DataBufferUtils.read(
    filePath, 
    new DefaultDataBufferFactory(), 
    4096 // 4KB buffers
).map(DataBuffer::asByteBuffer);

// NOTE(review): Files.size throws IOException — the enclosing method must handle it.
// ReactiveProgressTracker is not defined on this page.
BlobParallelUploadOptions fileFluxOptions = new BlobParallelUploadOptions(fileFlux)
    .setParallelTransferOptions(new ParallelTransferOptions()
        .setProgressListener(new ReactiveProgressTracker(Files.size(filePath))));

asyncClient.uploadWithResponse(fileFluxOptions)
    .doOnSuccess(response -> System.out.println("File stream upload completed"))
    .subscribe();

Large File Handling

Optimized Large File Operations

// Chooses an upload strategy by file size: SDK parallel transfer for files up to
// 256MB, manual block staging beyond that. Transfer parameters are tuned to the JVM.
public class LargeFileManager {
    private final BlockBlobClient blockBlobClient;
    private final long optimalBlockSize; // bytes per block, capped at 32MB or heap/16
    private final int maxConcurrency;    // parallel workers, capped at 16 or 2x cores

    public LargeFileManager(BlockBlobClient blockBlobClient) {
        this.blockBlobClient = blockBlobClient;

        // Optimize based on available memory: the smaller of 32MB and 1/16 of max heap.
        long availableMemory = Runtime.getRuntime().maxMemory();
        this.optimalBlockSize = Math.min(32 * 1024 * 1024L, availableMemory / 16);
        this.maxConcurrency = Math.min(16, Runtime.getRuntime().availableProcessors() * 2);

        System.out.printf("Optimized for block size: %d MB, concurrency: %d%n", 
            optimalBlockSize / 1024 / 1024, maxConcurrency);
    }

    /**
     * Uploads the file, reporting progress/outcome through {@code callback} (may be null).
     *
     * @throws IOException if the file cannot be read
     */
    public void uploadLargeFile(Path filePath, LargeFileCallback callback) throws IOException {
        long fileSize = Files.size(filePath);

        if (fileSize <= 256 * 1024 * 1024L) { // <= 256MB, use simple upload
            uploadSmallToMediumFile(filePath, fileSize, callback);
        } else {
            uploadVeryLargeFile(filePath, fileSize, callback);
        }
    }

    // FIX(review): the original referenced uploadLargeFile's local "fileSize" here (a
    // compile error) and left Files.probeContentType's IOException unhandled. The size
    // is now passed in — also avoiding a filesystem stat on every progress callback —
    // and the method declares throws IOException.
    private void uploadSmallToMediumFile(Path filePath, long fileSize, LargeFileCallback callback)
            throws IOException {
        ParallelTransferOptions options = new ParallelTransferOptions()
            .setBlockSizeLong(optimalBlockSize)
            .setMaxConcurrency(maxConcurrency)
            .setProgressListener(bytesTransferred -> {
                if (callback != null) {
                    callback.onProgress(bytesTransferred, fileSize);
                }
            });

        BlobUploadFromFileOptions uploadOptions = new BlobUploadFromFileOptions(filePath.toString())
            .setParallelTransferOptions(options)
            .setHeaders(new BlobHttpHeaders()
                .setContentType(Files.probeContentType(filePath))) // may return null; tolerated
            .setMetadata(Map.of(
                "file-name", filePath.getFileName().toString(),
                "file-size", String.valueOf(fileSize),
                "upload-method", "parallel-transfer"));

        try {
            Response<BlockBlobItem> response = blockBlobClient.uploadFromFileWithResponse(
                uploadOptions,
                Duration.ofHours(2),
                Context.NONE
            );

            if (callback != null) {
                callback.onComplete(response.getValue());
            }

        } catch (Exception ex) {
            if (callback != null) {
                callback.onError(ex);
            }
            throw ex;
        }
    }

    // Manually stages fixed-size blocks and commits them once at the end.
    private void uploadVeryLargeFile(Path filePath, long fileSize, LargeFileCallback callback)
            throws IOException {
        List<String> blockIds = new ArrayList<>();

        try (InputStream fileStream = Files.newInputStream(filePath)) {
            long totalUploaded = 0;
            int blockIndex = 0;
            byte[] buffer = new byte[(int) optimalBlockSize];

            while (true) {
                // FIX(review): readNBytes fills the buffer fully except at end of file,
                // unlike read(), which may return short reads and produce uneven blocks.
                int bytesRead = fileStream.readNBytes(buffer, 0, buffer.length);
                if (bytesRead == 0) break; // End of file

                // Fixed-width index keeps all Base64 block IDs the same length.
                String blockId = Base64.getEncoder().encodeToString(
                    String.format("block-%06d", blockIndex).getBytes(StandardCharsets.UTF_8));
                blockIds.add(blockId);

                // Upload this block; copy only when the final block is short.
                byte[] blockData = bytesRead == buffer.length ? buffer : Arrays.copyOf(buffer, bytesRead);
                blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(blockData));

                totalUploaded += bytesRead;
                blockIndex++;

                // Progress callback
                if (callback != null) {
                    callback.onProgress(totalUploaded, fileSize);
                }

                // Periodic log only — all blocks are committed once, below.
                if (blockIds.size() % 1000 == 0) {
                    System.out.println("Staged " + blockIds.size() + " blocks...");
                }
            }

            // Commit all blocks; the list order defines the blob's content order.
            System.out.println("Committing " + blockIds.size() + " blocks...");
            BlockBlobItem result = blockBlobClient.commitBlockList(blockIds);

            if (callback != null) {
                callback.onComplete(result);
            }

        } catch (Exception ex) {
            if (callback != null) {
                callback.onError(ex);
            }
            throw ex;
        }
    }

    /** Progress and outcome notifications for uploads driven by this manager. */
    public interface LargeFileCallback {
        void onProgress(long uploaded, long total);
        void onComplete(BlockBlobItem result);
        void onError(Exception error);
    }
}

// Usage with comprehensive callback
// NOTE(review): uploadLargeFile throws IOException — the enclosing method must handle it.
LargeFileManager manager = new LargeFileManager(blockBlobClient);

manager.uploadLargeFile(Paths.get("very-large-file.zip"), new LargeFileManager.LargeFileCallback() {
    // Last progress milestone logged, in whole MB.
    // NOTE(review): parallel transfers may invoke onProgress from multiple threads;
    // this unsynchronized field could then log duplicate milestones — confirm.
    private long lastLoggedMB = 0;
    
    @Override
    public void onProgress(long uploaded, long total) {
        long uploadedMB = uploaded / 1024 / 1024;
        
        if (uploadedMB >= lastLoggedMB + 100) { // Log every 100MB
            double percentage = (double) uploaded / total * 100;
            System.out.printf("Large file progress: %.1f%% (%d MB / %d MB)%n", 
                percentage, uploadedMB, total / 1024 / 1024);
            lastLoggedMB = uploadedMB;
        }
    }
    
    @Override
    public void onComplete(BlockBlobItem result) {
        System.out.println("Large file upload completed successfully!");
        System.out.println("ETag: " + result.getETag());
        System.out.println("Last modified: " + result.getLastModified());
    }
    
    @Override
    public void onError(Exception error) {
        System.err.println("Large file upload failed: " + error.getMessage());
        error.printStackTrace();
    }
});

Resume Interrupted Uploads

// Uploads a file block by block, persisting progress to a JSON checkpoint file so an
// interrupted run can resume without re-staging blocks that already succeeded.
public class ResumableUploadManager {
    private final BlockBlobClient blockBlobClient;
    // Checkpoint lives in the system temp dir, keyed by blob name.
    private final Path checkpointFile;

    public ResumableUploadManager(BlockBlobClient blockBlobClient, String blobName) {
        this.blockBlobClient = blockBlobClient;
        this.checkpointFile = Paths.get(System.getProperty("java.io.tmpdir"), 
            blobName + ".upload.checkpoint");
    }

    /**
     * Uploads {@code filePath} in {@code blockSize} chunks, saving a checkpoint after
     * every staged block and committing the full block list at the end.
     *
     * @throws IOException if the source file cannot be read
     */
    public void resumableUpload(Path filePath, long blockSize) throws IOException {
        long fileSize = Files.size(filePath);
        UploadCheckpoint checkpoint = loadCheckpoint();

        // FIX(review): previously any checkpoint found on disk was resumed blindly,
        // even one written for a different file or block size; validate it first.
        if (checkpoint == null
                || !filePath.toString().equals(checkpoint.filePath)
                || checkpoint.fileSize != fileSize
                || checkpoint.blockSize != blockSize
                || checkpoint.uploadedBlocks == null) {
            // Start new upload
            checkpoint = new UploadCheckpoint();
            checkpoint.filePath = filePath.toString();
            checkpoint.fileSize = fileSize;
            checkpoint.blockSize = blockSize;
            checkpoint.uploadedBlocks = new HashSet<>();
        }

        System.out.println("Starting resumable upload...");
        System.out.printf("File: %s (%d bytes)%n", filePath, fileSize);
        System.out.printf("Previously uploaded blocks: %d%n", checkpoint.uploadedBlocks.size());

        try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath.toFile(), "r")) {
            long totalBlocks = (fileSize + blockSize - 1) / blockSize; // ceiling division

            for (int blockIndex = 0; blockIndex < totalBlocks; blockIndex++) {
                String blockId = generateBlockId(blockIndex);

                // Skip blocks a previous run already staged.
                if (checkpoint.uploadedBlocks.contains(blockId)) {
                    continue;
                }

                // Read this block's byte range from the file.
                long offset = (long) blockIndex * blockSize;
                long currentBlockSize = Math.min(blockSize, fileSize - offset);

                byte[] blockData = new byte[(int) currentBlockSize];
                randomAccessFile.seek(offset);
                randomAccessFile.readFully(blockData);

                // Upload block with retry
                uploadBlockWithRetry(blockId, blockData, 3);

                // Persist progress immediately so a crash loses at most one block.
                checkpoint.uploadedBlocks.add(blockId);
                saveCheckpoint(checkpoint);

                double progress = (double) checkpoint.uploadedBlocks.size() / totalBlocks * 100;
                System.out.printf("Progress: %.1f%% (%d/%d blocks)%n", 
                    progress, checkpoint.uploadedBlocks.size(), totalBlocks);
            }

            // Commit the full, ordered block list (order defines blob content).
            List<String> allBlockIds = generateAllBlockIds(totalBlocks);
            BlockBlobItem result = blockBlobClient.commitBlockList(allBlockIds);

            // Clean up checkpoint
            Files.deleteIfExists(checkpointFile);

            System.out.println("Resumable upload completed successfully!");
            System.out.println("ETag: " + result.getETag());

        } catch (Exception ex) {
            System.err.println("Resumable upload failed: " + ex.getMessage());
            throw ex;
        }
    }

    // Stages one block, retrying with exponential backoff on failure.
    private void uploadBlockWithRetry(String blockId, byte[] data, int maxRetries) {
        int attempt = 0;
        while (attempt < maxRetries) {
            try {
                blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(data));
                return; // Success
            } catch (Exception ex) {
                attempt++;
                if (attempt >= maxRetries) {
                    throw new RuntimeException("Failed to upload block after " + maxRetries + " attempts", ex);
                }

                System.err.printf("Block upload attempt %d failed, retrying: %s%n", attempt, ex.getMessage());

                try {
                    // FIX(review): the original slept 1000 * attempt (linear) while the
                    // comment claimed exponential backoff; now 1s, 2s, 4s, ...
                    Thread.sleep(1000L << (attempt - 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    throw new RuntimeException("Upload interrupted", ie);
                }
            }
        }
    }

    // Fixed-width index keeps every Base64 block ID the same length.
    private String generateBlockId(int blockIndex) {
        return Base64.getEncoder().encodeToString(
            String.format("block-%06d", blockIndex).getBytes());
    }

    // Rebuilds the complete, ordered ID list for the final commit.
    private List<String> generateAllBlockIds(long totalBlocks) {
        List<String> blockIds = new ArrayList<>();
        for (int i = 0; i < totalBlocks; i++) {
            blockIds.add(generateBlockId(i));
        }
        return blockIds;
    }

    // Returns the saved checkpoint, or null when absent or unreadable.
    private UploadCheckpoint loadCheckpoint() {
        if (!Files.exists(checkpointFile)) {
            return null;
        }

        try {
            String json = Files.readString(checkpointFile);
            return new Gson().fromJson(json, UploadCheckpoint.class);
        } catch (Exception ex) {
            System.err.println("Failed to load checkpoint: " + ex.getMessage());
            return null;
        }
    }

    // Best-effort persistence: a failed save only costs re-uploading later blocks.
    private void saveCheckpoint(UploadCheckpoint checkpoint) {
        try {
            String json = new Gson().toJson(checkpoint);
            Files.writeString(checkpointFile, json, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (Exception ex) {
            System.err.println("Failed to save checkpoint: " + ex.getMessage());
        }
    }

    // Serialized to/from JSON via Gson.
    private static class UploadCheckpoint {
        String filePath;
        long fileSize;
        long blockSize;
        Set<String> uploadedBlocks = new HashSet<>();
    }
}

// Usage
// NOTE(review): resumableUpload throws IOException — the enclosing method must handle it.
ResumableUploadManager resumableManager = new ResumableUploadManager(blockBlobClient, "huge-file.dat");
resumableManager.resumableUpload(Paths.get("huge-file.dat"), 8 * 1024 * 1024L); // 8MB blocks

Memory-Efficient Streaming

Low-Memory Streaming Patterns

// Constant-memory streaming patterns: each method reuses one small buffer regardless
// of blob size.
public class MemoryEfficientStreaming {
    
    /**
     * Processes a blob of arbitrary size in fixed 64KB chunks so memory use stays
     * constant no matter how large the blob is.
     */
    public void processLargeBlobIncrementally(BlobClient blobClient, DataProcessor processor) {
        try (BlobInputStream inputStream = blobClient.openInputStream()) {
            processStreamInChunks(inputStream, processor, 64 * 1024); // 64KB chunks
        } catch (IOException ex) {
            throw new RuntimeException("Failed to process large blob", ex);
        }
    }
    
    // Reads the stream chunkSize bytes at a time, handing each chunk to the processor.
    private void processStreamInChunks(InputStream inputStream, DataProcessor processor, int chunkSize) 
            throws IOException {
        byte[] buffer = new byte[chunkSize];
        int bytesRead;
        long totalProcessed = 0;
        
        while ((bytesRead = inputStream.read(buffer)) != -1) {
            // Hand over only the bytes actually read; reads may be short, not just at EOF.
            if (bytesRead == buffer.length) {
                processor.processChunk(buffer);
            } else {
                processor.processChunk(Arrays.copyOf(buffer, bytesRead));
            }
            
            totalProcessed += bytesRead;
            // FIX(review): dropped the periodic System.gc() the original called here —
            // explicit GC requests stall throughput, and the single reused buffer
            // creates no garbage pressure to collect anyway.
        }
        
        processor.processingComplete(totalProcessed);
    }
    
    /**
     * Streams generated data straight into a blob without buffering the full payload,
     * flushing roughly every 1MB written.
     */
    public void streamUploadFromDataSource(BlockBlobClient blobClient, DataSource dataSource) {
        try (BlobOutputStream outputStream = blobClient.getBlobOutputStream()) {
            byte[] buffer = new byte[8192]; // Small buffer
            long bytesSinceFlush = 0;
            
            while (dataSource.hasMoreData()) {
                int bytesGenerated = dataSource.generateData(buffer);
                if (bytesGenerated > 0) {
                    outputStream.write(buffer, 0, bytesGenerated);
                    bytesSinceFlush += bytesGenerated;
                }
                
                // FIX(review): the original flushed only when the source's running total
                // was an exact 1MB multiple, which rarely happens; count bytes locally.
                if (bytesSinceFlush >= 1024 * 1024) { // Every ~1MB
                    outputStream.flush();
                    bytesSinceFlush = 0;
                }
            }
            
        } catch (IOException ex) {
            throw new RuntimeException("Streaming upload failed", ex);
        }
    }
    
    /**
     * Pipeline processing: download -> transform -> upload without intermediate storage.
     * Each chunk is transformed and forwarded immediately.
     */
    public void pipelineProcessing(BlobClient sourceBlobClient, BlockBlobClient targetBlobClient, 
                                 DataTransformer transformer) {
        try (BlobInputStream inputStream = sourceBlobClient.openInputStream();
             BlobOutputStream outputStream = targetBlobClient.getBlobOutputStream()) {
            
            byte[] inputBuffer = new byte[16384]; // 16KB input buffer
            int bytesRead;
            
            while ((bytesRead = inputStream.read(inputBuffer)) != -1) {
                // Transform data chunk
                byte[] transformedData = transformer.transform(inputBuffer, 0, bytesRead);
                
                // Write transformed data immediately
                outputStream.write(transformedData);
                
                // FIX(review): removed the Arrays.fill zeroing of the reused buffer —
                // it never becomes garbage, so clearing it did nothing for the GC.
            }
            
        } catch (IOException ex) {
            throw new RuntimeException("Pipeline processing failed", ex);
        }
    }
    
    /** Consumes chunks of a byte stream; chunks arrive in order. */
    public interface DataProcessor {
        void processChunk(byte[] chunk);
        void processingComplete(long totalBytesProcessed);
    }
    
    /** Pull-style producer of upload data. */
    public interface DataSource {
        boolean hasMoreData();
        int generateData(byte[] buffer);
        long getBytesGenerated();
    }
    
    /** Per-chunk byte transformer for pipeline processing. */
    public interface DataTransformer {
        byte[] transform(byte[] input, int offset, int length);
    }
}

// Example implementations
// Parses CSV records from a stream of byte chunks.
class CSVProcessor implements MemoryEfficientStreaming.DataProcessor {
    private final StringBuilder lineBuffer = new StringBuilder();
    private int recordCount = 0;
    // Bytes held back because a chunk ended mid-way through a UTF-8 sequence.
    private byte[] pendingBytes = new byte[0];
    
    @Override
    public void processChunk(byte[] chunk) {
        // FIX(review): decoding each chunk independently corrupted any multi-byte
        // UTF-8 character split across a chunk boundary; carry incomplete trailing
        // bytes over to the next chunk instead.
        byte[] data;
        if (pendingBytes.length == 0) {
            data = chunk;
        } else {
            data = new byte[pendingBytes.length + chunk.length];
            System.arraycopy(pendingBytes, 0, data, 0, pendingBytes.length);
            System.arraycopy(chunk, 0, data, pendingBytes.length, chunk.length);
        }
        
        int complete = completeUtf8PrefixLength(data);
        String chunkStr = new String(data, 0, complete, StandardCharsets.UTF_8);
        pendingBytes = Arrays.copyOfRange(data, complete, data.length);
        
        lineBuffer.append(chunkStr);
        
        // Split keeps the trailing (possibly empty) partial line as the last element.
        String[] lines = lineBuffer.toString().split("\n", -1);
        
        // Process all complete lines (all but the last)
        for (int i = 0; i < lines.length - 1; i++) {
            processCSVLine(lines[i]);
            recordCount++;
        }
        
        // Keep the incomplete line in buffer
        lineBuffer.setLength(0);
        lineBuffer.append(lines[lines.length - 1]);
    }
    
    @Override
    public void processingComplete(long totalBytesProcessed) {
        // Flush any carried-over bytes and the final partial line.
        if (pendingBytes.length > 0) {
            lineBuffer.append(new String(pendingBytes, StandardCharsets.UTF_8));
            pendingBytes = new byte[0];
        }
        if (lineBuffer.length() > 0) {
            processCSVLine(lineBuffer.toString());
            recordCount++;
        }
        
        System.out.println("CSV processing complete:");
        System.out.println("Total records processed: " + recordCount);
        System.out.println("Total bytes processed: " + totalBytesProcessed);
    }
    
    // Length of the longest prefix of data that ends on a UTF-8 character boundary.
    private static int completeUtf8PrefixLength(byte[] data) {
        int len = data.length;
        if (len == 0) {
            return 0;
        }
        // Step back over trailing continuation bytes (10xxxxxx) to find the lead byte.
        int i = len - 1;
        int stepped = 0;
        while (i >= 0 && stepped < 3 && (data[i] & 0xC0) == 0x80) {
            i--;
            stepped++;
        }
        if (i < 0) {
            return len; // malformed input; let the decoder substitute replacement chars
        }
        int lead = data[i] & 0xFF;
        int seqLen;
        if (lead < 0x80) {
            seqLen = 1;
        } else if ((lead & 0xE0) == 0xC0) {
            seqLen = 2;
        } else if ((lead & 0xF0) == 0xE0) {
            seqLen = 3;
        } else if ((lead & 0xF8) == 0xF0) {
            seqLen = 4;
        } else {
            return len; // invalid lead byte; decode as-is
        }
        // Keep everything if the final sequence is complete; otherwise cut before it.
        return (i + seqLen <= len) ? len : i;
    }
    
    private void processCSVLine(String line) {
        // Tolerate CRLF input by stripping a trailing carriage return.
        if (line.endsWith("\r")) {
            line = line.substring(0, line.length() - 1);
        }
        // Implement CSV line processing
        String[] fields = line.split(",");
        // Process fields...
    }
}

// Usage
MemoryEfficientStreaming streaming = new MemoryEfficientStreaming();

// Process large CSV blob
// NOTE(review): largeCsvBlobClient is assumed to be an initialized BlobClient.
streaming.processLargeBlobIncrementally(largeCsvBlobClient, new CSVProcessor());

// Stream upload from generated data
// NOTE(review): SyntheticDataSource is not defined on this page; it must implement
// MemoryEfficientStreaming.DataSource.
streaming.streamUploadFromDataSource(targetBlobClient, new SyntheticDataSource());

Performance Monitoring and Optimization

Advanced Streaming Metrics

/**
 * Accumulates byte counts for one streaming operation and samples throughput about
 * once per second. {@code recordBytes} and {@code generateReport} are synchronized,
 * so the class is safe to update from transfer callbacks on other threads.
 */
public class StreamingMetrics {
    private final String operationName;
    private final long startTime;               // wall-clock start (millis)
    private long bytesProcessed;                // running byte total
    private final List<Long> throughputSamples; // per-interval MB/s values, scaled x100
    private long lastSampleTime;
    private long lastSampleBytes;
    
    public StreamingMetrics(String operationName) {
        this.operationName = operationName;
        this.startTime = System.currentTimeMillis();
        this.throughputSamples = new ArrayList<>();
        this.lastSampleTime = startTime;
        this.lastSampleBytes = 0;
    }
    
    /** Adds {@code bytes} to the running total; samples throughput at most once per second. */
    public synchronized void recordBytes(long bytes) {
        this.bytesProcessed += bytes;
        
        long currentTime = System.currentTimeMillis();
        
        // Sample throughput at most once per second.
        if (currentTime - lastSampleTime >= 1000) {
            long intervalBytes = bytesProcessed - lastSampleBytes;
            double intervalSeconds = (currentTime - lastSampleTime) / 1000.0;
            double throughputMBps = (intervalBytes / 1024.0 / 1024.0) / intervalSeconds;
            
            // Store MB/s scaled by 100 so a long keeps two decimal places.
            // (FIX(review): the old comment called this "centibytes", which was wrong.)
            throughputSamples.add((long) (throughputMBps * 100));
            
            lastSampleTime = currentTime;
            lastSampleBytes = bytesProcessed;
        }
    }
    
    /**
     * Builds a report from the current totals.
     * FIX(review): now synchronized so totals and samples are snapshotted consistently
     * while another thread may still be calling {@code recordBytes}.
     */
    public synchronized StreamingReport generateReport() {
        long totalTime = System.currentTimeMillis() - startTime;
        double totalSeconds = totalTime / 1000.0;
        double averageThroughput = totalSeconds > 0 ? (bytesProcessed / 1024.0 / 1024.0) / totalSeconds : 0;
        
        // Samples are stored x100; divide back out for MB/s.
        double maxThroughput = throughputSamples.stream()
            .mapToDouble(sample -> sample / 100.0)
            .max()
            .orElse(0.0);
        
        double minThroughput = throughputSamples.stream()
            .mapToDouble(sample -> sample / 100.0)
            .min()
            .orElse(0.0);
        
        return new StreamingReport(
            operationName,
            bytesProcessed,
            totalTime,
            averageThroughput,
            maxThroughput,
            minThroughput,
            new ArrayList<>(throughputSamples) // defensive copy
        );
    }
    
    /** Immutable summary of a finished (or in-flight) streaming operation. */
    public static class StreamingReport {
        public final String operation;
        public final long totalBytes;
        public final long totalTimeMs;
        public final double averageThroughputMBps;
        public final double maxThroughputMBps;
        public final double minThroughputMBps;
        public final List<Long> throughputSamples; // MB/s scaled x100
        
        StreamingReport(String operation, long totalBytes, long totalTimeMs,
                       double averageThroughputMBps, double maxThroughputMBps, double minThroughputMBps,
                       List<Long> throughputSamples) {
            this.operation = operation;
            this.totalBytes = totalBytes;
            this.totalTimeMs = totalTimeMs;
            this.averageThroughputMBps = averageThroughputMBps;
            this.maxThroughputMBps = maxThroughputMBps;
            this.minThroughputMBps = minThroughputMBps;
            this.throughputSamples = throughputSamples;
        }
        
        /** Prints a human-readable summary to stdout. */
        public void printReport() {
            System.out.println("\n=== Streaming Performance Report ===");
            System.out.println("Operation: " + operation);
            System.out.printf("Total bytes: %s%n", formatBytes(totalBytes));
            System.out.printf("Total time: %.2f seconds%n", totalTimeMs / 1000.0);
            System.out.printf("Average throughput: %.2f MB/s%n", averageThroughputMBps);
            System.out.printf("Max throughput: %.2f MB/s%n", maxThroughputMBps);
            System.out.printf("Min throughput: %.2f MB/s%n", minThroughputMBps);
            System.out.printf("Throughput stability: %.2f%%%n", calculateStability());
            System.out.println("=====================================");
        }
        
        // 100% = perfectly steady; lower means a wider min/max throughput spread.
        private double calculateStability() {
            if (maxThroughputMBps == 0) return 100.0;
            return (1.0 - (maxThroughputMBps - minThroughputMBps) / maxThroughputMBps) * 100.0;
        }
        
        // Human-friendly byte formatting (B/KB/MB/GB).
        private String formatBytes(long bytes) {
            if (bytes < 1024) return bytes + " B";
            if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
            if (bytes < 1024L * 1024 * 1024) return String.format("%.1f MB", bytes / 1024.0 / 1024.0);
            return String.format("%.2f GB", bytes / 1024.0 / 1024.0 / 1024.0);
        }
    }
}

// Performance-monitored streaming
public class MonitoredStreamingOperations {
    
    /**
     * Downloads the blob's full content into {@code outputStream} while recording
     * per-chunk throughput metrics.
     * <p>
     * The caller owns {@code outputStream} and is responsible for closing it; this
     * method flushes it before returning so the reported byte count matches what the
     * sink has actually received.
     *
     * @param blobClient   client for the blob to download
     * @param outputStream destination for the downloaded bytes (not closed here)
     * @return a performance report for the completed download
     * @throws RuntimeException wrapping any {@link IOException} from the transfer
     */
    public StreamingMetrics.StreamingReport monitoredStreamDownload(BlobClient blobClient, 
                                                                   OutputStream outputStream) {
        StreamingMetrics metrics = new StreamingMetrics("Blob Download");
        
        try (BlobInputStream inputStream = blobClient.openInputStream()) {
            byte[] buffer = new byte[64 * 1024]; // 64KB buffer
            int bytesRead;
            
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
                metrics.recordBytes(bytesRead);
            }
            
            // Push any buffered bytes to the underlying sink before reporting,
            // so the metrics reflect data the destination has actually received.
            outputStream.flush();
            
        } catch (IOException ex) {
            throw new RuntimeException("Monitored download failed", ex);
        }
        
        return metrics.generateReport();
    }
    
    /**
     * Uploads the content of {@code inputStream} as a block blob while recording
     * per-chunk throughput metrics.
     * <p>
     * The caller owns {@code inputStream} and is responsible for closing it. The
     * blob output stream is closed via try-with-resources, which commits the upload.
     *
     * @param blobClient  block blob client for the upload target
     * @param inputStream source of the bytes to upload (not closed here)
     * @return a performance report for the completed upload
     * @throws RuntimeException wrapping any {@link IOException} from the transfer
     */
    public StreamingMetrics.StreamingReport monitoredStreamUpload(BlockBlobClient blobClient, 
                                                                 InputStream inputStream) {
        StreamingMetrics metrics = new StreamingMetrics("Blob Upload");
        
        try (BlobOutputStream outputStream = blobClient.getBlobOutputStream()) {
            byte[] buffer = new byte[64 * 1024]; // 64KB buffer
            int bytesRead;
            
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
                metrics.recordBytes(bytesRead);
            }
            
        } catch (IOException ex) {
            throw new RuntimeException("Monitored upload failed", ex);
        }
        
        return metrics.generateReport();
    }
}

// Usage with performance monitoring
MonitoredStreamingOperations monitoredOps = new MonitoredStreamingOperations();

// Monitor download performance
// try-with-resources closes the file stream even if the download fails mid-transfer.
try (FileOutputStream fileOutput = new FileOutputStream("downloaded-large-file.dat")) {
    StreamingMetrics.StreamingReport downloadReport = monitoredOps.monitoredStreamDownload(
        blobClient, fileOutput);
    downloadReport.printReport();
}

// Monitor upload performance
// The source file is read in 64KB chunks by the helper; no need to buffer it here.
try (FileInputStream fileInput = new FileInputStream("upload-large-file.dat")) {
    StreamingMetrics.StreamingReport uploadReport = monitoredOps.monitoredStreamUpload(
        blockBlobClient, fileInput);
    uploadReport.printReport();
}

Reactive Streaming Patterns

Advanced Reactive Operations

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Reactive (Project Reactor) patterns for working with multiple blobs:
 * bounded-concurrency fan-out, transform-and-upload pipelines, batch
 * processing with backpressure, and streaming aggregation.
 */
public class ReactiveStreamingPatterns {
    
    /**
     * Processes each blob reactively with at most {@code concurrency} downloads
     * in flight at once, collecting all results into a single list.
     *
     * @param blobClients blobs to process
     * @param concurrency maximum number of concurrent downloads
     * @return a Mono emitting one ProcessingResult per blob (failures yield totalBytes == -1)
     */
    public Mono<List<ProcessingResult>> processMultipleBlobsConcurrently(
            List<BlobAsyncClient> blobClients, 
            int concurrency) {
        
        return Flux.fromIterable(blobClients)
            .flatMap(blobClient -> 
                processBlobReactively(blobClient)
                    .subscribeOn(Schedulers.boundedElastic()),
                concurrency)
            .collectList();
    }
    
    /**
     * Downloads a single blob and counts its bytes. Errors are converted into a
     * sentinel result with {@code totalBytes == -1} rather than failing the pipeline.
     */
    private Mono<ProcessingResult> processBlobReactively(BlobAsyncClient blobClient) {
        return blobClient.downloadStream()
            // Sum the readable bytes of each emitted buffer without materializing content.
            .reduce(0L, (total, buffer) -> total + buffer.remaining())
            .map(totalBytes -> new ProcessingResult(blobClient.getBlobName(), totalBytes))
            .doOnSuccess(result -> 
                System.out.println("Processed " + result.blobName + ": " + result.totalBytes + " bytes"))
            .onErrorResume(error -> {
                System.err.println("Failed to process " + blobClient.getBlobName() + ": " + error.getMessage());
                return Mono.just(new ProcessingResult(blobClient.getBlobName(), -1));
            });
    }
    
    /**
     * Streams a blob through a per-buffer transformation and uploads the result.
     * <p>
     * NOTE(review): the {@code onErrorResume -> Flux.empty()} fallback silently
     * truncates the upload on a mid-stream error, producing a partial target blob;
     * callers who need all-or-nothing semantics should let the error propagate instead.
     *
     * @param sourceClient blob to read from
     * @param targetClient blob to write to
     * @param transformer  per-ByteBuffer transformation (must not assume buffer boundaries
     *                     align with any logical record boundaries)
     * @return a Mono completing when the upload finishes
     */
    public Mono<Void> transformAndUpload(BlobAsyncClient sourceClient, 
                                       BlobAsyncClient targetClient,
                                       Function<ByteBuffer, ByteBuffer> transformer) {
        
        Flux<ByteBuffer> transformedStream = sourceClient.downloadStream()
            .map(transformer)
            .onErrorResume(error -> {
                System.err.println("Transformation error: " + error.getMessage());
                return Flux.empty();
            });
        
        BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(transformedStream)
            .setParallelTransferOptions(new ParallelTransferOptions()
                .setBlockSizeLong(1024 * 1024L)
                .setMaxConcurrency(4));
        
        return targetClient.uploadWithResponse(uploadOptions)
            .doOnSuccess(response -> 
                System.out.println("Transform and upload completed: " + response.getStatusCode()))
            .then();
    }
    
    /**
     * Buffers incoming clients into batches (by size or timeout, whichever comes
     * first) and processes at most two batches concurrently, buffering up to 100
     * results under backpressure.
     */
    public Flux<BatchResult> batchProcessWithBackpressure(Flux<BlobAsyncClient> blobClients,
                                                         int batchSize,
                                                         Duration batchTimeout) {
        
        return blobClients
            .buffer(batchSize, batchTimeout)
            .flatMap(batch -> processBatch(batch), 2) // Max 2 concurrent batches
            .onBackpressureBuffer(100); // Buffer up to 100 results
    }
    
    /**
     * Sums the sizes of all blobs in a batch. Blobs whose properties cannot be
     * fetched count as size 0 rather than failing the batch.
     */
    private Mono<BatchResult> processBatch(List<BlobAsyncClient> batch) {
        return Flux.fromIterable(batch)
            .flatMap(blobClient -> 
                blobClient.getProperties()
                    .map(props -> props.getBlobSize())
                    .onErrorReturn(0L))
            // Seeded reduce: an empty batch still emits a BatchResult (count 0, size 0),
            // whereas the unseeded form would complete empty and drop the result.
            .reduce(0L, Long::sum)
            .map(totalSize -> new BatchResult(batch.size(), totalSize))
            .doOnSuccess(result -> 
                System.out.println("Batch processed: " + result.blobCount + 
                    " blobs, " + result.totalSize + " bytes"));
    }
    
    /**
     * Streams every blob, measuring each blob's size from its downloaded buffers,
     * and folds the per-blob stats into a single AggregationResult.
     */
    public Mono<AggregationResult> streamingAggregation(List<BlobAsyncClient> blobClients) {
        return Flux.fromIterable(blobClients)
            .flatMap(blobClient ->
                blobClient.downloadStream()
                    // Widen int -> long explicitly; ByteBuffer::remaining yields Integer,
                    // and cast(Long.class) on an Integer stream fails at runtime.
                    .map(buffer -> (long) buffer.remaining())
                    .reduce(0L, Long::sum)
                    .map(size -> new BlobStats(blobClient.getBlobName(), size)))
            // collect(supplier, ...) builds a fresh accumulator per subscription, so the
            // mutable AggregationResult is never shared across resubscriptions.
            .collect(AggregationResult::new, (agg, stats) -> {
                agg.totalBlobs++;
                agg.totalBytes += stats.size;
                agg.maxBlobSize = Math.max(agg.maxBlobSize, stats.size);
                // First blob initializes minBlobSize; note a genuinely zero-byte first
                // blob is indistinguishable from "unset" with this sentinel.
                agg.minBlobSize = agg.minBlobSize == 0 ? stats.size : Math.min(agg.minBlobSize, stats.size);
            })
            .doOnSuccess(result -> {
                result.avgBlobSize = result.totalBlobs > 0 ? result.totalBytes / result.totalBlobs : 0;
                System.out.println("Aggregation complete: " + result);
            });
    }
    
    // Data classes for results
    
    /** Per-blob processing outcome; totalBytes is -1 when processing failed. */
    public static class ProcessingResult {
        public final String blobName;
        public final long totalBytes;
        
        ProcessingResult(String blobName, long totalBytes) {
            this.blobName = blobName;
            this.totalBytes = totalBytes;
        }
    }
    
    /** Summary of one processed batch: how many blobs and their combined size. */
    public static class BatchResult {
        public final int blobCount;
        public final long totalSize;
        
        BatchResult(int blobCount, long totalSize) {
            this.blobCount = blobCount;
            this.totalSize = totalSize;
        }
    }
    
    /** Name/size pair for a single measured blob. */
    public static class BlobStats {
        public final String name;
        public final long size;
        
        BlobStats(String name, long size) {
            this.name = name;
            this.size = size;
        }
    }
    
    /** Mutable accumulator for streaming aggregation across many blobs. */
    public static class AggregationResult {
        public int totalBlobs = 0;
        public long totalBytes = 0;
        public long maxBlobSize = 0;
        public long minBlobSize = 0;
        public long avgBlobSize = 0;
        
        @Override
        public String toString() {
            return String.format("AggregationResult{blobs=%d, totalBytes=%d, avgSize=%d, minSize=%d, maxSize=%d}", 
                totalBlobs, totalBytes, avgBlobSize, minBlobSize, maxBlobSize);
        }
    }
}

// Usage examples
ReactiveStreamingPatterns patterns = new ReactiveStreamingPatterns();

// Process multiple blobs concurrently
List<BlobAsyncClient> asyncClients = Arrays.asList(
    blobClient1.getAsyncClient(),
    blobClient2.getAsyncClient(),
    blobClient3.getAsyncClient()
);

// Note: subscribe() triggers the pipeline; nothing runs until subscription.
patterns.processMultipleBlobsConcurrently(asyncClients, 3)
    .doOnSuccess(results -> 
        System.out.println("Processed " + results.size() + " blobs concurrently"))
    .subscribe();

// Transform and upload pipeline
// NOTE(review): this per-buffer transform assumes each ByteBuffer splits cleanly on
// character boundaries; a multi-byte UTF-8 sequence spanning two buffers would be
// corrupted — acceptable for illustration, not for production text processing.
patterns.transformAndUpload(
    sourceBlobClient.getAsyncClient(),
    targetBlobClient.getAsyncClient(),
    buffer -> {
        // Example transformation: convert to uppercase text
        String text = StandardCharsets.UTF_8.decode(buffer).toString().toUpperCase();
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }
).subscribe();

Related Documentation

  • ← Back to Overview
  • ← Security & Authentication
  • Specialized Blob Types →
  • Configuration Options →

Install with Tessl CLI

npx tessl i tessl/maven-com-azure--azure-storage-blob

docs

blob-client.md

container-client.md

index.md

models.md

options.md

security.md

service-client.md

specialized-clients.md

streaming.md

tile.json