# Parallel PyData with task scheduling for distributed analytics and computing.
#
# Distributed list-like collections for processing semi-structured and unstructured
# data with functional programming patterns. Dask Bags handle data that doesn't fit
# the array or DataFrame model, such as JSON records, log files, or any Python objects.
# Create Bag collections from various data sources and Python objects.
def from_sequence(seq, partition_size=None, name=None):
    """Build a Bag by distributing an in-memory sequence.

    Args:
        seq: Sequence or other iterable of items to distribute.
        partition_size: Number of items to place in each partition.
        name: Optional custom name for the bag in the task graph.

    Returns:
        dask.bag.Bag: The resulting bag collection.
    """
def from_delayed(values, name=None):
    """Build a Bag whose partitions come from delayed objects.

    Args:
        values: List of delayed objects, one per partition.
        name: Optional custom name for the bag.

    Returns:
        dask.bag.Bag: The resulting bag collection.
    """
def from_url(urls, **kwargs):
    """Build a Bag from the contents of one or more URLs or files.

    Args:
        urls: URL, file path, or list of paths to read.
        **kwargs: Extra arguments forwarded to the file reader.

    Returns:
        dask.bag.Bag: Bag holding the file contents.
    """
def range(start, stop=None, step=None, partition_size=None, name=None):
    """Build a Bag containing a numeric range.

    Args:
        start: Start value, or the stop value when it is the only argument.
        stop: End value (exclusive).
        step: Step between successive values.
        partition_size: Number of items per partition.
        name: Optional custom name for the bag.

    Returns:
        dask.bag.Bag: Bag holding the range values.
    """
def zip(*bags):
    """Combine several bags element-wise into a bag of tuples.

    Args:
        *bags: Bags to zip together.

    Returns:
        dask.bag.Bag: Bag of tuples, one per aligned element group.
    """
def concat(bags):
    """Concatenate multiple bags into a single bag.

    Parameters:
    - bags: Iterable of bags to concatenate

    Returns:
    dask.bag.Bag: Concatenated bag
    """

# Read and write bags from various file formats.
def read_text(urlpath, encoding='utf-8', errors='strict',
              linedelimiter=None, compression=None, blocksize=None,
              sample=True, **kwargs):
    """Load text files as a Bag of lines.

    Args:
        urlpath: File path or glob pattern to read.
        encoding: Text encoding used to decode the files.
        errors: How decoding errors are handled.
        linedelimiter: Character that separates lines.
        compression: Compression format ('gzip', 'bz2', etc.).
        blocksize: Size of each partition in bytes.
        sample: Whether to sample the file for optimization.
        **kwargs: Extra storage options.

    Returns:
        dask.bag.Bag: Bag whose elements are text lines.
    """
def read_avro(urlpath, **kwargs):
    """Load Avro files into a Bag of records.

    Args:
        urlpath: File path or glob pattern to read.
        **kwargs: Extra arguments for the Avro reader.

    Returns:
        dask.bag.Bag: Bag of Avro records.
    """
def to_textfiles(bag, path, name_function=None, compression=None,
                 encoding='utf-8', compute=True, **kwargs):
    """Write a Bag to text files.

    Parameters:
    - bag: Bag to write
    - path: Output directory or file pattern
    - name_function: Function to generate filenames
    - compression: Compression format
    - encoding: Text encoding
    - compute: Whether to write immediately
    - **kwargs: Additional storage options

    Returns:
    Delayed objects, or None if compute=True
    """

# Main Bag class with functional programming interface.
class Bag:
    """Distributed list-like collection with a functional interface.

    Properties:
    - npartitions: int - Number of partitions
    - name: str - Collection name in task graph
    """

    def compute(self, scheduler=None, **kwargs):
        """Compute the bag and return the results.

        Returns:
        list: Computed results as a Python list
        """

    def persist(self, scheduler=None, **kwargs):
        """Persist the bag in memory for reuse.

        Returns:
        dask.bag.Bag: Persisted bag
        """

    def take(self, k, npartitions=1):
        """Take the first k elements.

        Parameters:
        - k: Number of elements to take
        - npartitions: Number of partitions to search

        Returns:
        list: First k elements
        """

    def head(self, k=10, npartitions=1):
        """Alias for take()."""

    def __iter__(self):
        """Iterate over the bag (triggers computation)."""

    def __len__(self):
        """Length of the bag (triggers computation)."""

# Functional programming operations for data transformation.
def map(func, *bags, **kwargs):
    """Apply a function element-wise across one or more bags.

    Args:
        func: Callable applied to each element.
        *bags: Bags whose elements are mapped over.
        **kwargs: Extra arguments forwarded to func.

    Returns:
        dask.bag.Bag: Bag of transformed elements.
    """
def filter(predicate, bag):
    """Keep only the elements for which a predicate is true.

    Args:
        predicate: Callable returning True for elements to keep.
        bag: Bag to filter.

    Returns:
        dask.bag.Bag: Filtered bag.
    """
def map_partitions(func, *bags, **kwargs):
    """Apply a function to whole partitions rather than single elements.

    Args:
        func: Callable that takes and returns iterables.
        *bags: Input bags.
        **kwargs: Extra arguments forwarded to func.

    Returns:
        dask.bag.Bag: Transformed bag.
    """
def flatten(bag, nlevels=1):
    """Unnest sequences contained in a bag.

    Args:
        bag: Bag whose elements are nested sequences.
        nlevels: How many levels of nesting to remove.

    Returns:
        dask.bag.Bag: Flattened bag.
    """
def pluck(key, bag, default=None):
    """Extract a field from each dictionary or object in a bag.

    Args:
        key: Key or attribute name to pluck.
        bag: Bag of dictionaries/objects.
        default: Value to use when the key is missing.

    Returns:
        dask.bag.Bag: Bag of plucked values.
    """
def distinct(bag, key=None):
    """Drop duplicate elements from a bag.

    Args:
        bag: Input bag.
        key: Optional callable computing the comparison key.

    Returns:
        dask.bag.Bag: Bag containing only unique elements.
    """
def frequencies(bag, sort=False, normalize=False, split_every=None):
    """Tally how often each element occurs.

    Args:
        bag: Input bag.
        sort: Whether to sort the results by frequency.
        normalize: Return proportions instead of raw counts.
        split_every: Tree reduction factor.

    Returns:
        dask.bag.Bag: Bag of (item, count) pairs.
    """
def topk(bag, k, key=None, split_every=None):
    """Find the k largest elements of a bag.

    Parameters:
    - bag: Input bag
    - k: Number of top elements
    - key: Function to compute comparison key
    - split_every: Tree reduction factor

    Returns:
    dask.bag.Bag: Bag of top k elements
    """

# Aggregate operations that combine all elements into single values.
def fold(binop, bag, initial=None, combine=None, split_every=None):
    """Collapse a bag to a single value with a binary operation.

    Args:
        binop: Binary function used to fold elements together.
        bag: Input bag.
        initial: Starting value for the fold.
        combine: Function used to merge intermediate results.
        split_every: Tree reduction factor.

    Returns:
        Delayed object holding the folded result.
    """
def reduce(func, bag, initial=None, split_every=None):
    """Collapse a bag to a single value using a reduction function.

    Args:
        func: Reduction function.
        bag: Input bag.
        initial: Starting value for the reduction.
        split_every: Tree reduction factor.

    Returns:
        Delayed object holding the reduced result.
    """
def sum(bag, split_every=None):
    """Sum all elements."""

def count(bag, split_every=None):
    """Count number of elements."""

def min(bag, split_every=None):
    """Find minimum element."""

def max(bag, split_every=None):
    """Find maximum element."""

def mean(bag, split_every=None):
    """Compute mean of numeric elements."""

def std(bag, split_every=None):
    """Compute standard deviation."""

def var(bag, split_every=None):
    """Compute variance."""

# Group elements by key for further processing.
def groupby(bag, key, npartitions=None, partition_size=None):
    """Collect elements into groups determined by a key function.

    Args:
        bag: Input bag.
        key: Callable computing the grouping key for each element.
        npartitions: Number of output partitions.
        partition_size: Target partition size.

    Returns:
        dask.bag.Bag: Bag of (key, group) pairs.
    """
def foldby(key, binop, bag, initial=None, combine=None, combine_initial=None):
    """Fold values by key.

    Parameters:
    - key: Function to compute grouping key
    - binop: Binary operation for folding
    - bag: Input bag
    - initial: Initial value for each group
    - combine: Function for combining results
    - combine_initial: Initial value for combining

    Returns:
    dask.bag.Bag: Bag of (key, folded_value) pairs
    """

# Convert bags to other collection types.
def to_dataframe(bag, columns=None, meta=None):
    """Convert a bag of records into a Dask DataFrame.

    Args:
        bag: Input bag of records/dictionaries.
        columns: Column names for the DataFrame.
        meta: Metadata DataFrame describing the output schema.

    Returns:
        dask.dataframe.DataFrame: The converted DataFrame.
    """
def to_delayed(bag, optimize_graph=True):
    """Convert bag partitions to a list of delayed objects.

    Parameters:
    - bag: Input bag
    - optimize_graph: Whether to optimize the task graph first

    Returns:
    list: List of delayed objects, one per partition
    """

# Access individual items from bags.
class Item:
    """Single item reference from a bag.

    Used for accessing specific elements by index.
    """

    def compute(self, **kwargs):
        """Compute and return the item value."""

    def key(self):
        """Get the task key for this item."""

import dask.bag as db
# Example: count word frequencies across text files.
# Read text files
lines = db.read_text('logs/*.txt')
# Process lines
words = (lines.str.strip()
         .str.split()
         .flatten())
# Count word frequencies
word_counts = words.frequencies(sort=True)
top_words = word_counts.take(10)
print(top_words)

import dask.bag as db
import json

# Example: parse JSON records and aggregate fields.
# Read JSON files
records = db.read_text('data/*.json').map(json.loads)
# Extract and process fields
user_ages = records.pluck('age').filter(lambda x: x is not None)
avg_age = user_ages.mean().compute()
# Group by category
by_category = records.groupby(lambda x: x.get('category', 'unknown'))
category_counts = by_category.map(lambda x: (x[0], len(x[1]))).compute()

import dask.bag as db
def clean_record(record):
    """Validate a raw record; return a normalized dict or None to drop it."""
    # Records with a missing or non-positive value are discarded.
    if record.get('value', 0) <= 0:
        return None
    return {
        'id': record['id'],
        'value': float(record['value']),
        'category': record.get('category', 'default'),
    }
def aggregate_by_category(group):
    """Summarize the (possibly None-containing) records of one category group."""
    category, members = group
    values = [rec['value'] for rec in members if rec is not None]
    total = sum(values)
    count = len(values)
    return {
        'category': category,
        'count': count,
        'sum': total,
        'avg': total / count if values else 0,
    }
# Example: end-to-end clean/group/aggregate pipeline.
# Process data pipeline
raw_data = db.from_sequence(data_source, partition_size=1000)
cleaned = raw_data.map(clean_record).filter(lambda x: x is not None)
grouped = cleaned.groupby(lambda x: x['category'])
aggregated = grouped.map(aggregate_by_category)
results = aggregated.compute()

import dask.bag as db
import re
def extract_features(text_file_content):
    """Extract summary features from the full text of one document.

    Parameters:
    - text_file_content: Document contents as a single string

    Returns:
    dict: line/word/char counts plus a naive email-address count
    """
    lines = text_file_content.split('\n')
    return {
        'line_count': len(lines),
        'word_count': len(text_file_content.split()),
        'char_count': len(text_file_content),
        # TLD class fixed to [A-Za-z]{2,}: the original [A-Z|a-z] wrongly
        # included a literal '|' in the character class, so strings like
        # 'a@b.c|d' were miscounted as email addresses.
        'email_count': len(re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text_file_content))
    }
# Example: compute per-file features in parallel.
# Process multiple files in parallel
files = db.read_text('documents/*.txt', blocksize=None)  # One file per partition
features = files.map(extract_features)
summary = features.compute()
print(f"Processed {len(summary)} files")

import dask.bag as db
import dask.dataframe as dd

# Example: round-trip between Bag and DataFrame representations.
# Start with bag of records
records = db.from_sequence(json_records, partition_size=10000)
# Convert to DataFrame for structured analysis
df = records.to_dataframe()
# Process with DataFrame operations
summary = df.groupby('category').value.agg(['mean', 'count', 'sum'])
# Back to bag for further processing
result_records = df.to_bag(format='dict')
final_result = result_records.take(100)

# Install with Tessl CLI
# npx tessl i tessl/pypi-dask