# Data Handling

Comprehensive data processing and payload system supporting various data types, multipart content, form data processing, and streaming operations. Includes automatic content encoding/decoding and extensive customization options for different data formats.

## Capabilities

### Form Data Processing

Form data creation and processing for HTML forms and file uploads with support for multiple encoding types.

```python { .api }
class FormData:
    def __init__(self, quote_fields=True, charset='utf-8'):
        """
        Create form data container.

        Parameters:
        - quote_fields (bool): Quote field names and values
        - charset (str): Text encoding
        """

    def add_field(
        self,
        name,
        value,
        *,
        content_type=None,
        filename=None,
        content_transfer_encoding=None
    ):
        """
        Add form field.

        Parameters:
        - name (str): Field name
        - value: Field value (str, bytes, or file-like)
        - content_type (str): Content type for field
        - filename (str): Filename for file fields
        - content_transfer_encoding (str): Transfer encoding
        """

    def add_fields(self, *fields):
        """Add multiple fields from iterable."""

    def is_multipart(self):
        """Check if form requires multipart encoding."""
```

### Multipart Content Processing

Reading and writing multipart content for file uploads, mixed content types, and complex data structures.

```python { .api }
class MultipartReader:
    def __init__(self, headers, content):
        """
        Create multipart content reader.

        Parameters:
        - headers: Request headers
        - content: Content stream
        """

    async def next(self):
        """
        Get next part from multipart content.

        Returns:
        BodyPartReader or None: Next body part or None if finished
        """

    async def release(self):
        """Release reader resources."""

    def at_eof(self):
        """Check if at end of content."""

class BodyPartReader:
    @property
    def headers(self):
        """Part headers."""

    @property
    def name(self):
        """Part name from Content-Disposition."""

    @property
    def filename(self):
        """Part filename from Content-Disposition."""

    async def read(self, decode=False):
        """
        Read part content.

        Parameters:
        - decode (bool): Decode content based on headers

        Returns:
        bytes: Part content
        """

    async def read_chunk(self, size=8192):
        """Read content chunk."""

    async def text(self, encoding=None):
        """Read part content as text."""

    async def json(self, encoding=None):
        """Parse part content as JSON."""

    async def release(self):
        """Release part resources."""

class MultipartWriter:
    def __init__(self, subtype='mixed', boundary=None):
        """
        Create multipart content writer.

        Parameters:
        - subtype (str): Multipart subtype
        - boundary (str): Multipart boundary
        """

    def append(self, obj, headers=None):
        """
        Append object to multipart content.

        Parameters:
        - obj: Object to append (str, bytes, file-like, or payload)
        - headers (dict): Part headers

        Returns:
        Payload: Created payload for the part
        """

    def append_json(self, obj, headers=None):
        """Append JSON object to multipart content."""

    def append_form(self, obj, headers=None):
        """Append form data to multipart content."""

    @property
    def boundary(self):
        """Multipart boundary."""

    @property
    def size(self):
        """Content size if known."""

def content_disposition_filename(name, fallback=None):
    """
    Extract filename from Content-Disposition header value.

    Parameters:
    - name (str): Content-Disposition header value
    - fallback (str): Fallback filename

    Returns:
    str or None: Extracted filename
    """

def parse_content_disposition(header):
    """
    Parse Content-Disposition header.

    Parameters:
    - header (str): Content-Disposition header value

    Returns:
    tuple: (disposition_type, parameters_dict)
    """

class BadContentDispositionHeader(Exception):
    """Invalid Content-Disposition header."""

class BadContentDispositionParam(Exception):
    """Invalid Content-Disposition parameter."""
```

### Payload System

Flexible payload system supporting various data types with automatic serialization and content type detection.

```python { .api }
class Payload:
    def __init__(self, value, headers=None, content_type=None, filename=None, encoding=None):
        """
        Base payload class.

        Parameters:
        - value: Payload data
        - headers (dict): Payload headers
        - content_type (str): Content type
        - filename (str): Filename for file payloads
        - encoding (str): Text encoding
        """

    @property
    def size(self):
        """Payload size if known."""

    @property
    def headers(self):
        """Payload headers."""

    @property
    def content_type(self):
        """Payload content type."""

    async def write(self, writer):
        """Write payload to stream writer."""

class BytesPayload(Payload):
    """Payload for bytes data."""

class StringPayload(Payload):
    """Payload for string data."""

class IOBasePayload(Payload):
    """Payload for file-like objects."""

class BufferedReaderPayload(IOBasePayload):
    """Payload for buffered readers."""

class TextIOPayload(IOBasePayload):
    """Payload for text I/O objects."""

class StringIOPayload(TextIOPayload):
    """Payload for StringIO objects."""

class BytesIOPayload(IOBasePayload):
    """Payload for BytesIO objects."""

class JsonPayload(Payload):
    def __init__(
        self,
        value,
        encoding='utf-8',
        content_type='application/json',
        dumps=None,
        **kwargs
    ):
        """
        JSON payload.

        Parameters:
        - value: Object to serialize as JSON
        - encoding (str): Text encoding
        - content_type (str): Content type
        - dumps: JSON serialization function
        """

class AsyncIterablePayload(Payload):
    def __init__(self, value, **kwargs):
        """
        Payload for async iterables.

        Parameters:
        - value: Async iterable data source
        """

def get_payload(data, *args, **kwargs):
    """
    Get appropriate payload for data.

    Parameters:
    - data: Data to create payload for

    Returns:
    Payload: Appropriate payload instance
    """

def payload_type(payload_class):
    """
    Register payload type for specific data types.

    Parameters:
    - payload_class: Payload class to register

    Returns:
    Function: Registration decorator
    """

# Global payload registry
PAYLOAD_REGISTRY = None  # PayloadRegistry instance
```

### Streaming Support

Streaming payload creation and data streaming utilities.

```python { .api }
def streamer(func):
    """
    Decorator to create streaming payload from a writer coroutine.

    Parameters:
    - func: Coroutine function that receives a stream writer as its
      first argument and writes body chunks to it

    Returns:
    Function: Payload factory function
    """
```

### Cookie Handling

Cookie management and storage implementations.

```python { .api }
class CookieJar:
    def __init__(self, *, unsafe=False, quote_cookie=True, treat_as_secure_origin=None):
        """
        Default cookie jar implementation.

        Parameters:
        - unsafe (bool): Allow unsafe cookies
        - quote_cookie (bool): Quote cookie values
        - treat_as_secure_origin: Origins to treat as secure
        """

    def update_cookies(self, cookies, response_url=None):
        """Update cookies from response."""

    def filter_cookies(self, request_url=None):
        """Filter cookies for request."""

    def clear(self, predicate=None):
        """Clear cookies matching predicate."""

    def clear_domain(self, domain):
        """Clear cookies for domain."""

    def __iter__(self):
        """Iterate over cookies."""

    def __len__(self):
        """Number of stored cookies."""

class DummyCookieJar:
    """No-op cookie jar that stores no cookies."""

    def update_cookies(self, cookies, response_url=None):
        """No-op cookie update."""

    def filter_cookies(self, request_url=None):
        """Return empty cookie collection."""
```

### Stream Processing

Low-level stream readers and data queues for advanced data processing.

```python { .api }
class StreamReader:
    def __init__(self, protocol, limit=2**16, loop=None):
        """
        Asynchronous stream reader.

        Parameters:
        - protocol: Stream protocol
        - limit (int): Buffer size limit
        - loop: Event loop
        """

    async def read(self, n=-1):
        """
        Read up to n bytes.

        Parameters:
        - n (int): Number of bytes to read (-1 for all)

        Returns:
        bytes: Read data
        """

    async def readline(self):
        """Read one line."""

    async def readexactly(self, n):
        """Read exactly n bytes."""

    async def readuntil(self, separator=b'\n'):
        """Read until separator."""

    def at_eof(self):
        """Check if at end of stream."""

    def feed_eof(self):
        """Signal end of stream."""

    def feed_data(self, data):
        """Feed data to stream."""

class DataQueue:
    def __init__(self, *, loop=None):
        """Generic data queue."""

    def __aiter__(self):
        """Async iterator over queue items."""

    async def read(self):
        """Read next item from queue."""

    def feed_data(self, data, size=0):
        """Add data to queue."""

    def feed_eof(self):
        """Signal end of queue."""

    def is_eof(self):
        """Check if queue is at EOF."""

    def at_eof(self):
        """Check if queue is finished."""

class FlowControlDataQueue(DataQueue):
    def __init__(self, protocol, limit=2**16, *, loop=None):
        """
        Flow-controlled data queue.

        Parameters:
        - protocol: Flow control protocol
        - limit (int): Buffer size limit
        - loop: Event loop
        """

class EofStream(Exception):
    """End of stream exception."""

# Empty payload singleton
EMPTY_PAYLOAD = None
```

## Usage Examples

### Form Data Submission

```python
import aiohttp

async def submit_form():
    # Create form data
    form = aiohttp.FormData()
    form.add_field('name', 'John Doe')
    form.add_field('email', 'john@example.com')
    form.add_field('age', '30')

    # Add file upload. Keep the file open until the request completes:
    # file fields are read lazily while the request body is streamed.
    with open('document.pdf', 'rb') as f:
        form.add_field('file', f,
                       filename='document.pdf',
                       content_type='application/pdf')

        # Submit form
        async with aiohttp.ClientSession() as session:
            async with session.post('https://api.example.com/submit',
                                    data=form) as response:
                result = await response.json()
                return result
```

### Multipart Content Processing

```python
from aiohttp import web

async def upload_handler(request):
    reader = await request.multipart()

    uploaded_files = []
    form_fields = {}

    async for part in reader:
        if part.filename:
            # Handle file upload
            filename = part.filename
            content = await part.read()

            # Save file
            with open(f'uploads/{filename}', 'wb') as f:
                f.write(content)

            uploaded_files.append({
                'filename': filename,
                'size': len(content),
                'content_type': part.headers.get('Content-Type')
            })

        else:
            # Handle form field (name parsed from Content-Disposition)
            field_name = part.name
            field_value = await part.text()
            form_fields[field_name] = field_value

    return web.json_response({
        'files': uploaded_files,
        'fields': form_fields
    })

app = web.Application()
app.router.add_post('/upload', upload_handler)
```

### JSON Payload Streaming

```python
import aiohttp
import asyncio
import json

@aiohttp.streamer
async def json_stream_generator(writer):
    """Generate streaming JSON data."""
    await writer.write(b'[')

    for i in range(1000):
        if i > 0:
            await writer.write(b',')

        data = {'id': i, 'value': f'item_{i}'}
        await writer.write(json.dumps(data).encode('utf-8'))

        # Simulate processing delay
        await asyncio.sleep(0.01)

    await writer.write(b']')

async def stream_json_data():
    async with aiohttp.ClientSession() as session:
        # Create streaming payload
        data = json_stream_generator()

        async with session.post('https://api.example.com/bulk',
                                data=data,
                                headers={'Content-Type': 'application/json'}) as response:
            return await response.json()
```

### Custom Payload Type

```python
import aiohttp
from aiohttp.payload import Payload

class CSVPayload(Payload):
    def __init__(self, data, **kwargs):
        """Custom CSV payload."""
        # Convert data to CSV format
        if isinstance(data, list) and data and isinstance(data[0], dict):
            # Convert list of dicts to CSV
            import csv
            import io

            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
            csv_data = output.getvalue().encode('utf-8')
        else:
            csv_data = str(data).encode('utf-8')

        super().__init__(csv_data,
                         content_type='text/csv',
                         **kwargs)

# Register custom payload type
@aiohttp.payload_type(list)
def list_to_csv_payload(data, *args, **kwargs):
    """Convert list to CSV payload."""
    if data and isinstance(data[0], dict):
        return CSVPayload(data, *args, **kwargs)
    return aiohttp.get_payload(data, *args, **kwargs)

# Usage
async def send_csv_data():
    data = [
        {'name': 'John', 'age': 30, 'city': 'New York'},
        {'name': 'Jane', 'age': 25, 'city': 'Boston'},
        {'name': 'Bob', 'age': 35, 'city': 'Chicago'}
    ]

    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/data.csv',
                                data=data) as response:
            return await response.text()
```

### Advanced File Upload with Progress

```python
import aiohttp
import asyncio
from pathlib import Path

class ProgressFilePayload(aiohttp.IOBasePayload):
    def __init__(self, file_path, progress_callback=None, **kwargs):
        self._file_path = Path(file_path)
        self._progress_callback = progress_callback
        self._total_size = self._file_path.stat().st_size
        self._uploaded = 0

        super().__init__(
            open(file_path, 'rb'),
            filename=self._file_path.name,
            **kwargs
        )

    async def write(self, writer):
        """Write file with progress tracking."""
        chunk_size = 8192

        while True:
            chunk = self._value.read(chunk_size)
            if not chunk:
                break

            await writer.write(chunk)
            self._uploaded += len(chunk)

            if self._progress_callback:
                progress = (self._uploaded / self._total_size) * 100
                await self._progress_callback(progress, self._uploaded, self._total_size)

async def progress_callback(percent, uploaded, total):
    """Progress callback function."""
    print(f"Upload progress: {percent:.1f}% ({uploaded}/{total} bytes)")

async def upload_large_file():
    file_path = 'large_file.zip'

    # Create form data with progress tracking
    form = aiohttp.FormData()
    form.add_field('description', 'Large file upload')
    form.add_field('file',
                   ProgressFilePayload(file_path, progress_callback),
                   filename='large_file.zip',
                   content_type='application/zip')

    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/upload',
                                data=form) as response:
            result = await response.json()
            return result

# Run upload
asyncio.run(upload_large_file())
```