tessl/pypi-dask

Parallel PyData with task scheduling for distributed analytics and computing.

docs/dataframes.md

DataFrames

Pandas-compatible distributed DataFrames for larger-than-memory datasets. Dask DataFrames partition data across multiple pandas DataFrames, enabling familiar pandas operations on datasets that don't fit in memory.

Capabilities

DataFrame Creation

Create Dask DataFrames from various sources including files, pandas objects, and other collections.

def from_pandas(df, npartitions=None, chunksize=None, sort=True, name=None):
    """
    Create Dask DataFrame from pandas DataFrame.
    
    Parameters:
    - df: pandas DataFrame or Series
    - npartitions: Number of partitions to create
    - chunksize: Number of rows per partition (alternative to npartitions)
    - sort: Whether to sort by index
    - name: Custom name for collection
    
    Returns:
    dask.dataframe.DataFrame or Series: Dask collection
    """

def from_array(x, columns=None, index=None, meta=None):
    """
    Create DataFrame from dask array.
    
    Parameters:
    - x: Dask or NumPy array (1D or 2D)
    - columns: Column names
    - index: Index for DataFrame
    - meta: Metadata DataFrame
    
    Returns:
    dask.dataframe.DataFrame or Series: Dask DataFrame (2D input) or Series (1D input)
    """

def from_dict(data, npartitions, orient='columns', dtype=None, columns=None):
    """
    Create DataFrame from dictionary of sequences.
    
    Parameters:
    - data: Dictionary of array-like values
    - npartitions: Number of partitions
    - orient: Data orientation ('columns' or 'index')
    - dtype: Data type
    - columns: Column names
    
    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

def from_delayed(dfs, meta=None, divisions=None, prefix='from-delayed', 
                verify_meta=True):
    """
    Create DataFrame from delayed objects.
    
    Parameters:
    - dfs: List of delayed pandas DataFrame objects
    - meta: Metadata DataFrame for type inference
    - divisions: Index divisions between partitions
    - prefix: Name prefix for task keys
    - verify_meta: Check metadata consistency
    
    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

File I/O Operations

Read and write DataFrames from various file formats.

def read_csv(urlpath, blocksize=None, collection=True, **kwargs):
    """
    Read CSV files into Dask DataFrame.
    
    Parameters:
    - urlpath: File path or pattern (supports wildcards)
    - blocksize: Size of each partition in bytes
    - collection: Return DataFrame (True) or delayed objects (False)
    - **kwargs: Additional pandas.read_csv arguments
    
    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

def read_parquet(path, columns=None, filters=None, categories=None,
                index=None, storage_options=None, **kwargs):
    """
    Read Parquet files into Dask DataFrame.
    
    Parameters:
    - path: File path or directory
    - columns: Columns to read
    - filters: Row filters to apply
    - categories: Columns to treat as categorical
    - index: Column to use as index
    - storage_options: Storage backend options
    - **kwargs: Additional arguments
    
    Returns:
    dask.dataframe.DataFrame: Dask DataFrame
    """

def read_json(urlpath, orient='records', lines=None, **kwargs):
    """Read JSON files into Dask DataFrame."""

def read_hdf(pattern, key, start=None, stop=None, **kwargs):
    """Read HDF5 files into Dask DataFrame."""

def read_sql_table(table, uri, index_col, divisions=None, 
                  npartitions=None, **kwargs):
    """Read SQL table into Dask DataFrame."""

def read_sql_query(sql, uri, index_col, divisions=None,
                  npartitions=None, **kwargs):
    """Read SQL query results into Dask DataFrame."""

def to_csv(df, filename, **kwargs):
    """Write DataFrame to CSV files."""

def to_parquet(df, path, **kwargs):  
    """Write DataFrame to Parquet format."""

def to_json(df, filename, **kwargs):
    """Write DataFrame to JSON files."""

def to_hdf(df, path, key, **kwargs):
    """Write DataFrame to HDF5 format."""

def to_sql(df, name, uri, **kwargs):
    """Write DataFrame to SQL database."""

Core DataFrame Class

Main DataFrame class with pandas-compatible interface.

class DataFrame:
    """
    Distributed pandas-like DataFrame.
    
    Properties:
    - columns: Index - Column names
    - dtypes: Series - Data types of columns  
    - index: Index - Row index
    - shape: tuple - Shape (nrows, ncols); the row count is lazy until computed
    - ndim: int - Number of dimensions (always 2)
    - npartitions: int - Number of partitions
    - divisions: tuple - Index divisions between partitions
    """
    
    def compute(self, scheduler=None, **kwargs):
        """
        Compute DataFrame and return pandas result.
        
        Returns:
        pandas.DataFrame: Computed DataFrame
        """
    
    def persist(self, scheduler=None, **kwargs):
        """
        Persist DataFrame in memory for reuse.
        
        Returns:
        dask.dataframe.DataFrame: Persisted DataFrame
        """
    
    def head(self, n=5, npartitions=1, compute=True):
        """
        Return first n rows.
        
        Parameters:
        - n: Number of rows to return
        - npartitions: Number of partitions to search
        - compute: Whether to compute result
        
        Returns:
        pandas.DataFrame or dask.dataframe.DataFrame: First n rows
        """
    
    def tail(self, n=5, compute=True):
        """Return last n rows."""
    
    def __getitem__(self, key):
        """Column selection and fancy indexing."""
    
    def __setitem__(self, key, value):
        """Column assignment."""
    
    @property
    def loc(self):
        """Label-based indexer, e.g. df.loc[start:stop]."""
    
    @property
    def iloc(self):
        """Integer position-based indexer (column selection only, e.g. df.iloc[:, [0, 2]])."""

Series Class

Distributed Series for single-column operations.

class Series:
    """
    Distributed pandas-like Series.
    
    Properties:
    - dtype: numpy.dtype - Data type
    - index: Index - Row index
    - name: str - Series name
    - shape: tuple - Shape (nrows,); the row count is lazy until computed
    - ndim: int - Number of dimensions (always 1)
    - npartitions: int - Number of partitions
    """
    
    def compute(self, scheduler=None, **kwargs):
        """
        Compute Series and return pandas result.
        
        Returns:
        pandas.Series: Computed Series
        """
    
    def persist(self, scheduler=None, **kwargs):
        """
        Persist Series in memory.
        
        Returns:
        dask.dataframe.Series: Persisted Series
        """
    
    def head(self, n=5, npartitions=1, compute=True):
        """Return first n values."""
    
    def tail(self, n=5, compute=True):
        """Return last n values."""
    
    def value_counts(self, normalize=False, sort=True, ascending=False,
                    split_every=None):
        """Count unique values."""
    
    def unique(self, split_every=None):
        """Return unique values."""

DataFrame Operations

Data manipulation and transformation functions.

def concat(dfs, axis=0, join='outer', ignore_index=False,
          interleave_partitions=None):
    """
    Concatenate DataFrames along axis.
    
    Parameters:
    - dfs: List of DataFrames to concatenate
    - axis: Axis to concatenate along (0=rows, 1=columns)
    - join: How to handle non-matching columns
    - ignore_index: Reset index in result
    - interleave_partitions: Interleave partitions for axis=0
    
    Returns:
    dask.dataframe.DataFrame: Concatenated DataFrame
    """

def merge(left, right, how='inner', on=None, left_on=None, right_on=None,
         left_index=False, right_index=False, suffixes=('_x', '_y'),
         npartitions=None, shuffle=None):
    """
    Merge DataFrames with database-style joins.
    
    Parameters:
    - left, right: DataFrames to merge
    - how: Type of join ('inner', 'outer', 'left', 'right')
    - on: Column names to join on
    - left_on, right_on: Columns to join on for each DataFrame
    - left_index, right_index: Use index as join key
    - suffixes: Suffixes for overlapping column names
    - npartitions: Number of output partitions
    - shuffle: Shuffling method
    
    Returns:
    dask.dataframe.DataFrame: Merged DataFrame
    """

def merge_asof(left, right, on=None, left_on=None, right_on=None,
              left_index=False, right_index=False, by=None, 
              left_by=None, right_by=None, suffixes=('_x', '_y'),
              tolerance=None, allow_exact_matches=True, direction='backward'):
    """Perform asof merge (temporal/ordered merge)."""

def pivot_table(df, index=None, columns=None, values=None, aggfunc='mean'):
    """Create pivot table."""

def melt(df, id_vars=None, value_vars=None, var_name=None, value_name='value'):
    """Unpivot DataFrame from wide to long format."""

def get_dummies(data, prefix=None, prefix_sep='_', dummy_na=False, 
               columns=None, sparse=False, drop_first=False, dtype=None):
    """Convert categorical variables to dummy/indicator variables."""

Groupby Operations

Group-based operations and aggregations.

class DataFrameGroupBy:
    """DataFrame groupby operations."""
    
    def aggregate(self, func, **kwargs):
        """Apply aggregation functions."""
    
    def apply(self, func, **kwargs):
        """Apply function to each group."""
    
    def size(self):
        """Size of each group."""
    
    def count(self):
        """Count non-null values in each group."""
    
    def mean(self, **kwargs):
        """Mean of each group."""
    
    def sum(self, **kwargs):
        """Sum of each group."""
    
    def min(self, **kwargs):
        """Minimum of each group."""
    
    def max(self, **kwargs):
        """Maximum of each group."""
    
    def std(self, **kwargs):
        """Standard deviation of each group."""
    
    def var(self, **kwargs):
        """Variance of each group."""

class SeriesGroupBy:
    """Series groupby operations."""
    
    def aggregate(self, func, **kwargs):
        """Apply aggregation functions."""
    
    def apply(self, func, **kwargs):
        """Apply function to each group."""
    
    # Plus the same reduction methods as DataFrameGroupBy (size, count, mean, sum, min, max, std, var)
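
A short groupby sketch showing a single reduction and a multi-function aggregate(); the column names and data are illustrative.

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"category": ["a", "b", "a", "b"], "value": [1.0, 2.0, 3.0, 4.0]}),
    npartitions=2,
)

# Single reduction per group
means = ddf.groupby("category").value.mean()

# Several aggregations at once
stats = ddf.groupby("category").aggregate({"value": ["sum", "count", "max"]})

print(means.compute())
print(stats.compute())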

Data Processing

Data cleaning, transformation, and processing functions.

def map_partitions(func, *args, meta=None, **kwargs):
    """
    Apply function to each partition.
    
    Parameters:
    - func: Function to apply to each partition
    - *args: DataFrames and other arguments
    - meta: Metadata for result inference
    - **kwargs: Additional arguments to func
    
    Returns:
    dask.dataframe result: Result of applying func
    """

def repartition(df, divisions=None, npartitions=None, partition_size=None,
               freq=None, force=False):
    """
    Change DataFrame partitioning.
    
    Parameters:
    - df: DataFrame to repartition
    - divisions: New index divisions
    - npartitions: Target number of partitions
    - partition_size: Target partition size
    - freq: Frequency for time-based partitioning
    - force: Force repartitioning even if expensive
    
    Returns:
    dask.dataframe.DataFrame: Repartitioned DataFrame
    """

def to_datetime(arg, **kwargs):
    """Convert argument to datetime."""

def to_numeric(arg, errors='raise', **kwargs):
    """Convert argument to numeric type."""

def to_timedelta(arg, unit=None, **kwargs):
    """Convert argument to timedelta."""

def isna(df):
    """Detect missing values."""

Index Operations

Index and division management for DataFrames.

class Index:
    """Distributed index for DataFrames and Series."""
    
    def compute(self, scheduler=None, **kwargs):
        """Compute index values."""
    
    def persist(self, scheduler=None, **kwargs):
        """Persist index in memory."""

def set_index(df, other, divisions=None, sorted=None, npartitions=None,
             shuffle=None, compute=None):
    """Set DataFrame index."""

def reset_index(df, drop=False):
    """Reset DataFrame index."""

Aggregation Specifications

Custom aggregation definitions for groupby operations.

class Aggregation:
    """
    Specification for custom aggregations.
    
    Parameters:
    - name: str - Name of aggregation
    - chunk: callable - Function for chunk-level aggregation
    - agg: callable - Function for combining chunks
    - finalize: callable - Function for final result
    """
    
    def __init__(self, name, chunk, agg, finalize=None):
        """Initialize aggregation specification."""

Usage Examples

Basic DataFrame Operations

import dask.dataframe as dd
import pandas as pd

# Read large CSV file
df = dd.read_csv('large_dataset.csv')

# Basic operations
filtered = df[df.value > 100]
grouped = df.groupby('category').value.mean()
sorted_df = df.sort_values('timestamp')

# Compute results
result = grouped.compute()

Multi-file Processing

import dask.dataframe as dd

# Read multiple files with pattern
df = dd.read_csv('data/year=*/month=*/day=*.csv')

# Process with familiar pandas operations
daily_stats = (df.groupby(['date', 'category'])
                .agg({'value': ['mean', 'sum', 'count']})
                .compute())

Custom Aggregations

import dask.dataframe as dd
from dask.dataframe import Aggregation
import pandas as pd

# Define a custom aggregation: an approximate median computed as the
# median of per-partition group medians (not an exact global median)
def chunk_median(grouped):
    # runs on the grouped column within each partition
    return grouped.quantile(0.5)

def combine_medians(grouped):
    # combines the per-partition results for each group
    return grouped.median()

median_agg = Aggregation(
    name='approx_median',
    chunk=chunk_median,
    agg=combine_medians
)

# Use custom aggregation
df = dd.read_csv('data.csv')
result = df.groupby('category').value.agg(median_agg).compute()

Memory-Efficient Processing

import dask.dataframe as dd

# Read and process without loading full dataset
df = dd.read_parquet('large_dataset.parquet')

# Chain operations efficiently
result = (df.query('value > 0')
           .groupby('category')
           .value.sum()
           .sort_values(ascending=False)
           .head(10))   # head() computes by default and returns a pandas Series

# Persist intermediate results for reuse
df_clean = df.dropna().persist()
stat1 = df_clean.value.mean().compute()
stat2 = df_clean.value.std().compute()

Time Series Operations

import dask.dataframe as dd

# Read time series data
df = dd.read_csv('timeseries.csv', parse_dates=['timestamp'])
df = df.set_index('timestamp').repartition(freq='1D')

# Time-based operations
daily_avg = df.resample('D').mean()
rolling_mean = df.value.rolling('7D').mean()

# Compute results
results = dd.compute(daily_avg, rolling_mean)

Install with Tessl CLI

npx tessl i tessl/pypi-dask
