or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.md · errors.md · index.md · pathio.md · server.md · streaming.md

streaming.mddocs/

0

# Streaming and Throttling

1

2

Stream wrappers with timeout and throttling support for controlling data transfer rates and managing network connections. Includes basic streams with timeout support, throttled streams with configurable read/write limits, and async iteration capabilities for processing streaming data.

3

4

## Capabilities

5

6

### Basic Stream Operations

7

8

Fundamental stream wrapper providing timeout support for asyncio streams.

9

10

```python { .api }

11

class StreamIO:
    """Basic async stream wrapper with timeout support."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                 timeout: float = None, read_timeout: float = None,
                 write_timeout: float = None):
        """
        Initialize stream wrapper.

        Parameters:
        - reader: Asyncio stream reader for input operations
        - writer: Asyncio stream writer for output operations
        - timeout: Default timeout for all operations (seconds)
        - read_timeout: Specific timeout for read operations (seconds)
        - write_timeout: Specific timeout for write operations (seconds)
        """

    async def read(self, count: int = -1) -> bytes:
        """
        Read data from stream.

        Parameters:
        - count: Maximum number of bytes to read (-1 reads until EOF,
          as with asyncio.StreamReader.read)

        Returns:
        Bytes read from stream
        """

    async def readline(self) -> bytes:
        """
        Read a line from stream (up to newline).

        Returns:
        Line data as bytes including newline character
        """

    async def readexactly(self, count: int) -> bytes:
        """
        Read exactly the specified number of bytes.

        Parameters:
        - count: Exact number of bytes to read

        Returns:
        Exactly count bytes from stream

        Raises:
        asyncio.IncompleteReadError if the stream ends before count
        bytes have been read
        """

    async def write(self, data: bytes) -> None:
        """
        Write data to stream.

        Parameters:
        - data: Bytes to write to stream
        """

    def close(self) -> None:
        """Close the stream writer immediately."""

    async def start_tls(self, sslcontext: ssl.SSLContext, server_hostname: str = None) -> None:
        """
        Upgrade connection to TLS.

        Parameters:
        - sslcontext: SSL context for encryption
        - server_hostname: Server hostname for certificate validation
        """

80

```

81

82

### Throttling Mechanisms

83

84

Speed limiting components for controlling data transfer rates.

85

86

```python { .api }

87

class Throttle:
    """Speed throttling mechanism for rate limiting."""

    def __init__(self, limit: int = None, reset_rate: int = 10):
        """
        Initialize throttle.

        Parameters:
        - limit: Speed limit in bytes per second (None for unlimited)
        - reset_rate: Rate statistics reset frequency
        """

    async def wait(self) -> None:
        """Wait if throttling is needed based on current transfer rate."""

    def append(self, data: bytes, start: float) -> None:
        """
        Record data transfer for rate calculation.

        Parameters:
        - data: Data that was transferred
        - start: Transfer start time (from time.time())
        """

    # The return annotation is quoted: the name ``Throttle`` is not bound
    # until the class body has finished executing.
    def clone(self) -> "Throttle":
        """
        Create a copy of this throttle with same settings.

        Returns:
        New Throttle instance with identical configuration
        """

    @property
    def limit(self) -> Union[int, None]:
        """
        Current speed limit in bytes per second.

        Returns:
        Speed limit or None if unlimited
        """

127

128

class StreamThrottle(NamedTuple):
    """Named tuple combining read and write throttles."""

    # Throttle for read operations.
    read: "Throttle"

    # Throttle for write operations.
    write: "Throttle"

    def clone(self) -> "StreamThrottle":
        """
        Create a copy of this stream throttle.

        Returns:
        New StreamThrottle with cloned read/write throttles
        """

    # ``cls`` must be the first parameter of a classmethod; without it the
    # class object would be bound to ``read_speed_limit``.
    @classmethod
    def from_limits(cls, read_speed_limit: int = None,
                    write_speed_limit: int = None) -> "StreamThrottle":
        """
        Create StreamThrottle from speed limits.

        Parameters:
        - read_speed_limit: Read speed limit in bytes/second
        - write_speed_limit: Write speed limit in bytes/second

        Returns:
        StreamThrottle configured with specified limits
        """

157

```

158

159

### Throttled Stream Operations

160

161

Stream wrapper combining basic I/O with throttling capabilities.

162

163

```python { .api }

164

class ThrottleStreamIO(StreamIO):
    """Stream with throttling support for rate-limited operations."""

    # NOTE(review): the ``{}`` default is kept to match the documented
    # signature; a default dict is one object shared by every call that
    # omits ``throttles`` -- confirm the implementation never mutates it.
    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                 throttles: dict[str, "StreamThrottle"] = {}, timeout: float = None,
                 read_timeout: float = None, write_timeout: float = None):
        """
        Initialize throttled stream.

        Parameters:
        - reader: Asyncio stream reader
        - writer: Asyncio stream writer
        - throttles: Dictionary mapping throttle names to StreamThrottle objects
        - timeout: Default timeout for operations
        - read_timeout: Specific read timeout
        - write_timeout: Specific write timeout
        """

    async def wait(self, name: str) -> None:
        """
        Wait for throttle if needed.

        Parameters:
        - name: Name of throttle to check
        """

    def append(self, name: str, data: bytes, start: float) -> None:
        """
        Record data transfer for throttle calculation.

        Parameters:
        - name: Name of throttle to update
        - data: Data that was transferred
        - start: Transfer start time
        """

    async def read(self, count: int = -1) -> bytes:
        """Read data with throttling applied."""

    async def readline(self) -> bytes:
        """Read line with throttling applied."""

    async def write(self, data: bytes) -> None:
        """Write data with throttling applied."""

    async def __aenter__(self) -> "ThrottleStreamIO":
        """Async context manager entry."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit."""

    def iter_by_line(self) -> "AsyncStreamIterator":
        """
        Create async iterator for line-by-line processing.

        Returns:
        Async iterator yielding lines as bytes
        """

    def iter_by_block(self, count: int = 8192) -> "AsyncStreamIterator":
        """
        Create async iterator for block-by-block processing.

        Parameters:
        - count: Block size in bytes

        Returns:
        Async iterator yielding data blocks as bytes
        """

233

```

234

235

### Stream Iteration

236

237

Async iterators for processing streaming data.

238

239

```python { .api }

240

class AsyncStreamIterator:
    """Async iterator for stream data processing."""

    # Quoted annotation: the class name is not bound while its own body
    # is still being executed.
    def __aiter__(self) -> "AsyncStreamIterator":
        """Return async iterator."""

    async def __anext__(self) -> bytes:
        """
        Get next data chunk.

        Returns:
        Next chunk of stream data

        Raises:
        StopAsyncIteration when stream ends
        """

256

```

257

258

### FTP-Specific Stream Wrapper

259

260

Data connection stream with FTP protocol integration.

261

262

```python { .api }

263

class DataConnectionThrottleStreamIO(ThrottleStreamIO):
    """Throttled stream for FTP data connections with protocol integration."""

    def __init__(self, client, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                 throttles: dict[str, "StreamThrottle"], timeout: float,
                 read_timeout: float, write_timeout: float):
        """
        Initialize FTP data connection stream.

        Parameters:
        - client: FTP client instance for protocol communication
        - reader: Stream reader for data
        - writer: Stream writer for data
        - throttles: Throttling configuration
        - timeout: Default operation timeout
        - read_timeout: Read operation timeout
        - write_timeout: Write operation timeout
        """

    async def finish(self, expected_codes: str = "2xx", wait_codes: str = "1xx") -> None:
        """
        Finish data transfer and wait for server confirmation.

        Parameters:
        - expected_codes: FTP status codes expected on completion
        - wait_codes: FTP status codes to wait for during transfer
        """

    # ``self`` is required: ``async with`` calls this with the instance plus
    # the three exception arguments.
    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Async context manager exit with FTP protocol cleanup."""

293

```

294

295

## Usage Examples

296

297

### Basic Stream Operations

298

299

```python

300

import aioftp

301

import asyncio

302

303

async def basic_streaming():
    """Send a plain HTTP request through a StreamIO wrapper and print the reply."""

    request = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"

    # Open the raw asyncio connection, then wrap it with a 30 second timeout
    conn_reader, conn_writer = await asyncio.open_connection("example.com", 80)
    wrapped = aioftp.StreamIO(conn_reader, conn_writer, timeout=30.0)

    try:
        # Send the HTTP request
        await wrapped.write(request)

        # The first line of the response is the status line
        first_line = await wrapped.readline()
        status_text = first_line.decode().strip()
        print(f"Status: {status_text}")

        # Then pull up to 1024 bytes of whatever follows
        payload = await wrapped.read(1024)
        received = len(payload)
        print(f"Received {received} bytes")

    finally:
        # Always release the connection, even if a read or write failed
        wrapped.close()

asyncio.run(basic_streaming())

327

```

328

329

### Throttled File Transfer

330

331

```python

332

import aioftp

333

import asyncio

334

335

async def throttled_transfer():
    """Exchange one FTP command over a rate-limited stream."""

    # Rate limits: 1MB/s for reads, 512KB/s for writes
    limits = aioftp.StreamThrottle(
        read=aioftp.Throttle(limit=1024*1024),
        write=aioftp.Throttle(limit=512*1024),
    )

    conn_reader, conn_writer = await asyncio.open_connection("ftp.example.com", 21)

    # Wrap the connection so every transfer goes through the throttle
    throttle_map = {"default": limits}
    limited = aioftp.ThrottleStreamIO(
        conn_reader,
        conn_writer,
        throttles=throttle_map,
        timeout=60.0,
    )

    try:
        async with limited:
            # Reads and writes below are rate-limited automatically
            await limited.write(b"USER anonymous\r\n")
            reply = await limited.readline()
            reply_text = reply.decode().strip()
            print(f"Response: {reply_text}")

    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(throttled_transfer())

367

```

368

369

### Stream Iteration

370

371

```python

372

import aioftp

373

import asyncio

374

375

async def stream_iteration_example():
    """Example of iterating over stream data."""

    async with aioftp.Client.context("ftp.example.com") as client:
        # Download large file using stream iteration
        async with client.download_stream("large_file.txt") as stream:
            # Process file line by line
            async for line in stream.iter_by_line():
                # Process each line without loading entire file into memory
                processed_line = line.decode().strip().upper()
                print(f"Processed: {processed_line}")

        # Upload file block by block
        async with client.upload_stream("output_file.txt") as stream:
            # An upload stream is the destination of the data, so the blocks
            # are read from a local source file rather than from the stream
            # being written to.
            with open("input_file.txt", "rb") as source:
                while block := source.read(4096):
                    # Process and write blocks
                    processed_block = block.upper()  # Example processing
                    await stream.write(processed_block)

asyncio.run(stream_iteration_example())

396

```

397

398

### Advanced Throttling Configuration

399

400

```python

401

import aioftp

402

import asyncio

403

404

async def advanced_throttling():
    """Show separate throttle profiles for quick and bandwidth-limited uploads."""

    mib = 1024
    # Two profiles: a generous one (10MB/s) and a restrictive one (256KB/s)
    fast_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=10*mib*1024,
        write_speed_limit=10*mib*1024,
    )
    slow_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=256*1024,
        write_speed_limit=256*1024,
    )

    async with aioftp.Client.context("ftp.example.com") as client:
        # Small files go through the regular upload helper
        await client.upload("small_file.txt", "small_remote.txt")

        # Large files are streamed so the bandwidth used can be capped
        async with client.upload_stream("large_file.txt") as stream:
            # Register the restrictive profile on this stream
            stream.throttles["upload"] = slow_throttle

            with open("large_file.txt", "rb") as local_file:
                # Copy the local file in 8192-byte pieces until it is exhausted
                while piece := local_file.read(8192):
                    await stream.write(piece)

asyncio.run(advanced_throttling())

435

```

436

437

### Custom Stream Processing

438

439

```python

440

import aioftp

441

import asyncio

442

import time

443

444

async def custom_stream_processing():
    """Example with custom stream processing and monitoring."""

    class MonitoredThrottle(aioftp.Throttle):
        """Custom throttle with transfer monitoring."""

        # Emit one progress line each time this many more bytes have passed.
        REPORT_EVERY = 1024*1024

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.total_bytes = 0
            self.start_time = time.time()
            # Byte count at which the next progress line is due
            self._next_report = self.REPORT_EVERY

        def append(self, data: bytes, start: float):
            super().append(data, start)
            self.total_bytes += len(data)

            # Log progress every MB. Chunks arrive in arbitrary sizes, so the
            # running total rarely lands exactly on a 1 MB multiple; compare
            # against a moving threshold instead of testing the remainder.
            if self.total_bytes >= self._next_report:
                crossed = self.total_bytes // self.REPORT_EVERY
                self._next_report = (crossed + 1) * self.REPORT_EVERY
                elapsed = time.time() - self.start_time
                rate = self.total_bytes / elapsed if elapsed > 0 else 0
                print(f"Transferred {self.total_bytes} bytes at {rate:.0f} bytes/sec")

    # Create monitored throttles
    monitored = aioftp.StreamThrottle(
        read=MonitoredThrottle(limit=2*1024*1024),
        write=MonitoredThrottle(limit=2*1024*1024)
    )

    async with aioftp.Client.context("ftp.example.com") as client:
        async with client.download_stream("large_file.zip") as stream:
            stream.throttles["monitored"] = monitored

            with open("downloaded_file.zip", "wb") as f:
                async for chunk in stream.iter_by_block(count=64*1024):
                    f.write(chunk)

asyncio.run(custom_stream_processing())

480

```

481

482

## Performance Considerations

483

484

1. **Block Size**: Larger blocks reduce overhead but increase memory usage

485

2. **Throttle Limits**: Set appropriate limits based on network capacity and requirements

486

3. **Timeout Values**: Balance responsiveness with network conditions

487

4. **Stream Iteration**: Use appropriate iteration method (line vs block) for data type

488

5. **Memory Usage**: Stream processing keeps memory usage constant regardless of file size

489

490

## Best Practices

491

492

1. **Always use context managers** (`async with`) for proper resource cleanup

493

2. **Set appropriate timeouts** to prevent hanging operations

494

3. **Use throttling** to be respectful of network resources

495

4. **Monitor transfer progress** for long-running operations

496

5. **Handle exceptions** properly, especially timeout and connection errors

497

6. **Choose appropriate block sizes** based on your use case and memory constraints