
# Streaming Operations

The StreamResponse class provides efficient streaming functionality for handling large files without loading entire contents into memory. It serves as a wrapper around HTTP responses that enables chunk-by-chunk processing of download streams, making it ideal for processing large objects or when memory usage is a concern.
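
A minimal sketch of the typical pattern (each call is covered in detail in the capability sections below): open the storage client, request a stream with `download_stream()`, and consume it chunk by chunk with `read()`.

```python
async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'large-file.dat') as stream:
        while True:
            chunk = await stream.read(64 * 1024)  # read in 64KB chunks
            if not chunk:  # empty bytes means the stream is exhausted
                break
            # Process chunk without holding the whole object in memory
```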

## Capabilities

### Stream Initialization

StreamResponse instances are created by the Storage client's `download_stream()` method and should not be instantiated directly.

```python { .api }
def __init__(self, response):
    """
    Initialize a StreamResponse wrapper.

    Parameters:
    - response (aiohttp.ClientResponse): HTTP response object

    Attributes:
    - response: Underlying HTTP response object
    """
```

**Usage Example:**

```python
async with Storage() as storage:
    # Get streaming response (don't instantiate directly)
    stream = storage.download_stream('my-bucket', 'large-file.dat')

    # StreamResponse is created internally by download_stream()
    async with stream as stream_reader:
        # Use stream_reader for processing
        pass
```

### Content Length Information

Provides access to the total content length of the streamed object.

```python { .api }
@property
def content_length(self):
    """
    Get the total content length of the stream.

    Returns:
    int: Content length in bytes, or None if not available
    """
```

**Usage Example:**

```python
async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'video.mp4') as stream:
        total_size = stream.content_length
        if total_size:
            print(f"Downloading {total_size:,} bytes")

        bytes_read = 0
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            bytes_read += len(chunk)

            # Show progress
            if total_size:
                progress = (bytes_read / total_size) * 100
                print(f"Progress: {progress:.1f}% ({bytes_read:,}/{total_size:,} bytes)")
```

### Stream Reading

Read data from the stream in configurable chunk sizes.

```python { .api }
async def read(self, size=-1):
    """
    Read data from the stream.

    Parameters:
    - size (int): Number of bytes to read. -1 reads all remaining data

    Returns:
    bytes: Data chunk read from stream, empty bytes when stream is exhausted
    """
```

**Usage Example:**

```python
async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'large-dataset.csv') as stream:
        # Read in 64KB chunks
        chunk_size = 65536

        while True:
            chunk = await stream.read(chunk_size)
            if not chunk:  # End of stream
                break

            # Process chunk (e.g., parse CSV rows)
            process_data_chunk(chunk)

        # Read all remaining data at once (not recommended for large files)
        # remaining_data = await stream.read(-1)
```
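
The read loop can also be wrapped in a small async generator if you prefer `async for` iteration. The helper below is illustrative only (it is not part of the StreamResponse API) and relies solely on the `read()` method documented above:

```python
async def iter_chunks(stream, chunk_size=64 * 1024):
    """Yield chunks from a stream until it is exhausted (illustrative helper)."""
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:  # empty bytes signals end of stream
            return
        yield chunk

# Usage
async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'large-dataset.csv') as stream:
        async for chunk in iter_chunks(stream):
            process_data_chunk(chunk)
```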

### Context Manager Support

StreamResponse supports the async context manager protocol for automatic resource cleanup.

```python { .api }
async def __aenter__(self):
    """
    Enter async context manager.

    Returns:
    StreamResponse: Self for use in context
    """

async def __aexit__(self, *exc_info):
    """
    Exit async context manager, cleaning up resources.

    Parameters:
    - exc_info: Exception information if context exited due to exception

    Returns:
    None
    """
```

**Usage Example:**

```python
async with Storage() as storage:
    # Recommended: automatic cleanup with context manager
    async with storage.download_stream('my-bucket', 'file.dat') as stream:
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            # Process chunk
    # Stream automatically closed here

    # The stream object can also be created first and entered later;
    # cleanup still happens when the async context manager exits
    stream = storage.download_stream('my-bucket', 'file.dat')
    async with stream as stream_reader:
        # Use stream_reader
        pass
```

## Common Usage Patterns

### Large File Download with Progress Tracking

```python
import asyncio
from pathlib import Path

async def download_with_progress(storage, bucket_name, object_name, local_path):
    """Download large file with progress tracking."""

    async with storage.download_stream(bucket_name, object_name) as stream:
        total_size = stream.content_length
        bytes_downloaded = 0

        with open(local_path, 'wb') as f:
            while True:
                chunk = await stream.read(1024 * 1024)  # 1MB chunks
                if not chunk:
                    break

                f.write(chunk)
                bytes_downloaded += len(chunk)

                if total_size:
                    progress = (bytes_downloaded / total_size) * 100
                    print(f"\rDownloading: {progress:.1f}% complete", end='', flush=True)

    print(f"\nDownload complete: {bytes_downloaded:,} bytes")

# Usage
async with Storage() as storage:
    await download_with_progress(
        storage,
        'my-bucket',
        'large-video.mp4',
        '/tmp/downloaded-video.mp4'
    )
```

### Streaming Data Processing

```python
import json
import asyncio

async def process_streaming_json_lines(storage, bucket_name, object_name):
    """Process JSON Lines file without loading it entirely into memory."""

    async with storage.download_stream(bucket_name, object_name) as stream:
        buffer = b''

        while True:
            chunk = await stream.read(8192)
            if not chunk:
                # Process any remaining data in buffer
                if buffer.strip():
                    try:
                        record = json.loads(buffer.decode('utf-8'))
                        await process_record(record)
                    except json.JSONDecodeError:
                        print(f"Warning: Could not parse final record: {buffer}")
                break

            buffer += chunk

            # Process complete lines
            while b'\n' in buffer:
                line, buffer = buffer.split(b'\n', 1)
                if line.strip():  # Skip empty lines
                    try:
                        record = json.loads(line.decode('utf-8'))
                        await process_record(record)
                    except json.JSONDecodeError as e:
                        print(f"Warning: Could not parse line: {line} ({e})")

async def process_record(record):
    """Process individual JSON record."""
    # Your processing logic here
    print(f"Processing: {record.get('id', 'unknown')}")
    await asyncio.sleep(0.01)  # Simulate processing time

# Usage
async with Storage() as storage:
    await process_streaming_json_lines(storage, 'data-bucket', 'large-dataset.jsonl')
```

### Concurrent Stream Processing

```python
import asyncio
from typing import List

async def parallel_stream_download(storage, bucket_name, object_names: List[str]):
    """Download multiple large files concurrently using streams."""

    async def download_single(object_name):
        local_path = f"/tmp/{object_name.replace('/', '_')}"

        async with storage.download_stream(bucket_name, object_name) as stream:
            with open(local_path, 'wb') as f:
                bytes_written = 0

                while True:
                    chunk = await stream.read(64 * 1024)  # 64KB chunks
                    if not chunk:
                        break
                    f.write(chunk)
                    bytes_written += len(chunk)

        print(f"Downloaded {object_name}: {bytes_written:,} bytes")
        return local_path

    # Download all files concurrently
    tasks = [download_single(name) for name in object_names]
    downloaded_paths = await asyncio.gather(*tasks)

    return downloaded_paths

# Usage
async with Storage() as storage:
    files_to_download = [
        'datasets/2023/data-01.csv',
        'datasets/2023/data-02.csv',
        'datasets/2023/data-03.csv'
    ]

    paths = await parallel_stream_download(storage, 'my-bucket', files_to_download)
    print(f"All files downloaded to: {paths}")
```

### Memory-Efficient Data Transformation

```python
import gzip

async def transform_and_reupload(storage, source_bucket, source_object, dest_bucket, dest_object):
    """Transform data while streaming from source to destination."""

    # Download stream from source
    async with storage.download_stream(source_bucket, source_object) as source_stream:

        # Collect transformed data in chunks
        transformed_chunks = []

        while True:
            chunk = await source_stream.read(32 * 1024)  # 32KB chunks
            if not chunk:
                break

            # Transform data (example: convert to uppercase)
            transformed_chunk = chunk.upper()
            transformed_chunks.append(transformed_chunk)

    # Combine all transformed data
    # (for very large files, consider uploading partial results in batches
    # instead of accumulating everything in memory)
    final_data = b''.join(transformed_chunks)

    # Compress before uploading
    compressed_data = gzip.compress(final_data)

    # Upload transformed result
    result = await storage.upload(
        dest_bucket,
        dest_object,
        compressed_data,
        content_type='application/gzip'
    )

    print(f"Transformed {len(final_data):,} bytes to {len(compressed_data):,} bytes")
    return result

# Usage
async with Storage() as storage:
    await transform_and_reupload(
        storage,
        'input-bucket', 'raw-data.txt',
        'output-bucket', 'processed-data.txt.gz'
    )
```

## Performance Considerations

### Optimal Chunk Sizes

```python
# Recommended chunk sizes for different scenarios:

# Network-limited environments (slower connections)
chunk_size = 8 * 1024  # 8KB

# Balanced performance (most common)
chunk_size = 64 * 1024  # 64KB

# High-bandwidth, processing-intensive tasks
chunk_size = 1024 * 1024  # 1MB

# Memory-constrained environments
chunk_size = 4 * 1024  # 4KB

async with storage.download_stream('bucket', 'file') as stream:
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:
            break
        # Process with appropriate chunk size
```

### Resource Management

```python
async with Storage() as storage:
    # Always use context managers for automatic cleanup
    async with storage.download_stream('bucket', 'large-file') as stream:
        # Stream resources are automatically managed
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            # Process chunk
    # Resources automatically released here
```