or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

blob-client.mdcontainer-client.mdindex.mdmodels.mdoptions.mdsecurity.mdservice-client.mdspecialized-clients.mdstreaming.md

streaming.mddocs/

0

# Streaming & Advanced I/O

1

2

This documentation covers advanced I/O operations in the Azure Storage Blob Java SDK, including streaming uploads and downloads, reactive programming patterns, and high-performance data transfer techniques.

3

4

## Input Streams and Downloads

5

6

### BlobInputStream

7

8

Stream blob content directly, without loading the entire blob into memory.

9

10

```java

11

import com.azure.storage.blob.specialized.BlobInputStream;

12

import java.io.InputStream;

13

import java.io.BufferedReader;

14

import java.io.InputStreamReader;

15

16

// Open blob as input stream

17

try (BlobInputStream blobInputStream = blobClient.openInputStream()) {

18

// Process stream incrementally

19

byte[] buffer = new byte[8192];

20

int bytesRead;

21

long totalBytesRead = 0;

22

23

while ((bytesRead = blobInputStream.read(buffer)) != -1) {

24

// Process buffer content

25

processChunk(buffer, bytesRead);

26

totalBytesRead += bytesRead;

27

28

if (totalBytesRead % (1024 * 1024) == 0) { // Log every MB

29

System.out.println("Processed: " + (totalBytesRead / 1024 / 1024) + " MB");

30

}

31

}

32

33

System.out.println("Total bytes processed: " + totalBytesRead);

34

35

} catch (IOException ex) {

36

System.err.println("Stream processing failed: " + ex.getMessage());

37

}

38

39

// Open blob input stream with options

40

BlobInputStreamOptions inputStreamOptions = new BlobInputStreamOptions()

41

.setBlockSize(1024 * 1024) // 1MB read buffer

42

.setRange(new BlobRange(0, 10 * 1024 * 1024L)) // Read first 10MB only

43

.setRequestConditions(new BlobRequestConditions()

44

.setIfMatch(blobClient.getProperties().getETag()));

45

46

try (BlobInputStream configuredStream = blobClient.openInputStream(inputStreamOptions)) {

47

// Process configured stream

48

processConfiguredStream(configuredStream);

49

}

50

51

// Stream text content line by line

52

try (BlobInputStream blobStream = blobClient.openInputStream();

53

BufferedReader reader = new BufferedReader(new InputStreamReader(blobStream, StandardCharsets.UTF_8))) {

54

55

String line;

56

int lineNumber = 0;

57

58

while ((line = reader.readLine()) != null) {

59

lineNumber++;

60

processTextLine(line, lineNumber);

61

62

if (lineNumber % 1000 == 0) { // Log progress every 1000 lines

63

System.out.println("Processed " + lineNumber + " lines");

64

}

65

}

66

67

System.out.println("Total lines processed: " + lineNumber);

68

}

69

```

70

71

### Streaming Downloads with Reactive Programming

72

73

```java

74

import reactor.core.publisher.Flux;

75

import reactor.core.publisher.Mono;

76

import java.nio.ByteBuffer;

77

78

// Async streaming download

79

BlobAsyncClient asyncClient = blobClient.getAsyncClient();

80

81

Flux<ByteBuffer> downloadStream = asyncClient.downloadStream();

82

83

// Process stream reactively

84

downloadStream

85

.doOnNext(buffer -> {

86

int bufferSize = buffer.remaining();

87

System.out.println("Received buffer: " + bufferSize + " bytes");

88

89

// Process buffer content

90

byte[] bytes = new byte[bufferSize];

91

buffer.get(bytes);

92

processAsyncChunk(bytes);

93

})

94

.doOnError(error -> System.err.println("Download stream error: " + error.getMessage()))

95

.doOnComplete(() -> System.out.println("Download stream completed"))

96

.subscribe();

97

98

// Download with backpressure control

99

downloadStream
    .buffer(10) // Buffer 10 chunks
    .flatMap(buffers -> {
        // Process batch of buffers.
        // Mono.fromRunnable completes without emitting a value, so the batch size is
        // emitted explicitly; otherwise the doOnNext below would never be invoked.
        return Mono.fromRunnable(() -> processBatch(buffers))
            .thenReturn(buffers.size());
    }, 2) // Process max 2 batches concurrently
    .doOnNext(batchSize -> System.out.println("Batch processed"))
    .subscribe();

107

108

// Download with retry and error handling

109

downloadStream

110

.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))

111

.onErrorResume(error -> {

112

System.err.println("Download failed after retries: " + error.getMessage());

113

return Flux.empty();

114

})

115

.subscribe();

116

```

117

118

### Range-Based Streaming

119

120

```java

121

// Stream specific ranges of large blobs

122

public class RangeBasedStreamer {

123

private final BlobClient blobClient;

124

private final long chunkSize;

125

126

public RangeBasedStreamer(BlobClient blobClient, long chunkSize) {

127

this.blobClient = blobClient;

128

this.chunkSize = chunkSize;

129

}

130

131

public void streamBlobInRanges(ProgressCallback callback) {

132

try {

133

BlobProperties properties = blobClient.getProperties();

134

long blobSize = properties.getBlobSize();

135

long totalProcessed = 0;

136

137

System.out.println("Streaming blob in " + chunkSize + " byte chunks");

138

System.out.println("Total blob size: " + blobSize + " bytes");

139

140

for (long offset = 0; offset < blobSize; offset += chunkSize) {

141

long rangeEnd = Math.min(offset + chunkSize - 1, blobSize - 1);

142

BlobRange range = new BlobRange(offset, rangeEnd - offset + 1);

143

144

// Download this range

145

BlobDownloadContentResponse rangeResponse = blobClient.downloadContentWithResponse(

146

new BlobDownloadOptions().setRange(range),

147

Duration.ofMinutes(2),

148

Context.NONE

149

);

150

151

BinaryData rangeData = rangeResponse.getValue();

152

byte[] chunkBytes = rangeData.toBytes();

153

154

// Process chunk

155

processChunkWithRange(chunkBytes, offset, rangeEnd);

156

157

totalProcessed += chunkBytes.length;

158

159

// Report progress

160

if (callback != null) {

161

double progress = (double) totalProcessed / blobSize * 100;

162

callback.onProgress(totalProcessed, blobSize, progress);

163

}

164

}

165

166

System.out.println("Range-based streaming completed");

167

168

} catch (Exception ex) {

169

System.err.println("Range streaming failed: " + ex.getMessage());

170

throw ex;

171

}

172

}

173

174

@FunctionalInterface

175

public interface ProgressCallback {

176

void onProgress(long processed, long total, double percentage);

177

}

178

179

private void processChunkWithRange(byte[] chunk, long startOffset, long endOffset) {

180

System.out.printf("Processing chunk: offset %d-%d (%d bytes)%n",

181

startOffset, endOffset, chunk.length);

182

// Implement chunk processing logic

183

}

184

}

185

186

// Usage

187

RangeBasedStreamer streamer = new RangeBasedStreamer(blobClient, 5 * 1024 * 1024L); // 5MB chunks

188

189

streamer.streamBlobInRanges((processed, total, percentage) -> {

190

System.out.printf("Progress: %.1f%% (%d/%d bytes)%n", percentage, processed, total);

191

});

192

```

193

194

## Output Streams and Uploads

195

196

### BlobOutputStream

197

198

Stream data directly to blobs, without buffering the entire content in memory first.

199

200

```java

201

import com.azure.storage.blob.specialized.BlobOutputStream;

202

203

// Basic output stream usage

204

try (BlobOutputStream outputStream = blockBlobClient.getBlobOutputStream()) {

205

// Write data incrementally

206

for (int i = 0; i < 1000; i++) {

207

String data = "Data chunk " + i + "\n";

208

outputStream.write(data.getBytes(StandardCharsets.UTF_8));

209

210

if (i % 100 == 0) {

211

outputStream.flush(); // Force upload of current blocks

212

System.out.println("Flushed at chunk " + i);

213

}

214

}

215

216

// Stream automatically closes and commits all blocks

217

System.out.println("Stream upload completed");

218

219

} catch (IOException ex) {

220

System.err.println("Stream upload failed: " + ex.getMessage());

221

}

222

223

// Configured output stream with options

224

ParallelTransferOptions streamTransferOptions = new ParallelTransferOptions()

225

.setBlockSizeLong(2 * 1024 * 1024L) // 2MB blocks

226

.setMaxConcurrency(4)

227

.setProgressListener(bytesTransferred ->

228

System.out.println("Stream progress: " + bytesTransferred + " bytes"));

229

230

BlobHttpHeaders streamHeaders = new BlobHttpHeaders()

231

.setContentType("application/octet-stream")

232

.setContentEncoding("gzip")

233

.setCacheControl("private, no-cache");

234

235

Map<String, String> streamMetadata = Map.of(

236

"streaming", "true",

237

"created-time", OffsetDateTime.now().toString(),

238

"source", "streaming-application"

239

);

240

241

try (BlobOutputStream configuredStream = blockBlobClient.getBlobOutputStream(

242

streamTransferOptions,

243

streamHeaders,

244

streamMetadata,

245

AccessTier.HOT,

246

new BlobRequestConditions().setIfNoneMatch("*"),

247

Context.NONE)) {

248

249

// Write large dataset incrementally

250

writeDataSetToStream(configuredStream);

251

252

System.out.println("Configured stream upload completed");

253

}

254

```

255

256

### Reactive Streaming Uploads

257

258

```java

259

// Upload from Flux of ByteBuffer

260

Flux<ByteBuffer> dataFlux = createDataFlux();

261

262

BlobParallelUploadOptions fluxUploadOptions = new BlobParallelUploadOptions(dataFlux)

263

.setParallelTransferOptions(new ParallelTransferOptions()

264

.setBlockSizeLong(1024 * 1024L) // 1MB blocks

265

.setMaxConcurrency(6)

266

.setProgressListener(bytesTransferred ->

267

System.out.println("Reactive upload progress: " + bytesTransferred)))

268

.setHeaders(new BlobHttpHeaders().setContentType("application/octet-stream"))

269

.setMetadata(Map.of("source", "reactive-stream"));

270

271

BlobAsyncClient asyncClient = blobClient.getAsyncClient();

272

273

// Upload reactively

274

Mono<Response<BlockBlobItem>> uploadMono = asyncClient.uploadWithResponse(fluxUploadOptions);

275

276

uploadMono

277

.doOnSuccess(response -> {

278

System.out.println("Reactive upload completed:");

279

System.out.println("ETag: " + response.getValue().getETag());

280

System.out.println("Status: " + response.getStatusCode());

281

})

282

.doOnError(error -> System.err.println("Reactive upload failed: " + error.getMessage()))

283

.subscribe();

284

285

// Create data flux from various sources

286

private Flux<ByteBuffer> createDataFlux() {

287

return Flux.range(1, 1000)

288

.map(i -> {

289

String data = "Reactive data chunk " + i + "\n";

290

return ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));

291

})

292

.delayElements(Duration.ofMillis(10)); // Simulate streaming data

293

}

294

295

// Upload from file with reactive progress

296

Path filePath = Paths.get("large-file.dat");

297

Flux<ByteBuffer> fileFlux = DataBufferUtils.read(

298

filePath,

299

new DefaultDataBufferFactory(),

300

4096 // 4KB buffers

301

).map(DataBuffer::asByteBuffer);

302

303

BlobParallelUploadOptions fileFluxOptions = new BlobParallelUploadOptions(fileFlux)

304

.setParallelTransferOptions(new ParallelTransferOptions()

305

.setProgressListener(new ReactiveProgressTracker(Files.size(filePath))));

306

307

asyncClient.uploadWithResponse(fileFluxOptions)

308

.doOnSuccess(response -> System.out.println("File stream upload completed"))

309

.subscribe();

310

```

311

312

## Large File Handling

313

314

### Optimized Large File Operations

315

316

```java

317

public class LargeFileManager {

318

private final BlockBlobClient blockBlobClient;

319

private final long optimalBlockSize;

320

private final int maxConcurrency;

321

322

public LargeFileManager(BlockBlobClient blockBlobClient) {

323

this.blockBlobClient = blockBlobClient;

324

325

// Optimize based on file size and available memory

326

long availableMemory = Runtime.getRuntime().maxMemory();

327

this.optimalBlockSize = Math.min(32 * 1024 * 1024L, availableMemory / 16); // Max 32MB or 1/16 of heap

328

this.maxConcurrency = Math.min(16, Runtime.getRuntime().availableProcessors() * 2);

329

330

System.out.printf("Optimized for block size: %d MB, concurrency: %d%n",

331

optimalBlockSize / 1024 / 1024, maxConcurrency);

332

}

333

334

public void uploadLargeFile(Path filePath, LargeFileCallback callback) throws IOException {

335

long fileSize = Files.size(filePath);

336

337

if (fileSize <= 256 * 1024 * 1024L) { // <= 256MB, use simple upload

338

uploadSmallToMediumFile(filePath, callback);

339

} else {

340

uploadVeryLargeFile(filePath, callback);

341

}

342

}

343

344

private void uploadSmallToMediumFile(Path filePath, LargeFileCallback callback) {

345

ParallelTransferOptions options = new ParallelTransferOptions()

346

.setBlockSizeLong(optimalBlockSize)

347

.setMaxConcurrency(maxConcurrency)

348

.setProgressListener(bytesTransferred -> {

349

if (callback != null) {

350

try {

351

long totalSize = Files.size(filePath);

352

callback.onProgress(bytesTransferred, totalSize);

353

} catch (IOException ex) {

354

System.err.println("Progress callback error: " + ex.getMessage());

355

}

356

}

357

});

358

359

BlobUploadFromFileOptions uploadOptions = new BlobUploadFromFileOptions(filePath.toString())

360

.setParallelTransferOptions(options)

361

.setHeaders(new BlobHttpHeaders()

362

.setContentType(Files.probeContentType(filePath)))

363

.setMetadata(Map.of(

364

"file-name", filePath.getFileName().toString(),

365

"file-size", String.valueOf(fileSize),

366

"upload-method", "parallel-transfer"));

367

368

try {

369

Response<BlockBlobItem> response = blockBlobClient.uploadFromFileWithResponse(

370

uploadOptions,

371

Duration.ofHours(2),

372

Context.NONE

373

);

374

375

if (callback != null) {

376

callback.onComplete(response.getValue());

377

}

378

379

} catch (Exception ex) {

380

if (callback != null) {

381

callback.onError(ex);

382

}

383

throw ex;

384

}

385

}

386

387

private void uploadVeryLargeFile(Path filePath, LargeFileCallback callback) throws IOException {

388

long fileSize = Files.size(filePath);

389

List<String> blockIds = new ArrayList<>();

390

391

try (FileInputStream fileStream = new FileInputStream(filePath.toFile())) {

392

long totalUploaded = 0;

393

int blockIndex = 0;

394

byte[] buffer = new byte[(int) optimalBlockSize];

395

396

while (true) {

397

int bytesRead = fileStream.read(buffer);

398

if (bytesRead == -1) break; // End of file

399

400

// Generate unique block ID

401

String blockId = Base64.getEncoder().encodeToString(

402

String.format("block-%06d", blockIndex).getBytes());

403

blockIds.add(blockId);

404

405

// Upload this block

406

byte[] blockData = bytesRead == buffer.length ? buffer : Arrays.copyOf(buffer, bytesRead);

407

blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(blockData));

408

409

totalUploaded += bytesRead;

410

blockIndex++;

411

412

// Progress callback

413

if (callback != null) {

414

callback.onProgress(totalUploaded, fileSize);

415

}

416

417

// Optional: Commit blocks in batches for very large files

418

if (blockIds.size() % 1000 == 0) {

419

System.out.println("Staged " + blockIds.size() + " blocks...");

420

}

421

}

422

423

// Commit all blocks

424

System.out.println("Committing " + blockIds.size() + " blocks...");

425

BlockBlobItem result = blockBlobClient.commitBlockList(blockIds);

426

427

if (callback != null) {

428

callback.onComplete(result);

429

}

430

431

} catch (Exception ex) {

432

if (callback != null) {

433

callback.onError(ex);

434

}

435

throw ex;

436

}

437

}

438

439

public interface LargeFileCallback {

440

void onProgress(long uploaded, long total);

441

void onComplete(BlockBlobItem result);

442

void onError(Exception error);

443

}

444

}

445

446

// Usage with comprehensive callback

447

LargeFileManager manager = new LargeFileManager(blockBlobClient);

448

449

manager.uploadLargeFile(Paths.get("very-large-file.zip"), new LargeFileManager.LargeFileCallback() {

450

private long lastLoggedMB = 0;

451

452

@Override

453

public void onProgress(long uploaded, long total) {

454

long uploadedMB = uploaded / 1024 / 1024;

455

456

if (uploadedMB >= lastLoggedMB + 100) { // Log every 100MB

457

double percentage = (double) uploaded / total * 100;

458

System.out.printf("Large file progress: %.1f%% (%d MB / %d MB)%n",

459

percentage, uploadedMB, total / 1024 / 1024);

460

lastLoggedMB = uploadedMB;

461

}

462

}

463

464

@Override

465

public void onComplete(BlockBlobItem result) {

466

System.out.println("Large file upload completed successfully!");

467

System.out.println("ETag: " + result.getETag());

468

System.out.println("Last modified: " + result.getLastModified());

469

}

470

471

@Override

472

public void onError(Exception error) {

473

System.err.println("Large file upload failed: " + error.getMessage());

474

error.printStackTrace();

475

}

476

});

477

```

478

479

### Resume Interrupted Uploads

480

481

```java

482

public class ResumableUploadManager {

483

private final BlockBlobClient blockBlobClient;

484

private final Path checkpointFile;

485

486

public ResumableUploadManager(BlockBlobClient blockBlobClient, String blobName) {

487

this.blockBlobClient = blockBlobClient;

488

this.checkpointFile = Paths.get(System.getProperty("java.io.tmpdir"),

489

blobName + ".upload.checkpoint");

490

}

491

492

public void resumableUpload(Path filePath, long blockSize) throws IOException {

493

long fileSize = Files.size(filePath);

494

UploadCheckpoint checkpoint = loadCheckpoint();

495

496

if (checkpoint == null) {

497

// Start new upload

498

checkpoint = new UploadCheckpoint();

499

checkpoint.filePath = filePath.toString();

500

checkpoint.fileSize = fileSize;

501

checkpoint.blockSize = blockSize;

502

checkpoint.uploadedBlocks = new HashSet<>();

503

}

504

505

System.out.println("Starting resumable upload...");

506

System.out.printf("File: %s (%d bytes)%n", filePath, fileSize);

507

System.out.printf("Previously uploaded blocks: %d%n", checkpoint.uploadedBlocks.size());

508

509

try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath.toFile(), "r")) {

510

long totalBlocks = (fileSize + blockSize - 1) / blockSize;

511

512

for (int blockIndex = 0; blockIndex < totalBlocks; blockIndex++) {

513

String blockId = generateBlockId(blockIndex);

514

515

// Skip if already uploaded

516

if (checkpoint.uploadedBlocks.contains(blockId)) {

517

continue;

518

}

519

520

// Read block data

521

long offset = (long) blockIndex * blockSize;

522

long currentBlockSize = Math.min(blockSize, fileSize - offset);

523

524

byte[] blockData = new byte[(int) currentBlockSize];

525

randomAccessFile.seek(offset);

526

randomAccessFile.readFully(blockData);

527

528

// Upload block with retry

529

uploadBlockWithRetry(blockId, blockData, 3);

530

531

// Update checkpoint

532

checkpoint.uploadedBlocks.add(blockId);

533

saveCheckpoint(checkpoint);

534

535

// Progress update

536

double progress = (double) checkpoint.uploadedBlocks.size() / totalBlocks * 100;

537

System.out.printf("Progress: %.1f%% (%d/%d blocks)%n",

538

progress, checkpoint.uploadedBlocks.size(), totalBlocks);

539

}

540

541

// Commit all blocks

542

List<String> allBlockIds = generateAllBlockIds(totalBlocks);

543

BlockBlobItem result = blockBlobClient.commitBlockList(allBlockIds);

544

545

// Clean up checkpoint

546

Files.deleteIfExists(checkpointFile);

547

548

System.out.println("Resumable upload completed successfully!");

549

System.out.println("ETag: " + result.getETag());

550

551

} catch (Exception ex) {

552

System.err.println("Resumable upload failed: " + ex.getMessage());

553

throw ex;

554

}

555

}

556

557

private void uploadBlockWithRetry(String blockId, byte[] data, int maxRetries) {

558

int attempt = 0;

559

while (attempt < maxRetries) {

560

try {

561

blockBlobClient.stageBlock(blockId, BinaryData.fromBytes(data));

562

return; // Success

563

} catch (Exception ex) {

564

attempt++;

565

if (attempt >= maxRetries) {

566

throw new RuntimeException("Failed to upload block after " + maxRetries + " attempts", ex);

567

}

568

569

System.err.printf("Block upload attempt %d failed, retrying: %s%n", attempt, ex.getMessage());

570

571

try {

572

Thread.sleep(1000 * attempt); // Exponential backoff

573

} catch (InterruptedException ie) {

574

Thread.currentThread().interrupt();

575

throw new RuntimeException("Upload interrupted", ie);

576

}

577

}

578

}

579

}

580

581

private String generateBlockId(int blockIndex) {

582

return Base64.getEncoder().encodeToString(

583

String.format("block-%06d", blockIndex).getBytes());

584

}

585

586

private List<String> generateAllBlockIds(long totalBlocks) {

587

List<String> blockIds = new ArrayList<>();

588

for (int i = 0; i < totalBlocks; i++) {

589

blockIds.add(generateBlockId(i));

590

}

591

return blockIds;

592

}

593

594

private UploadCheckpoint loadCheckpoint() {

595

if (!Files.exists(checkpointFile)) {

596

return null;

597

}

598

599

try {

600

String json = Files.readString(checkpointFile);

601

return new Gson().fromJson(json, UploadCheckpoint.class);

602

} catch (Exception ex) {

603

System.err.println("Failed to load checkpoint: " + ex.getMessage());

604

return null;

605

}

606

}

607

608

private void saveCheckpoint(UploadCheckpoint checkpoint) {

609

try {

610

String json = new Gson().toJson(checkpoint);

611

Files.writeString(checkpointFile, json, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);

612

} catch (Exception ex) {

613

System.err.println("Failed to save checkpoint: " + ex.getMessage());

614

}

615

}

616

617

private static class UploadCheckpoint {

618

String filePath;

619

long fileSize;

620

long blockSize;

621

Set<String> uploadedBlocks = new HashSet<>();

622

}

623

}

624

625

// Usage

626

ResumableUploadManager resumableManager = new ResumableUploadManager(blockBlobClient, "huge-file.dat");

627

resumableManager.resumableUpload(Paths.get("huge-file.dat"), 8 * 1024 * 1024L); // 8MB blocks

628

```

629

630

## Memory-Efficient Streaming

631

632

### Low-Memory Streaming Patterns

633

634

```java

635

public class MemoryEfficientStreaming {

636

637

// Process very large blobs without loading into memory

638

public void processLargeBlobIncrementally(BlobClient blobClient, DataProcessor processor) {

639

try (BlobInputStream inputStream = blobClient.openInputStream()) {

640

processStreamInChunks(inputStream, processor, 64 * 1024); // 64KB chunks

641

} catch (IOException ex) {

642

throw new RuntimeException("Failed to process large blob", ex);

643

}

644

}

645

646

private void processStreamInChunks(InputStream inputStream, DataProcessor processor, int chunkSize)

647

throws IOException {

648

byte[] buffer = new byte[chunkSize];

649

int bytesRead;

650

long totalProcessed = 0;

651

652

while ((bytesRead = inputStream.read(buffer)) != -1) {

653

// Process only the bytes actually read

654

if (bytesRead == buffer.length) {

655

processor.processChunk(buffer);

656

} else {

657

// Last chunk might be smaller

658

byte[] lastChunk = Arrays.copyOf(buffer, bytesRead);

659

processor.processChunk(lastChunk);

660

}

661

662

totalProcessed += bytesRead;

663

664

// Periodic garbage collection hint for long-running operations

665

if (totalProcessed % (10 * 1024 * 1024) == 0) { // Every 10MB

666

System.gc();

667

}

668

}

669

670

processor.processingComplete(totalProcessed);

671

}

672

673

// Streaming upload from data source without buffering entire content

674

public void streamUploadFromDataSource(BlockBlobClient blobClient, DataSource dataSource) {

675

try (BlobOutputStream outputStream = blobClient.getBlobOutputStream()) {

676

byte[] buffer = new byte[8192]; // Small buffer

677

678

while (dataSource.hasMoreData()) {

679

int bytesGenerated = dataSource.generateData(buffer);

680

if (bytesGenerated > 0) {

681

outputStream.write(buffer, 0, bytesGenerated);

682

}

683

684

// Flush periodically to prevent memory buildup

685

if (dataSource.getBytesGenerated() % (1024 * 1024) == 0) { // Every 1MB

686

outputStream.flush();

687

}

688

}

689

690

} catch (IOException ex) {

691

throw new RuntimeException("Streaming upload failed", ex);

692

}

693

}

694

695

// Pipeline processing: download -> transform -> upload without intermediate storage

696

public void pipelineProcessing(BlobClient sourceBlobClient, BlockBlobClient targetBlobClient,

697

DataTransformer transformer) {

698

try (BlobInputStream inputStream = sourceBlobClient.openInputStream();

699

BlobOutputStream outputStream = targetBlobClient.getBlobOutputStream()) {

700

701

byte[] inputBuffer = new byte[16384]; // 16KB input buffer

702

int bytesRead;

703

704

while ((bytesRead = inputStream.read(inputBuffer)) != -1) {

705

// Transform data chunk

706

byte[] transformedData = transformer.transform(inputBuffer, 0, bytesRead);

707

708

// Write transformed data immediately

709

outputStream.write(transformedData);

710

711

// Clear buffers to help GC

712

Arrays.fill(inputBuffer, (byte) 0);

713

}

714

715

} catch (IOException ex) {

716

throw new RuntimeException("Pipeline processing failed", ex);

717

}

718

}

719

720

// Interfaces for extensibility

721

public interface DataProcessor {

722

void processChunk(byte[] chunk);

723

void processingComplete(long totalBytesProcessed);

724

}

725

726

public interface DataSource {

727

boolean hasMoreData();

728

int generateData(byte[] buffer);

729

long getBytesGenerated();

730

}

731

732

public interface DataTransformer {

733

byte[] transform(byte[] input, int offset, int length);

734

}

735

}

736

737

// Example implementations

738

class CSVProcessor implements MemoryEfficientStreaming.DataProcessor {

739

private final StringBuilder lineBuffer = new StringBuilder();

740

private int recordCount = 0;

741

742

@Override

743

public void processChunk(byte[] chunk) {

744

String chunkStr = new String(chunk, StandardCharsets.UTF_8);

745

lineBuffer.append(chunkStr);

746

747

// Process complete lines

748

String[] lines = lineBuffer.toString().split("\n", -1);

749

750

// Process all complete lines (all but the last)

751

for (int i = 0; i < lines.length - 1; i++) {

752

processCSVLine(lines[i]);

753

recordCount++;

754

}

755

756

// Keep the incomplete line in buffer

757

lineBuffer.setLength(0);

758

lineBuffer.append(lines[lines.length - 1]);

759

}

760

761

@Override

762

public void processingComplete(long totalBytesProcessed) {

763

// Process any remaining line

764

if (lineBuffer.length() > 0) {

765

processCSVLine(lineBuffer.toString());

766

recordCount++;

767

}

768

769

System.out.println("CSV processing complete:");

770

System.out.println("Total records processed: " + recordCount);

771

System.out.println("Total bytes processed: " + totalBytesProcessed);

772

}

773

774

private void processCSVLine(String line) {

775

// Implement CSV line processing

776

String[] fields = line.split(",");

777

// Process fields...

778

}

779

}

780

781

// Usage

782

MemoryEfficientStreaming streaming = new MemoryEfficientStreaming();

783

784

// Process large CSV blob

785

streaming.processLargeBlobIncrementally(largeCsvBlobClient, new CSVProcessor());

786

787

// Stream upload from generated data

788

streaming.streamUploadFromDataSource(targetBlobClient, new SyntheticDataSource());

789

```

790

791

## Performance Monitoring and Optimization

792

793

### Advanced Streaming Metrics

794

795

```java

796

/**
 * Accumulates byte counts for one streaming operation and derives throughput figures.
 * {@link #recordBytes(long)} and {@link #generateReport()} synchronize on the same
 * monitor, so a report can be taken while another thread is still recording.
 */
public class StreamingMetrics {
    private final String operationName;
    private final long startTime;
    private long bytesProcessed;
    private final List<Long> throughputSamples;
    private long lastSampleTime;
    private long lastSampleBytes;

    /**
     * Starts the clock for the named operation.
     *
     * @param operationName label shown in the report
     */
    public StreamingMetrics(String operationName) {
        this.operationName = operationName;
        this.startTime = System.currentTimeMillis();
        this.throughputSamples = new ArrayList<>();
        this.lastSampleTime = startTime;
        this.lastSampleBytes = 0;
    }

    /**
     * Adds {@code bytes} to the running total and, at most once per second, stores a
     * throughput sample for the interval since the previous sample.
     *
     * @param bytes number of bytes just transferred
     */
    public synchronized void recordBytes(long bytes) {
        this.bytesProcessed += bytes;

        long currentTime = System.currentTimeMillis();

        // Sample throughput every second
        if (currentTime - lastSampleTime >= 1000) {
            long intervalBytes = bytesProcessed - lastSampleBytes;
            double intervalSeconds = (currentTime - lastSampleTime) / 1000.0;
            double throughputMBps = (intervalBytes / 1024.0 / 1024.0) / intervalSeconds;

            // Stored in hundredths of a MB/s so two decimal places survive the long conversion
            throughputSamples.add((long) (throughputMBps * 100));

            lastSampleTime = currentTime;
            lastSampleBytes = bytesProcessed;
        }
    }

    /**
     * Builds a snapshot of the metrics collected so far.
     *
     * Synchronized so the byte total and the sample list are read under the same lock
     * that {@link #recordBytes(long)} holds while modifying them.
     *
     * @return report with totals, average/min/max throughput and a copy of the samples
     */
    public synchronized StreamingReport generateReport() {
        long totalTime = System.currentTimeMillis() - startTime;
        double totalSeconds = totalTime / 1000.0;
        double averageThroughput = totalSeconds > 0 ? (bytesProcessed / 1024.0 / 1024.0) / totalSeconds : 0;

        // Samples are stored in hundredths of a MB/s; convert back for the report
        double maxThroughput = throughputSamples.stream()
            .mapToDouble(sample -> sample / 100.0)
            .max()
            .orElse(0.0);

        double minThroughput = throughputSamples.stream()
            .mapToDouble(sample -> sample / 100.0)
            .min()
            .orElse(0.0);

        return new StreamingReport(
            operationName,
            bytesProcessed,
            totalTime,
            averageThroughput,
            maxThroughput,
            minThroughput,
            new ArrayList<>(throughputSamples)
        );
    }

    /** Snapshot of the metrics of one operation. */
    public static class StreamingReport {
        public final String operation;
        public final long totalBytes;
        public final long totalTimeMs;
        public final double averageThroughputMBps;
        public final double maxThroughputMBps;
        public final double minThroughputMBps;
        /** Per-interval throughput samples in hundredths of a MB/s. */
        public final List<Long> throughputSamples;

        StreamingReport(String operation, long totalBytes, long totalTimeMs,
                        double averageThroughputMBps, double maxThroughputMBps, double minThroughputMBps,
                        List<Long> throughputSamples) {
            this.operation = operation;
            this.totalBytes = totalBytes;
            this.totalTimeMs = totalTimeMs;
            this.averageThroughputMBps = averageThroughputMBps;
            this.maxThroughputMBps = maxThroughputMBps;
            this.minThroughputMBps = minThroughputMBps;
            this.throughputSamples = throughputSamples;
        }

        /** Prints the report to standard output. */
        public void printReport() {
            System.out.println("\n=== Streaming Performance Report ===");
            System.out.println("Operation: " + operation);
            System.out.printf("Total bytes: %s%n", formatBytes(totalBytes));
            System.out.printf("Total time: %.2f seconds%n", totalTimeMs / 1000.0);
            System.out.printf("Average throughput: %.2f MB/s%n", averageThroughputMBps);
            System.out.printf("Max throughput: %.2f MB/s%n", maxThroughputMBps);
            System.out.printf("Min throughput: %.2f MB/s%n", minThroughputMBps);
            System.out.printf("Throughput stability: %.2f%%%n", calculateStability());
            System.out.println("=====================================");
        }

        /** Returns min/max throughput as a percentage; 100 when no throughput was sampled. */
        private double calculateStability() {
            if (maxThroughputMBps == 0) {
                return 100.0;
            }
            return (1.0 - (maxThroughputMBps - minThroughputMBps) / maxThroughputMBps) * 100.0;
        }

        /** Formats a byte count using binary units (B, KB, MB, GB). */
        private String formatBytes(long bytes) {
            if (bytes < 1024) {
                return bytes + " B";
            }
            if (bytes < 1024 * 1024) {
                return String.format("%.1f KB", bytes / 1024.0);
            }
            if (bytes < 1024 * 1024 * 1024) {
                return String.format("%.1f MB", bytes / 1024.0 / 1024.0);
            }
            return String.format("%.2f GB", bytes / 1024.0 / 1024.0 / 1024.0);
        }
    }
}

902

903

// Performance-monitored streaming

904

public class MonitoredStreamingOperations {

905

906

public StreamingMetrics.StreamingReport monitoredStreamDownload(BlobClient blobClient,

907

OutputStream outputStream) {

908

StreamingMetrics metrics = new StreamingMetrics("Blob Download");

909

910

try (BlobInputStream inputStream = blobClient.openInputStream()) {

911

byte[] buffer = new byte[64 * 1024]; // 64KB buffer

912

int bytesRead;

913

914

while ((bytesRead = inputStream.read(buffer)) != -1) {

915

outputStream.write(buffer, 0, bytesRead);

916

metrics.recordBytes(bytesRead);

917

}

918

919

} catch (IOException ex) {

920

throw new RuntimeException("Monitored download failed", ex);

921

}

922

923

return metrics.generateReport();

924

}

925

926

public StreamingMetrics.StreamingReport monitoredStreamUpload(BlockBlobClient blobClient,

927

InputStream inputStream) {

928

StreamingMetrics metrics = new StreamingMetrics("Blob Upload");

929

930

try (BlobOutputStream outputStream = blobClient.getBlobOutputStream()) {

931

byte[] buffer = new byte[64 * 1024]; // 64KB buffer

932

int bytesRead;

933

934

while ((bytesRead = inputStream.read(buffer)) != -1) {

935

outputStream.write(buffer, 0, bytesRead);

936

metrics.recordBytes(bytesRead);

937

}

938

939

} catch (IOException ex) {

940

throw new RuntimeException("Monitored upload failed", ex);

941

}

942

943

return metrics.generateReport();

944

}

945

}

946

947

// Usage with performance monitoring

948

MonitoredStreamingOperations monitoredOps = new MonitoredStreamingOperations();

// Monitor download performance: copy the blob to a local file, then print the report
try (FileOutputStream fileOutput = new FileOutputStream("downloaded-large-file.dat")) {
    monitoredOps
        .monitoredStreamDownload(blobClient, fileOutput)
        .printReport();
}

// Monitor upload performance: send a local file to the block blob, then print the report
try (FileInputStream fileInput = new FileInputStream("upload-large-file.dat")) {
    monitoredOps
        .monitoredStreamUpload(blockBlobClient, fileInput)
        .printReport();
}

963

```

964

965

## Reactive Streaming Patterns

966

967

### Advanced Reactive Operations

968

969

```java

970

import reactor.core.publisher.Flux;

971

import reactor.core.publisher.Mono;

972

import reactor.core.scheduler.Schedulers;

973

974

public class ReactiveStreamingPatterns {

975

976

// Parallel processing of multiple blobs

977

public Mono<List<ProcessingResult>> processMultipleBlobsConcurrently(

978

List<BlobAsyncClient> blobClients,

979

int concurrency) {

980

981

return Flux.fromIterable(blobClients)

982

.flatMap(blobClient ->

983

processBlobReactively(blobClient)

984

.subscribeOn(Schedulers.boundedElastic()),

985

concurrency)

986

.collectList();

987

}

988

989

private Mono<ProcessingResult> processBlobReactively(BlobAsyncClient blobClient) {

990

return blobClient.downloadStream()

991

.reduce(0L, (total, buffer) -> total + buffer.remaining())

992

.map(totalBytes -> new ProcessingResult(blobClient.getBlobName(), totalBytes))

993

.doOnSuccess(result ->

994

System.out.println("Processed " + result.blobName + ": " + result.totalBytes + " bytes"))

995

.onErrorResume(error -> {

996

System.err.println("Failed to process " + blobClient.getBlobName() + ": " + error.getMessage());

997

return Mono.just(new ProcessingResult(blobClient.getBlobName(), -1));

998

});

999

}

1000

1001

// Stream transformation pipeline

1002

public Mono<Void> transformAndUpload(BlobAsyncClient sourceClient,

1003

BlobAsyncClient targetClient,

1004

Function<ByteBuffer, ByteBuffer> transformer) {

1005

1006

Flux<ByteBuffer> transformedStream = sourceClient.downloadStream()

1007

.map(transformer)

1008

.onErrorResume(error -> {

1009

System.err.println("Transformation error: " + error.getMessage());

1010

return Flux.empty();

1011

});

1012

1013

BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(transformedStream)

1014

.setParallelTransferOptions(new ParallelTransferOptions()

1015

.setBlockSizeLong(1024 * 1024L)

1016

.setMaxConcurrency(4));

1017

1018

return targetClient.uploadWithResponse(uploadOptions)

1019

.doOnSuccess(response ->

1020

System.out.println("Transform and upload completed: " + response.getStatusCode()))

1021

.then();

1022

}

1023

1024

// Batch processing with backpressure

1025

public Flux<BatchResult> batchProcessWithBackpressure(Flux<BlobAsyncClient> blobClients,

1026

int batchSize,

1027

Duration batchTimeout) {

1028

1029

return blobClients

1030

.buffer(batchSize, batchTimeout)

1031

.flatMap(batch -> processBatch(batch), 2) // Max 2 concurrent batches

1032

.onBackpressureBuffer(100); // Buffer up to 100 results

1033

}

1034

1035

private Mono<BatchResult> processBatch(List<BlobAsyncClient> batch) {

1036

return Flux.fromIterable(batch)

1037

.flatMap(blobClient ->

1038

blobClient.getProperties()

1039

.map(props -> props.getBlobSize())

1040

.onErrorReturn(0L))

1041

.reduce(Long::sum)

1042

.map(totalSize -> new BatchResult(batch.size(), totalSize))

1043

.doOnSuccess(result ->

1044

System.out.println("Batch processed: " + result.blobCount +

1045

" blobs, " + result.totalSize + " bytes"));

1046

}

1047

1048

// Streaming aggregation

1049

public Mono<AggregationResult> streamingAggregation(List<BlobAsyncClient> blobClients) {

1050

return Flux.fromIterable(blobClients)

1051

.flatMap(blobClient ->

1052

blobClient.downloadStream()

1053

.map(ByteBuffer::remaining)

1054

.cast(Long.class)

1055

.reduce(Long::sum)

1056

.map(size -> new BlobStats(blobClient.getBlobName(), size)))

1057

.reduce(new AggregationResult(), (agg, stats) -> {

1058

agg.totalBlobs++;

1059

agg.totalBytes += stats.size;

1060

agg.maxBlobSize = Math.max(agg.maxBlobSize, stats.size);

1061

agg.minBlobSize = agg.minBlobSize == 0 ? stats.size : Math.min(agg.minBlobSize, stats.size);

1062

return agg;

1063

})

1064

.doOnSuccess(result -> {

1065

result.avgBlobSize = result.totalBlobs > 0 ? result.totalBytes / result.totalBlobs : 0;

1066

System.out.println("Aggregation complete: " + result);

1067

});

1068

}

1069

1070

// Data classes for results

1071

public static class ProcessingResult {

1072

public final String blobName;

1073

public final long totalBytes;

1074

1075

ProcessingResult(String blobName, long totalBytes) {

1076

this.blobName = blobName;

1077

this.totalBytes = totalBytes;

1078

}

1079

}

1080

1081

public static class BatchResult {

1082

public final int blobCount;

1083

public final long totalSize;

1084

1085

BatchResult(int blobCount, long totalSize) {

1086

this.blobCount = blobCount;

1087

this.totalSize = totalSize;

1088

}

1089

}

1090

1091

public static class BlobStats {

1092

public final String name;

1093

public final long size;

1094

1095

BlobStats(String name, long size) {

1096

this.name = name;

1097

this.size = size;

1098

}

1099

}

1100

1101

public static class AggregationResult {

1102

public int totalBlobs = 0;

1103

public long totalBytes = 0;

1104

public long maxBlobSize = 0;

1105

public long minBlobSize = 0;

1106

public long avgBlobSize = 0;

1107

1108

@Override

1109

public String toString() {

1110

return String.format("AggregationResult{blobs=%d, totalBytes=%d, avgSize=%d, minSize=%d, maxSize=%d}",

1111

totalBlobs, totalBytes, avgBlobSize, minBlobSize, maxBlobSize);

1112

}

1113

}

1114

}

1115

1116

// Usage examples

1117

ReactiveStreamingPatterns patterns = new ReactiveStreamingPatterns();

// Process multiple blobs concurrently
List<BlobAsyncClient> asyncClients = Arrays.asList(
    blobClient1.getAsyncClient(),
    blobClient2.getAsyncClient(),
    blobClient3.getAsyncClient()
);

// An error consumer is supplied so that a failure is reported here instead of
// falling through to Reactor's default dropped-error handling.
patterns.processMultipleBlobsConcurrently(asyncClients, 3)
    .subscribe(
        results -> System.out.println("Processed " + results.size() + " blobs concurrently"),
        error -> System.err.println("Concurrent processing failed: " + error.getMessage()));

// Transform and upload pipeline
patterns.transformAndUpload(
    sourceBlobClient.getAsyncClient(),
    targetBlobClient.getAsyncClient(),
    buffer -> {
        // Example transformation: convert ASCII letters to uppercase.
        // The transformer is called once per downloaded chunk, and a chunk may end
        // in the middle of a multi-byte UTF-8 character. Decoding each chunk to a
        // String on its own would replace such a partial character with U+FFFD.
        // Working on the bytes avoids that: every byte of a multi-byte UTF-8
        // sequence is >= 0x80, so it is never mistaken for 'a'..'z'.
        ByteBuffer upper = ByteBuffer.allocate(buffer.remaining());
        while (buffer.hasRemaining()) {
            byte b = buffer.get();
            upper.put(b >= 'a' && b <= 'z' ? (byte) (b - ('a' - 'A')) : b);
        }
        upper.flip();
        return upper;
    }
).subscribe(
    unused -> { },
    error -> System.err.println("Transform and upload failed: " + error.getMessage()));

1141

```

1142

1143

## Related Documentation

1144

1145

- [← Back to Overview](index.md)

1146

- [← Security & Authentication](security.md)

1147

- [Specialized Blob Types →](specialized-clients.md)

1148

- [Configuration Options →](options.md)