CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-dask

Parallel PyData with task scheduling for distributed analytics and computing.

Pending
Overview
Eval results
Files

docs/bags.md

Bags

Distributed list-like collections for processing semi-structured and unstructured data with functional programming patterns. Dask Bags handle data that doesn't fit the array or DataFrame model, such as JSON records, log files, or any Python objects.

Capabilities

Bag Creation

Create Bag collections from various data sources and Python objects.

def from_sequence(seq, partition_size=None, name=None):
    # NOTE(review): the upstream dask signature appears to be
    # from_sequence(seq, partition_size=None, npartitions=None), i.e. with an
    # `npartitions` argument and no `name` -- confirm against the dask.bag API.
    """
    Create a Bag from an in-memory sequence of items.

    Parameters:
    - seq: Sequence or iterable of items
    - partition_size: Number of items per partition
    - name: Custom name for bag

    Returns:
    dask.bag.Bag: Bag collection
    """

def from_delayed(values, name=None):
    # NOTE(review): the upstream dask signature appears to be
    # from_delayed(values) with no `name` argument -- confirm.
    """
    Create a Bag from delayed objects.

    Parameters:
    - values: List of delayed objects
    - name: Custom name for bag

    Returns:
    dask.bag.Bag: Bag collection
    """

def from_url(urls, **kwargs):
    # NOTE(review): upstream dask appears to define from_url(urls) with no
    # extra keyword arguments, and local file patterns are normally read with
    # read_text instead -- confirm against the dask.bag API.
    """
    Create a Bag from URLs or file patterns.

    Parameters:
    - urls: URL, file path, or list of paths
    - **kwargs: Additional arguments for file reading

    Returns:
    dask.bag.Bag: Bag with file contents
    """

def range(start, stop=None, step=None, partition_size=None, name=None):
    # NOTE(review): upstream dask appears to define dask.bag.range(n, npartitions),
    # producing the numbers 0..n-1; start/stop/step/partition_size/name are not
    # part of that signature -- confirm before relying on them.
    """
    Create a Bag from a numeric range.

    Parameters:
    - start: Start value, or the stop value if it is the only argument
    - stop: End value (exclusive)
    - step: Step between values
    - partition_size: Items per partition
    - name: Custom name

    Returns:
    dask.bag.Bag: Bag with range values
    """

def zip(*bags):
    # NOTE(review): presumably the bags must have matching partition layouts
    # for element-wise pairing to be meaningful -- confirm in the dask docs.
    """
    Zip multiple bags together element-wise.

    Parameters:
    - *bags: Bags to zip

    Returns:
    dask.bag.Bag: Bag of tuples, one entry per input bag
    """

def concat(bags):
    """
    Concatenate multiple bags into a single bag.

    Parameters:
    - bags: Iterable of bags to concatenate

    Returns:
    dask.bag.Bag: Concatenated bag
    """

File I/O Operations

Read and write bags from various file formats.

def read_text(urlpath, encoding='utf-8', errors='strict', 
             linedelimiter=None, compression=None, blocksize=None,
             sample=True, **kwargs):
    # NOTE(review): upstream dask appears to define
    # read_text(urlpath, blocksize=None, compression='infer', encoding=...,
    #           errors='strict', linedelimiter=None, collection=True,
    #           storage_options=None, files_per_partition=None,
    #           include_path=False)
    # i.e. no `sample` argument, `compression` defaulting to 'infer', and
    # storage options passed via `storage_options` -- confirm.
    """
    Read text files into a Bag of strings, one element per line.

    Parameters:
    - urlpath: File path or pattern
    - encoding: Text encoding
    - errors: How to handle encoding errors
    - linedelimiter: Line separator character
    - compression: Compression format ('gzip', 'bz2', etc.)
    - blocksize: Size of each partition in bytes
    - sample: Whether to sample file for optimization
    - **kwargs: Additional storage options

    Returns:
    dask.bag.Bag: Bag of text lines
    """

def read_avro(urlpath, **kwargs):
    # NOTE(review): upstream dask appears to define
    # read_avro(urlpath, blocksize=100000000, storage_options=None,
    #           compression=None) -- confirm which keywords are accepted.
    """
    Read Avro files into a Bag.

    Parameters:
    - urlpath: File path or pattern
    - **kwargs: Additional arguments

    Returns:
    dask.bag.Bag: Bag of Avro records
    """

def to_textfiles(bag, path, name_function=None, compression=None,
                encoding='utf-8', compute=True, **kwargs):
    # NOTE(review): upstream dask appears to default compression to 'infer',
    # accept `storage_options` and `last_endline`, and (in recent releases)
    # return the list of written file names when compute=True -- confirm.
    """
    Write a Bag to text files.

    Parameters:
    - bag: Bag to write
    - path: Output directory or file pattern
    - name_function: Function to generate filenames
    - compression: Compression format
    - encoding: Text encoding
    - compute: Whether to write immediately
    - **kwargs: Additional storage options

    Returns:
    Delayed objects or None if compute=True
    """

Core Bag Class

Main Bag class with functional programming interface.

class Bag:
    """
    Distributed list-like collection with functional interface.

    Properties:
    - npartitions: int - Number of partitions
    - name: str - Collection name in task graph
    """
    
    def compute(self, scheduler=None, **kwargs):
        # NOTE(review): `scheduler` presumably selects which dask scheduler
        # executes the graph -- confirm against dask.compute documentation.
        """
        Compute the bag and return its elements as a Python list.

        Returns:
        list: Computed results as Python list
        """
    
    def persist(self, scheduler=None, **kwargs):
        """
        Persist the bag in memory for reuse.

        Returns:
        dask.bag.Bag: Persisted bag
        """
    
    def take(self, k, npartitions=1):
        # NOTE(review): upstream Bag.take appears to also accept
        # compute=True and warn=True, and returns a tuple rather than a
        # list -- confirm.
        """
        Take the first k elements.

        Parameters:
        - k: Number of elements to take
        - npartitions: Number of partitions to search

        Returns:
        list: First k elements
        """
    
    def head(self, k=10, npartitions=1):
        # NOTE(review): no `head` method is known on upstream dask.bag.Bag
        # (take() is the documented call) -- confirm before advertising it.
        """Alias for take()."""
    
    def __iter__(self):
        """Iterate over bag (triggers computation)."""
    
    def __len__(self):
        # NOTE(review): confirm that upstream Bag defines __len__;
        # count().compute() is the documented way to size a bag.
        """Length of bag (triggers computation)."""

Transformation Operations

Functional programming operations for data transformation.

def map(func, *bags, **kwargs):
    # NOTE(review): presumably also available as the method Bag.map(func, ...);
    # confirm how non-bag positional and keyword arguments are treated.
    """
    Apply a function to each element.

    Parameters:
    - func: Function to apply
    - *bags: Bags to map over
    - **kwargs: Additional arguments

    Returns:
    dask.bag.Bag: Bag with transformed elements
    """

def filter(predicate, bag):
    # NOTE(review): upstream dask exposes this as the method
    # Bag.filter(predicate); a module-level dask.bag.filter may not exist --
    # confirm.
    """
    Keep only the elements for which the predicate is true.

    Parameters:
    - predicate: Function returning True/False
    - bag: Bag to filter

    Returns:
    dask.bag.Bag: Filtered bag
    """

def map_partitions(func, *bags, **kwargs):
    """
    Apply a function to each whole partition rather than to each element.

    Parameters:
    - func: Function that takes and returns iterables
    - *bags: Input bags
    - **kwargs: Additional arguments to func

    Returns:
    dask.bag.Bag: Transformed bag
    """

def flatten(bag, nlevels=1):
    # NOTE(review): upstream dask exposes this as Bag.flatten() with no
    # arguments (one level only); `nlevels` may not be supported -- confirm.
    """
    Flatten nested sequences.

    Parameters:
    - bag: Bag with nested sequences
    - nlevels: Number of levels to flatten

    Returns:
    dask.bag.Bag: Flattened bag
    """

def pluck(key, bag, default=None):
    # NOTE(review): upstream dask exposes this as Bag.pluck(key, default=<sentinel>);
    # when no default is supplied a missing key appears to raise rather than
    # yield None -- confirm.
    """
    Select values from dictionaries/objects.

    Parameters:
    - key: Key or attribute name to pluck
    - bag: Bag of dictionaries/objects
    - default: Default value if key missing

    Returns:
    dask.bag.Bag: Bag of plucked values
    """

def distinct(bag, key=None):
    # NOTE(review): upstream dask exposes this as the method
    # Bag.distinct(key=None) -- confirm whether a function form exists.
    """
    Remove duplicate elements.

    Parameters:
    - bag: Input bag
    - key: Function to compute comparison key

    Returns:
    dask.bag.Bag: Bag with unique elements
    """

def frequencies(bag, sort=False, normalize=False, split_every=None):
    # NOTE(review): upstream dask exposes this as
    # Bag.frequencies(split_every=None, sort=False); a `normalize` option is
    # not known upstream -- confirm.
    """
    Count the frequency of each element.

    Parameters:
    - bag: Input bag
    - sort: Sort results by frequency
    - normalize: Return proportions instead of counts
    - split_every: Tree reduction factor

    Returns:
    dask.bag.Bag: Bag of (item, count) pairs
    """

def topk(bag, k, key=None, split_every=None):
    # NOTE(review): upstream dask exposes this as the method
    # Bag.topk(k, key=None, split_every=None) -- confirm.
    """
    Find the k largest elements.

    Parameters:
    - bag: Input bag
    - k: Number of top elements
    - key: Function to compute comparison key
    - split_every: Tree reduction factor

    Returns:
    dask.bag.Bag: Bag of top k elements
    """

Reduction Operations

Aggregate operations that combine all elements into single values.

def fold(binop, bag, initial=None, combine=None, split_every=None):
    # NOTE(review): upstream dask exposes this as
    # Bag.fold(binop, combine=None, initial=<sentinel>, split_every=None, ...)
    # and the result appears to be a dask.bag.Item rather than a Delayed --
    # confirm.
    """
    Fold the bag using a binary operation.

    Parameters:
    - binop: Binary function for folding
    - bag: Input bag
    - initial: Initial value for fold
    - combine: Function for combining intermediate results
    - split_every: Tree reduction factor

    Returns:
    Delayed object with folded result
    """

def reduce(func, bag, initial=None, split_every=None):
    # NOTE(review): no `reduce` is known on upstream dask bags; the closest
    # documented calls are Bag.reduction(perpartition, aggregate, ...) and
    # Bag.fold(...) -- confirm.
    """
    Reduce the bag using a function.

    Parameters:
    - func: Reduction function
    - bag: Input bag
    - initial: Initial value
    - split_every: Tree reduction factor

    Returns:
    Delayed object with reduced result
    """

def sum(bag, split_every=None):
    # NOTE(review): upstream dask exposes this as the method Bag.sum(split_every=None),
    # presumably returning a lazy single-value result -- confirm.
    """
    Sum all elements.

    Parameters:
    - bag: Input bag
    - split_every: Tree reduction factor
    """

def count(bag, split_every=None):
    # NOTE(review): upstream dask exposes this as the method Bag.count(split_every=None),
    # presumably returning a lazy single-value result -- confirm.
    """
    Count the number of elements.

    Parameters:
    - bag: Input bag
    - split_every: Tree reduction factor
    """

def min(bag, split_every=None):
    # NOTE(review): upstream dask exposes this as the method Bag.min(split_every=None),
    # presumably returning a lazy single-value result -- confirm.
    """
    Find the minimum element.

    Parameters:
    - bag: Input bag
    - split_every: Tree reduction factor
    """

def max(bag, split_every=None):
    # NOTE(review): upstream dask exposes this as the method Bag.max(split_every=None),
    # presumably returning a lazy single-value result -- confirm.
    """
    Find the maximum element.

    Parameters:
    - bag: Input bag
    - split_every: Tree reduction factor
    """

def mean(bag, split_every=None):
    # NOTE(review): upstream dask exposes this as the method Bag.mean(),
    # presumably returning a lazy single-value result -- confirm.
    """
    Compute the mean of numeric elements.

    Parameters:
    - bag: Input bag
    - split_every: Tree reduction factor
    """

def std(bag, split_every=None):
    # NOTE(review): upstream dask exposes this as the method Bag.std(ddof=0);
    # confirm whether `split_every` is accepted and which ddof is used.
    """
    Compute the standard deviation.

    Parameters:
    - bag: Input bag
    - split_every: Tree reduction factor
    """

def var(bag, split_every=None):
    # NOTE(review): upstream dask exposes this as the method Bag.var(ddof=0);
    # confirm whether `split_every` is accepted and which ddof is used.
    """
    Compute the variance.

    Parameters:
    - bag: Input bag
    - split_every: Tree reduction factor
    """

Grouping Operations

Group elements by key for further processing.

def groupby(bag, key, npartitions=None, partition_size=None):
    # NOTE(review): upstream dask exposes this as
    # Bag.groupby(grouper, method=None, npartitions=None, blocksize=..., ...);
    # `partition_size` is not a known upstream argument. The dask docs also
    # warn that groupby needs a full shuffle and recommend foldby where
    # possible -- confirm.
    """
    Group elements by a key function.

    Parameters:
    - bag: Input bag
    - key: Function to compute grouping key
    - npartitions: Number of output partitions
    - partition_size: Target partition size

    Returns:
    dask.bag.Bag: Bag of (key, group) pairs
    """

def foldby(key, binop, bag, initial=None, combine=None, combine_initial=None):
    # NOTE(review): upstream dask exposes this as
    # Bag.foldby(key, binop, initial=<sentinel>, combine=None,
    #            combine_initial=<sentinel>, split_every=None) -- confirm.
    """
    Fold values by key (a combined groupby and reduction).

    Parameters:
    - key: Function to compute grouping key
    - binop: Binary operation for folding
    - bag: Input bag
    - initial: Initial value for each group
    - combine: Function for combining results
    - combine_initial: Initial value for combining

    Returns:
    dask.bag.Bag: Bag of (key, folded_value) pairs
    """

Conversion Operations

Convert bags to other collection types.

def to_dataframe(bag, columns=None, meta=None):
    # NOTE(review): upstream dask exposes this as
    # Bag.to_dataframe(meta=None, columns=None, optimize_graph=True); note the
    # different positional order of `meta` and `columns` -- pass them by
    # keyword, and confirm.
    """
    Convert a bag to a Dask DataFrame.

    Parameters:
    - bag: Input bag of records/dictionaries
    - columns: Column names for DataFrame
    - meta: Metadata DataFrame

    Returns:
    dask.dataframe.DataFrame: Converted DataFrame
    """

def to_delayed(bag, optimize_graph=True):
    # NOTE(review): upstream dask exposes this as the method
    # Bag.to_delayed(optimize_graph=True) -- confirm.
    """
    Convert bag partitions to delayed objects, one per partition.

    Parameters:
    - bag: Input bag
    - optimize_graph: Whether to optimize task graph

    Returns:
    list: List of delayed objects
    """

Item Access

Access individual items from bags.

class Item:
    # NOTE(review): upstream dask.bag.Item appears to be what bag reductions
    # (sum, count, fold, ...) return, rather than an index-based accessor --
    # confirm the "by index" wording below.
    """
    Single item reference from a bag.

    Used for accessing specific elements by index.
    """
    
    def compute(self, **kwargs):
        """Compute and return the item value."""
    
    def key(self):
        # NOTE(review): upstream appears to expose `key` as an attribute or
        # property, not a method call -- confirm.
        """Get the task key for this item."""

Usage Examples

Basic Text Processing

import dask.bag as db

# Load every matching log file as a bag of lines
lines = db.read_text('logs/*.txt')

# Strip each line, split it into tokens, then flatten into one bag of words
stripped_lines = lines.str.strip()
token_lists = stripped_lines.str.split()
words = token_lists.flatten()

# Tally the words (most frequent first) and keep the first ten entries
word_counts = words.frequencies(sort=True)
top_words = word_counts.take(10)

print(top_words)

JSON Data Processing

import dask.bag as db
import json

# Read JSON files (one JSON document per line)
records = db.read_text('data/*.json').map(json.loads)

# Extract and process fields.
# Passing an explicit default makes records without an 'age' key yield None
# (which the filter then drops) instead of raising during the computation.
user_ages = records.pluck('age', None).filter(lambda x: x is not None)
avg_age = user_ages.mean().compute()

# Group by category and count the members of each group
by_category = records.groupby(lambda x: x.get('category', 'unknown'))
category_counts = by_category.map(lambda x: (x[0], len(x[1]))).compute()

Custom Data Processing Pipeline

import dask.bag as db

def clean_record(record):
    """
    Clean and validate a data record.

    Parameters:
    - record: dict with an 'id' key and optional 'value' / 'category' keys

    Returns:
    dict with 'id', 'value' (as float) and 'category' (defaulting to
    'default') when the value is a positive number; None otherwise.

    Raises:
    KeyError: if a record with a positive value has no 'id'.
    """
    # Convert before comparing: raw values may be None or numeric strings,
    # and comparing those against 0 directly raises TypeError.
    try:
        value = float(record.get('value', 0))
    except (TypeError, ValueError):
        return None

    # `not value > 0` (rather than `value <= 0`) also rejects NaN.
    if not value > 0:
        return None

    return {
        'id': record['id'],
        'value': value,
        'category': record.get('category', 'default')
    }

def aggregate_by_category(group):
    """
    Summarise one (category, records) group.

    Returns a dict holding the category, the number of non-None records,
    the sum of their 'value' fields and the average (0 for an empty group).
    """
    category, members = group

    amounts = []
    for member in members:
        if member is not None:
            amounts.append(member['value'])

    total = sum(amounts)
    if amounts:
        average = total / len(amounts)
    else:
        average = 0

    return {
        'category': category,
        'count': len(amounts),
        'sum': total,
        'avg': average,
    }

# Pipeline: load -> clean -> drop rejected records -> group -> aggregate
raw_data = db.from_sequence(data_source, partition_size=1000)
candidate_records = raw_data.map(clean_record)
cleaned = candidate_records.filter(lambda rec: rec is not None)
grouped = cleaned.groupby(lambda rec: rec['category'])
aggregated = grouped.map(aggregate_by_category)

# Nothing runs until compute() is called
results = aggregated.compute()

Parallel File Processing

import dask.bag as db
import re

def extract_features(text_file_content):
    """
    Extract simple size and e-mail statistics from text content.

    Parameters:
    - text_file_content: str holding the text to analyse

    Returns:
    dict with:
    - 'line_count': number of '\\n'-separated lines
    - 'word_count': number of whitespace-separated tokens
    - 'char_count': number of characters
    - 'email_count': number of e-mail-like substrings
    """
    lines = text_file_content.split('\n')
    # The top-level-domain class is [A-Za-z]; the earlier [A-Z|a-z] also
    # accepted a literal '|', because '|' is not alternation inside [...].
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return {
        'line_count': len(lines),
        'word_count': len(text_file_content.split()),
        'char_count': len(text_file_content),
        'email_count': len(re.findall(email_pattern, text_file_content))
    }

# Process multiple files in parallel.
# blocksize=None keeps each file in its own partition, but the bag elements
# are still individual lines. Re-join every partition so extract_features
# sees a whole file and the summary holds exactly one entry per file.
files = db.read_text('documents/*.txt', blocksize=None)
features = files.map_partitions(
    lambda file_lines: [extract_features(''.join(file_lines))]
)
summary = features.compute()

print(f"Processed {len(summary)} files")

Integration with Other Collections

import dask.bag as db
import dask.dataframe as dd

# Build a bag from the in-memory records
records = db.from_sequence(json_records, partition_size=10000)

# Switch to a DataFrame for column-oriented analysis
df = records.to_dataframe()

# Per-category statistics of the 'value' column (still lazy at this point)
value_by_category = df.groupby('category')['value']
summary = value_by_category.agg(['mean', 'count', 'sum'])

# Return to a bag of dict records and pull the first hundred
result_records = df.to_bag(format='dict')
final_result = result_records.take(100)

Install with Tessl CLI

npx tessl i tessl/pypi-dask

docs

arrays.md

bags.md

configuration.md

core-functions.md

dataframes.md

delayed.md

diagnostics.md

index.md

tile.json