Comprehensive DataFrame comparison library for Python, providing functionality equivalent to SAS's PROC COMPARE, with support for Pandas, Spark, Polars, Snowflake, and distributed computing
—
Fugue-powered distributed comparison functions that work across multiple backends including Dask, DuckDB, Ray, and Arrow, enabling scalable comparison of large datasets with a unified interface.
Check if DataFrames match across distributed computing backends with automatic parallelization and optimization.
def is_match(
df1: AnyDataFrame,
df2: AnyDataFrame,
join_columns: str | List[str],
abs_tol: float = 0,
rel_tol: float = 0,
df1_name: str = "df1",
df2_name: str = "df2",
ignore_spaces: bool = False,
ignore_case: bool = False,
cast_column_names_lower: bool = True,
parallelism: int | None = None,
strict_schema: bool = False
) -> bool:
"""
Check if DataFrames match using distributed computing.
Parameters:
- df1: First DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
- df2: Second DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
- join_columns: Column(s) to join dataframes on
- abs_tol: Absolute tolerance for numeric comparisons
- rel_tol: Relative tolerance for numeric comparisons
- df1_name: Display name for first DataFrame
- df2_name: Display name for second DataFrame
- ignore_spaces: Strip whitespace from string columns
- ignore_case: Ignore case in string comparisons
- cast_column_names_lower: Convert column names to lowercase
- parallelism: Number of parallel partitions
- strict_schema: Enforce strict schema matching
Returns:
True if DataFrames match, False otherwise
"""Analyze row-level relationships between DataFrames using distributed processing.
def all_rows_overlap(
df1: AnyDataFrame,
df2: AnyDataFrame,
join_columns: str | List[str],
abs_tol: float = 0,
rel_tol: float = 0,
df1_name: str = "df1",
df2_name: str = "df2",
ignore_spaces: bool = False,
ignore_case: bool = False,
cast_column_names_lower: bool = True,
parallelism: int | None = None,
strict_schema: bool = False
) -> bool:
"""
Check if all rows are present in both DataFrames.
Returns:
True if all rows overlap, False otherwise
"""
def count_matching_rows(
df1: AnyDataFrame,
df2: AnyDataFrame,
join_columns: str | List[str],
abs_tol: float = 0,
rel_tol: float = 0,
df1_name: str = "df1",
df2_name: str = "df2",
ignore_spaces: bool = False,
ignore_case: bool = False,
cast_column_names_lower: bool = True,
parallelism: int | None = None,
strict_schema: bool = False
) -> int:
"""
Count the number of matching rows between DataFrames.
Returns:
Number of matching rows
"""Analyze column structure and relationships using distributed processing.
Analyze column structure and relationships using distributed processing.

def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
"""
Get columns that are unique to df1.
Parameters:
- df1: First DataFrame
- df2: Second DataFrame
Returns:
OrderedSet of column names unique to df1
"""
def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
"""
Get columns that are shared between DataFrames.
Parameters:
- df1: First DataFrame
- df2: Second DataFrame
Returns:
OrderedSet of shared column names
"""
def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool:
"""
Check if all columns match between DataFrames.
Parameters:
- df1: First DataFrame
- df2: Second DataFrame
Returns:
True if all columns match, False otherwise
"""Generate comprehensive comparison reports using distributed computing with customizable output.
Generate comprehensive comparison reports using distributed computing with customizable output.

def report(
df1: AnyDataFrame,
df2: AnyDataFrame,
join_columns: str | List[str],
abs_tol: float = 0,
rel_tol: float = 0,
df1_name: str = "df1",
df2_name: str = "df2",
ignore_spaces: bool = False,
ignore_case: bool = False,
cast_column_names_lower: bool = True,
sample_count: int = 10,
column_count: int = 10,
html_file: str | None = None,
parallelism: int | None = None
) -> str:
"""
Generate comprehensive comparison report using distributed computing.
Parameters:
- df1: First DataFrame
- df2: Second DataFrame
- join_columns: Column(s) to join dataframes on
- abs_tol: Absolute tolerance for numeric comparisons
- rel_tol: Relative tolerance for numeric comparisons
- df1_name: Display name for first DataFrame
- df2_name: Display name for second DataFrame
- ignore_spaces: Strip whitespace from string columns
- ignore_case: Ignore case in string comparisons
- cast_column_names_lower: Convert column names to lowercase
- sample_count: Number of sample mismatches to include
- column_count: Number of columns to include in detailed stats
- html_file: Path to save HTML report (optional)
- parallelism: Number of parallel partitions
Returns:
Formatted comparison report string
"""# Fugue AnyDataFrame type supports multiple backends
# Fugue AnyDataFrame type supports multiple backends
AnyDataFrame = Union[
pd.DataFrame, # Pandas
pyspark.sql.DataFrame, # Spark
dask.dataframe.DataFrame, # Dask
pyarrow.Table, # Arrow
# And other Fugue-supported backends
]
class _StrictSchemaError(Exception):
"""Raised when strict schema validation fails."""HASH_COL: str = "__datacompy__hash__" # Internal hash column nameimport pandas as pd
import pandas as pd
import dask.dataframe as dd
import datacompy
# Create DataFrames (Pandas and Dask)
df1_pandas = pd.DataFrame({
'id': [1, 2, 3, 4, 5],
'value': [10, 20, 30, 40, 50]
})
df2_dask = dd.from_pandas(pd.DataFrame({
'id': [1, 2, 3, 4, 6],
'value': [10, 20, 30, 40, 60]
}), npartitions=2)
# Distributed comparison across different backends
matches = datacompy.is_match(
df1_pandas, df2_dask,
join_columns=['id'],
parallelism=4
)
print(f"DataFrames match: {matches}")import pandas as pd
import pandas as pd
import duckdb
import datacompy
# Create DataFrames
df1 = pd.DataFrame({
'id': [1, 2, 3, 4],
'amount': [100.0, 200.0, 300.0, 400.0]
})
# Create DuckDB connection and table
conn = duckdb.connect()
conn.execute("CREATE TABLE df2 AS SELECT * FROM df1")
conn.execute("UPDATE df2 SET amount = amount * 1.1 WHERE id > 2")
# Get DuckDB table as DataFrame
df2 = conn.execute("SELECT * FROM df2").df()
# Distributed comparison
report_text = datacompy.report(
df1, df2,
join_columns=['id'],
abs_tol=0.1,
parallelism=2
)
print(report_text)
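
Example: large-scale comparison over Ray datasets.
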
import pandas as pd
import ray
import datacompy
# Initialize Ray
ray.init()
# Create large DataFrames
df1 = pd.DataFrame({
'id': range(1000000),
'value': range(1000000)
})
df2 = df1.copy()
df2.loc[df2['id'] % 1000 == 0, 'value'] += 1 # Introduce some differences
# Convert to Ray datasets for distributed processing
ray_df1 = ray.data.from_pandas(df1)
ray_df2 = ray.data.from_pandas(df2)
# Distributed comparison
matches = datacompy.is_match(
ray_df1, ray_df2,
join_columns=['id'],
parallelism=8
)
matching_count = datacompy.count_matching_rows(
ray_df1, ray_df2,
join_columns=['id'],
parallelism=8
)
print(f"DataFrames match: {matches}")
print(f"Matching rows: {matching_count}")
ray.shutdown()
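
Example: strict versus flexible schema handling when column types differ.
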
import pandas as pd
import datacompy
df1 = pd.DataFrame({
'id': [1, 2, 3],
'value': [1.0, 2.0, 3.0] # float type
})
df2 = pd.DataFrame({
'id': [1, 2, 3],
'value': [1, 2, 3] # int type
})
# Strict schema comparison (will raise error if schemas don't match)
try:
result = datacompy.is_match(
df1, df2,
join_columns=['id'],
strict_schema=True
)
except datacompy._StrictSchemaError as e:
print(f"Schema mismatch: {e}")
# Flexible schema comparison (will attempt type coercion)
result = datacompy.is_match(
df1, df2,
join_columns=['id'],
strict_schema=False
)
print(f"Flexible comparison result: {result}")import dask.dataframe as dd
import pandas as pd
import dask.dataframe as dd
import datacompy
# Create large Dask DataFrames
df1 = dd.from_pandas(pd.DataFrame({
'id': range(100000),
'value': range(100000)
}), npartitions=10)
df2 = dd.from_pandas(pd.DataFrame({
'id': range(100000),
'value': range(100000)
}), npartitions=10)
# Control parallelism explicitly
result = datacompy.is_match(
df1, df2,
join_columns=['id'],
parallelism=16 # Use 16 parallel partitions
)
# Generate report with controlled parallelism
report_text = datacompy.report(
df1, df2,
join_columns=['id'],
parallelism=8,
sample_count=50
)

The distributed comparison functions work with any DataFrame backend supported by Fugue, including Pandas, Spark, Dask, DuckDB, Ray, and Arrow.
The functions automatically detect the backend and optimize the comparison strategy accordingly.
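
As a minimal sketch of that backend inference, assuming pyarrow is installed and Arrow Tables are handled like the other supported backends:

import pyarrow as pa
import datacompy

t1 = pa.table({'id': [1, 2, 3], 'value': [10, 20, 30]})
t2 = pa.table({'id': [1, 2, 3], 'value': [10, 20, 31]})

# Same call as with Pandas or Dask; the backend is inferred from the inputs
print(datacompy.is_match(t1, t2, join_columns='id'))  # False: id=3 differs
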
Install with Tessl CLI
npx tessl i tessl/pypi-datacompy