# Bags

Distributed, list-like collections for processing semi-structured and unstructured data with functional programming patterns. Dask Bags handle data that doesn't fit the array or DataFrame model, such as JSON records, log files, or arbitrary Python objects.

## Capabilities

### Bag Creation

Create Bag collections from various data sources and Python objects.

```python { .api }
def from_sequence(seq, partition_size=None, npartitions=None):
    """
    Create Bag from a sequence of items.

    Parameters:
    - seq: Sequence or iterable of items
    - partition_size: Number of items per partition
    - npartitions: Desired number of partitions

    Returns:
    dask.bag.Bag: Bag collection
    """

def from_delayed(values):
    """
    Create Bag from delayed objects.

    Parameters:
    - values: List of delayed objects, each representing one partition

    Returns:
    dask.bag.Bag: Bag collection
    """

def from_url(urls):
    """
    Create Bag from URLs or file patterns.

    Parameters:
    - urls: URL, file path, or list of paths

    Returns:
    dask.bag.Bag: Bag of lines from the referenced files
    """

def range(n, npartitions):
    """
    Create Bag of numbers from 0 up to (but not including) n.

    Parameters:
    - n: End of range (exclusive)
    - npartitions: Number of partitions

    Returns:
    dask.bag.Bag: Bag of range values
    """

def zip(*bags):
    """
    Zip multiple bags together element-wise.

    Parameters:
    - *bags: Bags to zip; they must have the same number of partitions

    Returns:
    dask.bag.Bag: Bag of tuples
    """

def concat(bags):
    """
    Concatenate multiple bags into one.

    Parameters:
    - bags: Iterable of bags to concatenate

    Returns:
    dask.bag.Bag: Concatenated bag
    """
```
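
For orientation, a small sketch of these constructors in use (the inline values are illustrative):

```python
import dask
import dask.bag as db

# From an in-memory sequence
b = db.from_sequence([1, 2, 3, 4, 5], npartitions=2)

# From delayed objects, one per partition
parts = [dask.delayed(lambda: list(range(3)))() for _ in range(2)]
d = db.from_delayed(parts)

# Combine bags
pairs = db.zip(b, db.range(5, npartitions=2))    # bag of tuples
both = db.concat([b, db.from_sequence([6, 7])])  # single concatenated bag
```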

### File I/O Operations

Read and write bags from various file formats.

```python { .api }
def read_text(urlpath, blocksize=None, compression='infer', encoding='utf-8',
              errors='strict', linedelimiter=None, collection=True,
              storage_options=None, files_per_partition=None, include_path=False):
    """
    Read text files into Bag of strings, one element per line.

    Parameters:
    - urlpath: File path, glob pattern, or list of paths
    - blocksize: Size of each partition in bytes (None means one file per partition)
    - compression: Compression format ('infer', 'gzip', 'bz2', etc.)
    - encoding: Text encoding
    - errors: How to handle encoding errors
    - linedelimiter: Line separator string
    - collection: Return a Bag if True, otherwise a list of delayed values
    - storage_options: Extra options for the storage backend
    - files_per_partition: Group this many files into each partition
    - include_path: Pair each line with the path of its source file

    Returns:
    dask.bag.Bag: Bag of text lines
    """

def read_avro(urlpath, blocksize=100000000, storage_options=None, compression=None):
    """
    Read Avro files into Bag.

    Parameters:
    - urlpath: File path or pattern
    - blocksize: Size of each input block in bytes
    - storage_options: Extra options for the storage backend
    - compression: Compression codec of the Avro file blocks

    Returns:
    dask.bag.Bag: Bag of Avro records (dictionaries)
    """

def to_textfiles(bag, path, name_function=None, compression='infer',
                 encoding='utf-8', compute=True, storage_options=None, **kwargs):
    """
    Write Bag to text files, one file per partition.

    Parameters:
    - bag: Bag to write
    - path: Output directory or file pattern
    - name_function: Function to generate filenames from partition indices
    - compression: Compression format
    - encoding: Text encoding
    - compute: Whether to write immediately
    - storage_options: Extra options for the storage backend
    - **kwargs: Additional storage options

    Returns:
    Delayed objects if compute=False; otherwise the write happens immediately
    """
```
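
A sketch of a typical round trip through text files (the paths are illustrative):

```python
import json
import dask.bag as db

# One partition per input file
lines = db.read_text('data/*.jsonl', blocksize=None)

# Parse, filter, and write back out, one output file per partition
records = lines.map(json.loads)
cleaned = records.filter(lambda r: r.get('value') is not None)
cleaned.map(json.dumps).to_textfiles('out/part-*.jsonl')
```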

### Core Bag Class

The main Bag class, exposing the functional programming interface.

```python { .api }
class Bag:
    """
    Distributed list-like collection with a functional interface.

    Properties:
    - npartitions: int - Number of partitions
    - name: str - Collection name in the task graph
    """

    def compute(self, scheduler=None, **kwargs):
        """
        Compute the bag and return a Python list.

        Returns:
        list: Computed results as a Python list
        """

    def persist(self, scheduler=None, **kwargs):
        """
        Persist the bag in memory for reuse.

        Returns:
        dask.bag.Bag: Persisted bag
        """

    def take(self, k, npartitions=1):
        """
        Take the first k elements.

        Parameters:
        - k: Number of elements to take
        - npartitions: Number of partitions to search

        Returns:
        tuple: First k elements
        """

    def head(self, k=10, npartitions=1):
        """Alias for take()."""

    def __iter__(self):
        """Iterate over the bag (triggers computation)."""

    def __len__(self):
        """Length of the bag (triggers computation)."""
```
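
A brief sketch of the evaluation methods:

```python
import dask.bag as db

b = db.from_sequence(range(100), npartitions=4)
squares = b.map(lambda x: x * x)

squares = squares.persist()  # keep intermediate results in memory

print(squares.take(5))       # peek at a few elements: (0, 1, 4, 9, 16)
totals = squares.compute()   # materialize the whole bag as a list
```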

### Transformation Operations

Functional programming operations for data transformation.

```python { .api }
def map(func, *bags, **kwargs):
    """
    Apply a function to each element.

    Parameters:
    - func: Function to apply
    - *bags: Bags to map over; multiple bags are zipped element-wise
    - **kwargs: Additional keyword arguments passed to func

    Returns:
    dask.bag.Bag: Bag of transformed elements
    """

def filter(predicate, bag):
    """
    Keep elements for which the predicate is true.

    Parameters:
    - predicate: Function returning True or False
    - bag: Bag to filter

    Returns:
    dask.bag.Bag: Filtered bag
    """

def map_partitions(func, *bags, **kwargs):
    """
    Apply a function to each partition.

    Parameters:
    - func: Function that takes and returns an iterable
    - *bags: Input bags
    - **kwargs: Additional keyword arguments passed to func

    Returns:
    dask.bag.Bag: Transformed bag
    """

def flatten(bag):
    """
    Flatten nested sequences by one level.

    Parameters:
    - bag: Bag whose elements are themselves sequences

    Returns:
    dask.bag.Bag: Flattened bag
    """

def pluck(key, bag, default=None):
    """
    Select a field from each dictionary or indexable element.

    Parameters:
    - key: Key or index to pluck
    - bag: Bag of dictionaries or sequences
    - default: Value to use when the key is missing

    Returns:
    dask.bag.Bag: Bag of plucked values
    """

def distinct(bag, key=None):
    """
    Remove duplicate elements.

    Parameters:
    - bag: Input bag
    - key: Optional function to compute the comparison key

    Returns:
    dask.bag.Bag: Bag of unique elements
    """

def frequencies(bag, sort=False, split_every=None):
    """
    Count the occurrences of each element.

    Parameters:
    - bag: Input bag
    - sort: Sort results by count
    - split_every: Tree-reduction branching factor

    Returns:
    dask.bag.Bag: Bag of (item, count) pairs
    """

def topk(bag, k, key=None, split_every=None):
    """
    Find the k largest elements.

    Parameters:
    - bag: Input bag
    - k: Number of top elements
    - key: Optional function to compute the comparison key
    - split_every: Tree-reduction branching factor

    Returns:
    dask.bag.Bag: Bag of the top k elements
    """
```
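
A compact sketch chaining these transformations (the records are illustrative):

```python
import dask.bag as db

records = db.from_sequence([
    {'name': 'alice', 'tags': ['a', 'b']},
    {'name': 'bob',   'tags': ['b']},
    {'name': 'alice', 'tags': ['c']},
])

# The transformations are available as Bag methods
tags = records.pluck('tags').flatten()
tag_counts = tags.frequencies(sort=True).compute()  # e.g. [('b', 2), ('a', 1), ('c', 1)]
names = records.pluck('name').distinct().compute()  # ['alice', 'bob']
```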

### Reduction Operations

Aggregate operations that combine all elements into a single value.

```python { .api }
def fold(binop, bag, initial=None, combine=None, split_every=None):
    """
    Fold the bag with a binary operation.

    Parameters:
    - binop: Binary function applied within each partition
    - bag: Input bag
    - initial: Initial value for the fold
    - combine: Binary function for combining per-partition results
    - split_every: Tree-reduction branching factor

    Returns:
    dask.bag.Item: Lazy single value holding the folded result
    """

def reduction(perpartition, aggregate, bag, split_every=None):
    """
    Reduce the bag with a per-partition function and an aggregator.

    Parameters:
    - perpartition: Function applied to each partition
    - aggregate: Function combining the per-partition results
    - bag: Input bag
    - split_every: Tree-reduction branching factor

    Returns:
    dask.bag.Item: Lazy single value holding the reduced result
    """

def sum(bag, split_every=None):
    """Sum all elements."""

def count(bag, split_every=None):
    """Count the number of elements."""

def min(bag, split_every=None):
    """Find the minimum element."""

def max(bag, split_every=None):
    """Find the maximum element."""

def mean(bag, split_every=None):
    """Compute the mean of numeric elements."""

def std(bag, split_every=None):
    """Compute the standard deviation."""

def var(bag, split_every=None):
    """Compute the variance."""
```
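
fold implements a parallel reduce in two stages, first within each partition and then across partitions; a minimal sketch:

```python
import dask.bag as db

b = db.from_sequence(range(10), npartitions=3)

# binop folds within a partition; combine merges the partial results
total = b.fold(lambda acc, x: acc + x, initial=0,
               combine=lambda a, b: a + b)
print(total.compute())  # 45

# Built-in reductions cover the common cases directly
print(b.sum().compute(), b.mean().compute())  # 45 4.5
```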

### Grouping Operations

Group elements by key for further processing.

```python { .api }
def groupby(bag, key, npartitions=None, partition_size=None):
    """
    Group elements by a key function. This requires a full shuffle of the
    data, so prefer foldby when a per-group reduction is all you need.

    Parameters:
    - bag: Input bag
    - key: Function to compute the grouping key
    - npartitions: Number of output partitions
    - partition_size: Target partition size

    Returns:
    dask.bag.Bag: Bag of (key, group) pairs
    """

def foldby(key, binop, bag, initial=None, combine=None, combine_initial=None):
    """
    Combined groupby and fold: fold the values within each group.

    Parameters:
    - key: Function to compute the grouping key
    - binop: Binary operation folding values within a group
    - bag: Input bag
    - initial: Initial value for each group
    - combine: Function for combining intermediate per-group results
    - combine_initial: Initial value for the combine step

    Returns:
    dask.bag.Bag: Bag of (key, folded_value) pairs
    """
```
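
A sketch contrasting the two; foldby computes per-key sums without materializing the groups:

```python
import dask.bag as db

b = db.from_sequence([('a', 1), ('b', 2), ('a', 3), ('b', 4)])

# groupby: materializes each group as a list
groups = b.groupby(lambda kv: kv[0])
sizes = groups.map(lambda kv: (kv[0], len(kv[1]))).compute()

# foldby: keeps a running total per key; usually much faster
totals = b.foldby(lambda kv: kv[0],
                  lambda acc, kv: acc + kv[1], 0,
                  combine=lambda a, b: a + b).compute()
# e.g. [('a', 4), ('b', 6)]
```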

### Conversion Operations

Convert bags to other collection types.

```python { .api }
def to_dataframe(bag, columns=None, meta=None):
    """
    Convert a bag of records to a Dask DataFrame.

    Parameters:
    - bag: Input bag of dictionaries or tuples
    - columns: Column names for the DataFrame
    - meta: Empty pandas DataFrame describing the output schema

    Returns:
    dask.dataframe.DataFrame: Converted DataFrame
    """

def to_delayed(bag, optimize_graph=True):
    """
    Convert bag partitions to delayed objects.

    Parameters:
    - bag: Input bag
    - optimize_graph: Whether to optimize the task graph first

    Returns:
    list: One delayed object per partition
    """
```
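
A short sketch of both conversions (the records are illustrative):

```python
import dask.bag as db

records = db.from_sequence([
    {'x': 1, 'y': 'a'},
    {'x': 2, 'y': 'b'},
])

# Bag of dicts -> DataFrame; the schema is inferred from a sample
df = records.to_dataframe()

# Or drop down to delayed objects, one per partition
parts = records.to_delayed()
```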

### Item Access

Work with single values computed from bags, such as reduction results.

```python { .api }
class Item:
    """
    A single, unpartitioned value computed from a bag.

    Reductions such as sum, mean, and fold return Item objects.
    """

    def compute(self, **kwargs):
        """Compute and return the concrete value."""

    def key(self):
        """Get the task key for this item."""
```
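
Items behave like lazy scalars; a minimal sketch:

```python
import dask
import dask.bag as db

b = db.from_sequence([3, 1, 4, 1, 5])

total = b.sum()         # an Item; nothing is computed yet
print(total.compute())  # 14

# Items compose with other lazy results via dask.compute
low, high = dask.compute(b.min(), b.max())  # 1, 5
```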

## Usage Examples

### Basic Text Processing

```python
import dask.bag as db

# Read text files
lines = db.read_text('logs/*.txt')

# Strip, split into words, and flatten to one word per element
words = (lines.str.strip()
              .str.split()
              .flatten())

# Count word frequencies and take the ten most common
word_counts = words.frequencies(sort=True)
top_words = word_counts.take(10)

print(top_words)
```

### JSON Data Processing

```python
import dask.bag as db
import json

# Read line-delimited JSON files (one record per line)
records = db.read_text('data/*.json').map(json.loads)

# Extract and process fields
user_ages = records.pluck('age').filter(lambda x: x is not None)
avg_age = user_ages.mean().compute()

# Group by category and count records per group
by_category = records.groupby(lambda x: x.get('category', 'unknown'))
category_counts = by_category.map(lambda x: (x[0], len(x[1]))).compute()
```
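
When only per-group aggregates are needed, the shuffle that groupby performs can be avoided; a sketch of the same count using foldby:

```python
# Count records per category without a full shuffle
category_counts = (records
                   .foldby(lambda x: x.get('category', 'unknown'),
                           lambda acc, _: acc + 1, 0,
                           combine=lambda a, b: a + b)
                   .compute())
```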

### Custom Data Processing Pipeline

```python
import dask.bag as db

def clean_record(record):
    """Clean and validate a data record."""
    if record.get('value', 0) > 0:
        return {
            'id': record['id'],
            'value': float(record['value']),
            'category': record.get('category', 'default')
        }
    return None

def aggregate_by_category(group):
    """Aggregate the records in one (key, records) group."""
    key, records = group
    values = [r['value'] for r in records if r is not None]
    return {
        'category': key,
        'count': len(values),
        'sum': sum(values),
        'avg': sum(values) / len(values) if values else 0
    }

# Process the data pipeline (data_source is any iterable of raw records)
raw_data = db.from_sequence(data_source, partition_size=1000)
cleaned = raw_data.map(clean_record).filter(lambda x: x is not None)
grouped = cleaned.groupby(lambda x: x['category'])
aggregated = grouped.map(aggregate_by_category)

results = aggregated.compute()
```

### Parallel File Processing

```python
import dask.bag as db
import re

def extract_features(text_file_content):
    """Extract features from the full text of one file."""
    lines = text_file_content.split('\n')
    return {
        'line_count': len(lines),
        'word_count': len(text_file_content.split()),
        'char_count': len(text_file_content),
        'email_count': len(re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text_file_content))
    }

# read_text yields individual lines, so keep each file in its own partition
# and rejoin the lines before extracting per-file features
files = db.read_text('documents/*.txt', files_per_partition=1)
features = files.map_partitions(lambda lines: [extract_features(''.join(lines))])
summary = features.compute()

print(f"Processed {len(summary)} files")
```

### Integration with Other Collections

```python
import dask.bag as db
import dask.dataframe as dd

# Start with a bag of records (json_records is any sequence of dicts)
records = db.from_sequence(json_records, partition_size=10000)

# Convert to a DataFrame for structured analysis
df = records.to_dataframe()

# Process with DataFrame operations
summary = df.groupby('category').value.agg(['mean', 'count', 'sum'])

# Back to a bag for further element-wise processing
result_records = df.to_bag(format='dict')
final_result = result_records.take(100)
```