CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-datacompy

Comprehensive DataFrame comparison library providing functionality equivalent to SAS's PROC COMPARE for Python with support for Pandas, Spark, Polars, Snowflake, and distributed computing

Pending
Overview
Eval results
Files

docs/distributed-comparison.md

Distributed DataFrame Comparison

Fugue-powered distributed comparison functions that work across multiple backends including Dask, DuckDB, Ray, and Arrow, enabling scalable comparison of large datasets with a unified interface.

Capabilities

Distributed Match Checking

Check if DataFrames match across distributed computing backends with automatic parallelization and optimization.

def is_match(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False
) -> bool:
    """
    Check if DataFrames match using distributed computing.
    
    Parameters:
    - df1: First DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
    - df2: Second DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
    - join_columns: Column(s) to join dataframes on
    - abs_tol: Absolute tolerance for numeric comparisons
    - rel_tol: Relative tolerance for numeric comparisons
    - df1_name: Display name for first DataFrame
    - df2_name: Display name for second DataFrame
    - ignore_spaces: Strip whitespace from string columns
    - ignore_case: Ignore case in string comparisons
    - cast_column_names_lower: Convert column names to lowercase
    - parallelism: Number of parallel partitions
    - strict_schema: Enforce strict schema matching
    
    Returns:
    True if DataFrames match, False otherwise
    """

Distributed Row Analysis

Analyze row-level relationships between DataFrames using distributed processing.

def all_rows_overlap(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False
) -> bool:
    """
    Check if all rows are present in both DataFrames.
    
    Returns:
    True if all rows overlap, False otherwise
    """

def count_matching_rows(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False
) -> int:
    """
    Count the number of matching rows between DataFrames.
    
    Returns:
    Number of matching rows
    """

Distributed Column Analysis

Analyze column structure and relationships using distributed processing.

def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
    """
    Get columns that are unique to df1.
    
    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame
    
    Returns:
    OrderedSet of column names unique to df1
    """

def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
    """
    Get columns that are shared between DataFrames.
    
    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame
    
    Returns:
    OrderedSet of shared column names
    """

def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool:
    """
    Check if all columns match between DataFrames.
    
    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame
    
    Returns:
    True if all columns match, False otherwise
    """

Distributed Report Generation

Generate comprehensive comparison reports using distributed computing with customizable output.

def report(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    sample_count: int = 10,
    column_count: int = 10,
    html_file: str | None = None,
    parallelism: int | None = None
) -> str:
    """
    Generate comprehensive comparison report using distributed computing.
    
    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame
    - join_columns: Column(s) to join dataframes on
    - abs_tol: Absolute tolerance for numeric comparisons
    - rel_tol: Relative tolerance for numeric comparisons
    - df1_name: Display name for first DataFrame
    - df2_name: Display name for second DataFrame
    - ignore_spaces: Strip whitespace from string columns
    - ignore_case: Ignore case in string comparisons
    - cast_column_names_lower: Convert column names to lowercase
    - sample_count: Number of sample mismatches to include
    - column_count: Number of columns to include in detailed stats
    - html_file: Path to save HTML report (optional)
    - parallelism: Number of parallel partitions
    
    Returns:
    Formatted comparison report string
    """

Types

# Fugue AnyDataFrame type supports multiple backends
AnyDataFrame = Union[
    pd.DataFrame,          # Pandas
    pyspark.sql.DataFrame, # Spark
    dask.dataframe.DataFrame, # Dask  
    pyarrow.Table,        # Arrow
    # And other Fugue-supported backends
]

class _StrictSchemaError(Exception):
    """Raised when strict schema validation fails."""

Constants

HASH_COL: str = "__datacompy__hash__"  # Internal hash column name

Usage Examples

Basic Distributed Comparison

import pandas as pd
import dask.dataframe as dd
import datacompy

# Create DataFrames (Pandas and Dask)
df1_pandas = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'value': [10, 20, 30, 40, 50]
})

df2_dask = dd.from_pandas(pd.DataFrame({
    'id': [1, 2, 3, 4, 6],
    'value': [10, 20, 30, 40, 60]
}), npartitions=2)

# Distributed comparison across different backends
matches = datacompy.is_match(
    df1_pandas, df2_dask,
    join_columns=['id'],
    parallelism=4
)

print(f"DataFrames match: {matches}")

Cross-Backend Comparison with DuckDB

import pandas as pd
import duckdb
import datacompy

# Create DataFrames
df1 = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'amount': [100.0, 200.0, 300.0, 400.0]
})

# Create DuckDB connection and table
conn = duckdb.connect()
conn.execute("CREATE TABLE df2 AS SELECT * FROM df1")
conn.execute("UPDATE df2 SET amount = amount * 1.1 WHERE id > 2")

# Materialize the DuckDB table back into a pandas DataFrame for comparison
df2 = conn.execute("SELECT * FROM df2").df()

# Distributed comparison
report_text = datacompy.report(
    df1, df2,
    join_columns=['id'],
    abs_tol=0.1,
    parallelism=2
)

print(report_text)

Large-Scale Comparison with Ray

import pandas as pd
import ray
import datacompy

# Initialize Ray
ray.init()

# Create large DataFrames
df1 = pd.DataFrame({
    'id': range(1000000),
    'value': range(1000000)
})

df2 = df1.copy()
df2.loc[df2['id'] % 1000 == 0, 'value'] += 1  # Introduce some differences

# Convert to Ray datasets for distributed processing
ray_df1 = ray.data.from_pandas(df1)
ray_df2 = ray.data.from_pandas(df2)

# Distributed comparison
matches = datacompy.is_match(
    ray_df1, ray_df2,
    join_columns=['id'],
    parallelism=8
)

matching_count = datacompy.count_matching_rows(
    ray_df1, ray_df2,
    join_columns=['id'],
    parallelism=8
)

print(f"DataFrames match: {matches}")
print(f"Matching rows: {matching_count}")

ray.shutdown()

Schema Validation

import pandas as pd
import datacompy

df1 = pd.DataFrame({
    'id': [1, 2, 3],
    'value': [1.0, 2.0, 3.0]  # float type
})

df2 = pd.DataFrame({
    'id': [1, 2, 3],
    'value': [1, 2, 3]  # int type
})

# Strict schema comparison (will raise error if schemas don't match)
try:
    result = datacompy.is_match(
        df1, df2,
        join_columns=['id'],
        strict_schema=True
    )
except datacompy._StrictSchemaError as e:
    print(f"Schema mismatch: {e}")

# Flexible schema comparison (will attempt type coercion)
result = datacompy.is_match(
    df1, df2,
    join_columns=['id'],
    strict_schema=False
)
print(f"Flexible comparison result: {result}")

Custom Parallelism Control

import pandas as pd
import dask.dataframe as dd
import datacompy

# Create large Dask DataFrames
df1 = dd.from_pandas(pd.DataFrame({
    'id': range(100000),
    'value': range(100000)
}), npartitions=10)

df2 = dd.from_pandas(pd.DataFrame({
    'id': range(100000),
    'value': range(100000)
}), npartitions=10)

# Control parallelism explicitly
result = datacompy.is_match(
    df1, df2,
    join_columns=['id'],
    parallelism=16  # Use 16 parallel partitions
)

# Generate report with controlled parallelism
report_text = datacompy.report(
    df1, df2,
    join_columns=['id'],
    parallelism=8,
    sample_count=50
)

Supported Backends

The distributed comparison functions work with any DataFrame backend supported by Fugue:

  • Pandas: Local processing
  • Spark: Distributed Spark cluster processing
  • Dask: Distributed Dask processing
  • Ray: Distributed Ray processing
  • DuckDB: High-performance analytical processing
  • Arrow: In-memory columnar processing
  • Polars: High-performance local processing

The functions automatically detect the backend and optimize the comparison strategy accordingly.

Install with Tessl CLI

npx tessl i tessl/pypi-datacompy

docs

column-utilities.md

distributed-comparison.md

index.md

multi-backend-comparison.md

pandas-comparison.md

reporting.md

tile.json