# Data Handling

Comprehensive data processing and payload system supporting various data types, multipart content, form data processing, and streaming operations. Includes automatic content encoding/decoding and extensive customization options for different data formats.

## Capabilities

### Form Data Processing

Form data creation and processing for HTML forms and file uploads with support for multiple encoding types.

```python { .api }
class FormData:
    def __init__(self, quote_fields=True, charset='utf-8'):
        """
        Create form data container.

        Parameters:
        - quote_fields (bool): Quote field names and values
        - charset (str): Text encoding
        """

    def add_field(
        self,
        name,
        value,
        *,
        content_type=None,
        filename=None,
        content_transfer_encoding=None
    ):
        """
        Add form field.

        Parameters:
        - name (str): Field name
        - value: Field value (str, bytes, or file-like)
        - content_type (str): Content type for field
        - filename (str): Filename for file fields
        - content_transfer_encoding (str): Transfer encoding
        """

    def add_fields(self, *fields):
        """Add multiple fields from iterable."""

    def is_multipart(self):
        """Check if form requires multipart encoding."""
```

### Multipart Content Processing

Reading and writing multipart content for file uploads, mixed content types, and complex data structures.

```python { .api }
class MultipartReader:
    def __init__(self, headers, content):
        """
        Create multipart content reader.

        Parameters:
        - headers: Request headers
        - content: Content stream
        """

    async def next(self):
        """
        Get next part from multipart content.

        Returns:
        BodyPartReader or None: Next body part or None if finished
        """

    async def release(self):
        """Release reader resources."""

    def at_eof(self):
        """Check if at end of content."""

class BodyPartReader:
    @property
    def headers(self):
        """Part headers."""

    @property
    def name(self):
        """Part name from Content-Disposition."""

    @property
    def filename(self):
        """Part filename from Content-Disposition."""

    async def read(self, decode=False):
        """
        Read part content.

        Parameters:
        - decode (bool): Decode content based on headers

        Returns:
        bytes: Part content
        """

    async def read_chunk(self, size=8192):
        """Read content chunk."""

    async def text(self, encoding=None):
        """Read part content as text."""

    async def json(self, encoding=None):
        """Parse part content as JSON."""

    async def release(self):
        """Release part resources."""

class MultipartWriter:
    def __init__(self, subtype='mixed', boundary=None):
        """
        Create multipart content writer.

        Parameters:
        - subtype (str): Multipart subtype
        - boundary (str): Multipart boundary
        """

    def append(self, obj, headers=None):
        """
        Append object to multipart content.

        Parameters:
        - obj: Object to append (str, bytes, file-like, or payload)
        - headers (dict): Part headers

        Returns:
        Payload: Created payload for the part
        """

    def append_json(self, obj, headers=None):
        """Append JSON object to multipart content."""

    def append_form(self, obj, headers=None):
        """Append form data to multipart content."""

    @property
    def boundary(self):
        """Multipart boundary."""

    @property
    def size(self):
        """Content size if known."""

def content_disposition_filename(name, fallback=None):
    """
    Extract filename from Content-Disposition header value.

    Parameters:
    - name (str): Content-Disposition header value
    - fallback (str): Fallback filename

    Returns:
    str or None: Extracted filename
    """

def parse_content_disposition(header):
    """
    Parse Content-Disposition header.

    Parameters:
    - header (str): Content-Disposition header value

    Returns:
    tuple: (disposition_type, parameters_dict)
    """

class BadContentDispositionHeader(Exception):
    """Invalid Content-Disposition header."""

class BadContentDispositionParam(Exception):
    """Invalid Content-Disposition parameter."""
```

### Payload System

Flexible payload system supporting various data types with automatic serialization and content type detection.

```python { .api }
class Payload:
    def __init__(self, value, headers=None, content_type=None, filename=None, encoding=None):
        """
        Base payload class.

        Parameters:
        - value: Payload data
        - headers (dict): Payload headers
        - content_type (str): Content type
        - filename (str): Filename for file payloads
        - encoding (str): Text encoding
        """

    @property
    def size(self):
        """Payload size if known."""

    @property
    def headers(self):
        """Payload headers."""

    @property
    def content_type(self):
        """Payload content type."""

    async def write(self, writer):
        """Write payload to stream writer."""

class BytesPayload(Payload):
    """Payload for bytes data."""

class StringPayload(Payload):
    """Payload for string data."""

class IOBasePayload(Payload):
    """Payload for file-like objects."""

class BufferedReaderPayload(IOBasePayload):
    """Payload for buffered readers."""

class TextIOPayload(IOBasePayload):
    """Payload for text I/O objects."""

class StringIOPayload(TextIOPayload):
    """Payload for StringIO objects."""

class BytesIOPayload(IOBasePayload):
    """Payload for BytesIO objects."""

class JsonPayload(Payload):
    def __init__(
        self,
        value,
        encoding='utf-8',
        content_type='application/json',
        dumps=None,
        **kwargs
    ):
        """
        JSON payload.

        Parameters:
        - value: Object to serialize as JSON
        - encoding (str): Text encoding
        - content_type (str): Content type
        - dumps: JSON serialization function
        """

class AsyncIterablePayload(Payload):
    def __init__(self, value, **kwargs):
        """
        Payload for async iterables.

        Parameters:
        - value: Async iterable data source
        """

def get_payload(data, *args, **kwargs):
    """
    Get appropriate payload for data.

    Parameters:
    - data: Data to create payload for

    Returns:
    Payload: Appropriate payload instance
    """

def payload_type(payload_class):
    """
    Register payload type for specific data types.

    Parameters:
    - payload_class: Payload class to register

    Returns:
    Function: Registration decorator
    """

# Global payload registry
PAYLOAD_REGISTRY = None  # PayloadRegistry instance
```

### Streaming Support

Streaming payload creation and data streaming utilities.

```python { .api }
def streamer(func):
    """
    Decorator to create streaming payload from generator function.

    Parameters:
    - func: Async generator function

    Returns:
    Function: Payload factory function
    """
```

303

304

### Cookie Handling

305

306

Cookie management and storage implementations.

307

308

```python { .api }

309

class CookieJar:

310

def __init__(self, *, unsafe=False, quote_cookie=True, treat_as_secure_origin=None):

311

"""

312

Default cookie jar implementation.

313

314

Parameters:

315

- unsafe (bool): Allow unsafe cookies

316

- quote_cookie (bool): Quote cookie values

317

- treat_as_secure_origin: Origins to treat as secure

318

"""

319

320

def update_cookies(self, cookies, response_url=None):

321

"""Update cookies from response."""

322

323

def filter_cookies(self, request_url=None):

324

"""Filter cookies for request."""

325

326

def clear(self, predicate=None):

327

"""Clear cookies matching predicate."""

328

329

def clear_domain(self, domain):

330

"""Clear cookies for domain."""

331

332

def __iter__(self):

333

"""Iterate over cookies."""

334

335

def __len__(self):

336

"""Number of stored cookies."""

337

338

class DummyCookieJar:

339

"""No-op cookie jar that stores no cookies."""

340

341

def update_cookies(self, cookies, response_url=None):

342

"""No-op cookie update."""

343

344

def filter_cookies(self, request_url=None):

345

"""Return empty cookie collection."""

346

```

### Stream Processing

Low-level stream readers and data queues for advanced data processing.

```python { .api }
class StreamReader:
    def __init__(self, protocol, limit=2**16, loop=None):
        """
        Asynchronous stream reader.

        Parameters:
        - protocol: Stream protocol
        - limit (int): Buffer size limit
        - loop: Event loop
        """

    async def read(self, n=-1):
        """
        Read up to n bytes.

        Parameters:
        - n (int): Number of bytes to read (-1 for all)

        Returns:
        bytes: Read data
        """

    async def readline(self):
        """Read one line."""

    async def readexactly(self, n):
        """Read exactly n bytes."""

    async def readuntil(self, separator=b'\n'):
        """Read until separator."""

    def at_eof(self):
        """Check if at end of stream."""

    def feed_eof(self):
        """Signal end of stream."""

    def feed_data(self, data):
        """Feed data to stream."""

class DataQueue:
    def __init__(self, *, loop=None):
        """Generic data queue."""

    def __aiter__(self):
        """Async iterator over queue items."""

    async def read(self):
        """Read next item from queue."""

    def feed_data(self, data, size=0):
        """Add data to queue."""

    def feed_eof(self):
        """Signal end of queue."""

    def is_eof(self):
        """Check if queue is at EOF."""

    def at_eof(self):
        """Check if queue is finished."""

class FlowControlDataQueue(DataQueue):
    def __init__(self, protocol, limit=2**16, *, loop=None):
        """
        Flow-controlled data queue.

        Parameters:
        - protocol: Flow control protocol
        - limit (int): Buffer size limit
        - loop: Event loop
        """

class EofStream(Exception):
    """End of stream exception."""

# Empty payload singleton
EMPTY_PAYLOAD = None
```

## Usage Examples

### Form Data Submission

```python
import aiohttp

async def submit_form():
    # Create form data
    form = aiohttp.FormData()
    form.add_field('name', 'John Doe')
    form.add_field('email', 'john@example.com')
    form.add_field('age', '30')

    # Add file upload
    with open('document.pdf', 'rb') as f:
        form.add_field('file', f,
                       filename='document.pdf',
                       content_type='application/pdf')

        # Submit form
        async with aiohttp.ClientSession() as session:
            async with session.post('https://api.example.com/submit',
                                    data=form) as response:
                result = await response.json()
                return result
```

### Multipart Content Processing

```python
from aiohttp import web

async def upload_handler(request):
    reader = await request.multipart()

    uploaded_files = []
    form_fields = {}

    async for part in reader:
        if part.filename:
            # Handle file upload
            filename = part.filename
            content = await part.read()

            # Save file
            with open(f'uploads/{filename}', 'wb') as f:
                f.write(content)

            uploaded_files.append({
                'filename': filename,
                'size': len(content),
                'content_type': part.headers.get('Content-Type')
            })

        else:
            # Handle form field
            field_name = part.headers['Content-Disposition'].split('name="')[1].split('"')[0]
            field_value = await part.text()
            form_fields[field_name] = field_value

    return web.json_response({
        'files': uploaded_files,
        'fields': form_fields
    })

app = web.Application()
app.router.add_post('/upload', upload_handler)
```

### JSON Payload Streaming

```python
import aiohttp
import asyncio
import json

@aiohttp.streamer
async def json_stream_generator():
    """Generate streaming JSON data."""
    yield b'['

    for i in range(1000):
        if i > 0:
            yield b','

        data = {'id': i, 'value': f'item_{i}'}
        yield json.dumps(data).encode('utf-8')

        # Simulate processing delay
        await asyncio.sleep(0.01)

    yield b']'

async def stream_json_data():
    async with aiohttp.ClientSession() as session:
        # Create streaming payload
        data = json_stream_generator()

        async with session.post('https://api.example.com/bulk',
                                data=data,
                                headers={'Content-Type': 'application/json'}) as response:
            return await response.json()
```

### Custom Payload Type

```python
import aiohttp
from aiohttp.payload import Payload

class CSVPayload(Payload):
    def __init__(self, data, **kwargs):
        """Custom CSV payload."""
        # Convert data to CSV format
        if isinstance(data, list) and data and isinstance(data[0], dict):
            # Convert list of dicts to CSV
            import csv
            import io

            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
            csv_data = output.getvalue().encode('utf-8')
        else:
            csv_data = str(data).encode('utf-8')

        super().__init__(csv_data,
                         content_type='text/csv',
                         **kwargs)

# Register custom payload type
@aiohttp.payload_type(list)
def list_to_csv_payload(data, *args, **kwargs):
    """Convert list to CSV payload."""
    if data and isinstance(data[0], dict):
        return CSVPayload(data, *args, **kwargs)
    return aiohttp.get_payload(data, *args, **kwargs)

# Usage
async def send_csv_data():
    data = [
        {'name': 'John', 'age': 30, 'city': 'New York'},
        {'name': 'Jane', 'age': 25, 'city': 'Boston'},
        {'name': 'Bob', 'age': 35, 'city': 'Chicago'}
    ]

    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/data.csv',
                                data=data) as response:
            return await response.text()
```

### Advanced File Upload with Progress

```python
import aiohttp
import asyncio
from pathlib import Path

class ProgressFilePayload(aiohttp.IOBasePayload):
    def __init__(self, file_path, progress_callback=None, **kwargs):
        self._file_path = Path(file_path)
        self._progress_callback = progress_callback
        self._total_size = self._file_path.stat().st_size
        self._uploaded = 0

        super().__init__(
            open(file_path, 'rb'),
            filename=self._file_path.name,
            **kwargs
        )

    async def write(self, writer):
        """Write file with progress tracking."""
        chunk_size = 8192

        while True:
            chunk = self._value.read(chunk_size)
            if not chunk:
                break

            await writer.write(chunk)
            self._uploaded += len(chunk)

            if self._progress_callback:
                progress = (self._uploaded / self._total_size) * 100
                await self._progress_callback(progress, self._uploaded, self._total_size)

async def progress_callback(percent, uploaded, total):
    """Progress callback function."""
    print(f"Upload progress: {percent:.1f}% ({uploaded}/{total} bytes)")

async def upload_large_file():
    file_path = 'large_file.zip'

    # Create form data with progress tracking
    form = aiohttp.FormData()
    form.add_field('description', 'Large file upload')
    form.add_field('file',
                   ProgressFilePayload(file_path, progress_callback),
                   filename='large_file.zip',
                   content_type='application/zip')

    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/upload',
                                data=form) as response:
            result = await response.json()
            return result

# Run upload
asyncio.run(upload_large_file())
```