# Distributed DataFrame Comparison

Fugue-powered distributed comparison functions that work across multiple backends, including Dask, DuckDB, Ray, and Arrow, enabling scalable comparison of large datasets with a unified interface.

## Capabilities

### Distributed Match Checking

Check if DataFrames match across distributed computing backends, with automatic parallelization and optimization.
```python { .api }
def is_match(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False,
) -> bool:
    """
    Check if DataFrames match using distributed computing.

    Parameters:
    - df1: First DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
    - df2: Second DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
    - join_columns: Column(s) to join dataframes on
    - abs_tol: Absolute tolerance for numeric comparisons
    - rel_tol: Relative tolerance for numeric comparisons
    - df1_name: Display name for first DataFrame
    - df2_name: Display name for second DataFrame
    - ignore_spaces: Strip whitespace from string columns
    - ignore_case: Ignore case in string comparisons
    - cast_column_names_lower: Convert column names to lowercase
    - parallelism: Number of parallel partitions
    - strict_schema: Enforce strict schema matching

    Returns:
    True if DataFrames match, False otherwise
    """
```
### Distributed Row Analysis

Analyze row-level relationships between DataFrames using distributed processing.
```python { .api }
def all_rows_overlap(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False,
) -> bool:
    """
    Check if all rows are present in both DataFrames.

    Returns:
    True if all rows overlap, False otherwise
    """

def count_matching_rows(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False,
) -> int:
    """
    Count the number of matching rows between DataFrames.

    Returns:
    Number of matching rows
    """
```
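
As a quick illustration, here is a minimal sketch of both row-level checks on small pandas frames, using the entry points documented above:

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({'id': [1, 2, 3], 'value': [10, 20, 30]})
df2 = pd.DataFrame({'id': [1, 2, 3, 4], 'value': [10, 20, 30, 40]})

# Expect False: df2 contains a row (id=4) that df1 lacks
overlap = datacompy.all_rows_overlap(df1, df2, join_columns='id')

# Expect 3: the rows with ids 1-3 join on 'id' and agree on every other column
n_matching = datacompy.count_matching_rows(df1, df2, join_columns='id')

print(f"All rows overlap: {overlap}")
print(f"Matching rows: {n_matching}")
```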
### Distributed Column Analysis

Analyze column structure and relationships using distributed processing.
```python { .api }
def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
    """
    Get columns that are unique to df1.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame

    Returns:
    OrderedSet of column names unique to df1
    """

def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
    """
    Get columns that are shared between DataFrames.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame

    Returns:
    OrderedSet of shared column names
    """

def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool:
    """
    Check if all columns match between DataFrames.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame

    Returns:
    True if all columns match, False otherwise
    """
```
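
The column-level helpers need no join columns; a small pandas sketch of the expected results (the exact `OrderedSet` repr may vary):

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({'id': [1], 'name': ['a'], 'extra': [0]})
df2 = pd.DataFrame({'id': [1], 'name': ['b']})

print(datacompy.unq_columns(df1, df2))        # columns only in df1 -> {'extra'}
print(datacompy.unq_columns(df2, df1))        # columns only in df2 -> empty set
print(datacompy.intersect_columns(df1, df2))  # shared columns -> {'id', 'name'}
print(datacompy.all_columns_match(df1, df2))  # False: 'extra' is only in df1
```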
### Distributed Report Generation

Generate comprehensive comparison reports using distributed computing with customizable output.
```python { .api }
def report(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    sample_count: int = 10,
    column_count: int = 10,
    html_file: str | None = None,
    parallelism: int | None = None,
) -> str:
    """
    Generate a comprehensive comparison report using distributed computing.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame
    - join_columns: Column(s) to join dataframes on
    - abs_tol: Absolute tolerance for numeric comparisons
    - rel_tol: Relative tolerance for numeric comparisons
    - df1_name: Display name for first DataFrame
    - df2_name: Display name for second DataFrame
    - ignore_spaces: Strip whitespace from string columns
    - ignore_case: Ignore case in string comparisons
    - cast_column_names_lower: Convert column names to lowercase
    - sample_count: Number of sample mismatches to include
    - column_count: Number of columns to include in detailed stats
    - html_file: Path to save HTML report (optional)
    - parallelism: Number of parallel partitions

    Returns:
    Formatted comparison report string
    """
```
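
A short sketch of writing the report to disk via the optional `html_file` parameter (the output path is illustrative):

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({'id': [1, 2, 3], 'value': [1.0, 2.0, 3.0]})
df2 = pd.DataFrame({'id': [1, 2, 3], 'value': [1.0, 2.0, 3.5]})

# The text report is returned either way; passing html_file also
# renders it to an HTML file at the given (illustrative) path.
text = datacompy.report(
    df1, df2,
    join_columns=['id'],
    sample_count=5,
    html_file="comparison_report.html",
)
print(text)
```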
## Types

```python { .api }
# Fugue AnyDataFrame type supports multiple backends
AnyDataFrame = Union[
    pd.DataFrame,              # Pandas
    pyspark.sql.DataFrame,     # Spark
    dask.dataframe.DataFrame,  # Dask
    pyarrow.Table,             # Arrow
    # ... and other Fugue-supported backends
]

class _StrictSchemaError(Exception):
    """Raised when strict schema validation fails."""
```
## Constants

```python { .api }
HASH_COL: str = "__datacompy__hash__"  # Internal hash column name
```
## Usage Examples

### Basic Distributed Comparison

```python
import pandas as pd
import dask.dataframe as dd
import datacompy

# Create DataFrames (one Pandas, one Dask)
df1_pandas = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'value': [10, 20, 30, 40, 50]
})

df2_dask = dd.from_pandas(pd.DataFrame({
    'id': [1, 2, 3, 4, 6],
    'value': [10, 20, 30, 40, 60]
}), npartitions=2)

# Distributed comparison across different backends
matches = datacompy.is_match(
    df1_pandas, df2_dask,
    join_columns=['id'],
    parallelism=4
)

print(f"DataFrames match: {matches}")
```
### Cross-Backend Comparison with DuckDB

```python
import pandas as pd
import duckdb
import datacompy

# Create the first DataFrame
df1 = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'amount': [100.0, 200.0, 300.0, 400.0]
})

# Create a DuckDB connection and a modified copy of the table
# (DuckDB can read the in-scope pandas DataFrame df1 directly)
conn = duckdb.connect()
conn.execute("CREATE TABLE df2 AS SELECT * FROM df1")
conn.execute("UPDATE df2 SET amount = amount * 1.1 WHERE id > 2")

# Pull the DuckDB table back as a pandas DataFrame
df2 = conn.execute("SELECT * FROM df2").df()

# Distributed comparison with an absolute tolerance
report_text = datacompy.report(
    df1, df2,
    join_columns=['id'],
    abs_tol=0.1,
    parallelism=2
)

print(report_text)
```
### Large-Scale Comparison with Ray

```python
import pandas as pd
import ray
import datacompy

# Initialize Ray
ray.init()

# Create large DataFrames
df1 = pd.DataFrame({
    'id': range(1000000),
    'value': range(1000000)
})

df2 = df1.copy()
df2.loc[df2['id'] % 1000 == 0, 'value'] += 1  # Introduce some differences

# Convert to Ray Datasets for distributed processing
ray_df1 = ray.data.from_pandas(df1)
ray_df2 = ray.data.from_pandas(df2)

# Distributed comparison
matches = datacompy.is_match(
    ray_df1, ray_df2,
    join_columns=['id'],
    parallelism=8
)

matching_count = datacompy.count_matching_rows(
    ray_df1, ray_df2,
    join_columns=['id'],
    parallelism=8
)

print(f"DataFrames match: {matches}")
print(f"Matching rows: {matching_count}")

ray.shutdown()
```
### Schema Validation

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({
    'id': [1, 2, 3],
    'value': [1.0, 2.0, 3.0]  # float type
})

df2 = pd.DataFrame({
    'id': [1, 2, 3],
    'value': [1, 2, 3]  # int type
})

# Strict schema comparison (will raise an error if schemas don't match)
try:
    result = datacompy.is_match(
        df1, df2,
        join_columns=['id'],
        strict_schema=True
    )
except datacompy._StrictSchemaError as e:
    print(f"Schema mismatch: {e}")

# Flexible schema comparison (will attempt type coercion)
result = datacompy.is_match(
    df1, df2,
    join_columns=['id'],
    strict_schema=False
)
print(f"Flexible comparison result: {result}")
```
### Custom Parallelism Control

```python
import pandas as pd
import dask.dataframe as dd
import datacompy

# Create large Dask DataFrames
df1 = dd.from_pandas(pd.DataFrame({
    'id': range(100000),
    'value': range(100000)
}), npartitions=10)

df2 = dd.from_pandas(pd.DataFrame({
    'id': range(100000),
    'value': range(100000)
}), npartitions=10)

# Control parallelism explicitly
result = datacompy.is_match(
    df1, df2,
    join_columns=['id'],
    parallelism=16  # Use 16 parallel partitions
)

# Generate a report with controlled parallelism
report_text = datacompy.report(
    df1, df2,
    join_columns=['id'],
    parallelism=8,
    sample_count=50
)
```
## Supported Backends

The distributed comparison functions work with any DataFrame backend supported by Fugue:

- **Pandas**: Local processing
- **Spark**: Distributed Spark cluster processing
- **Dask**: Distributed Dask processing
- **Ray**: Distributed Ray processing
- **DuckDB**: High-performance analytical processing
- **Arrow**: In-memory columnar processing
- **Polars**: High-performance local processing

The functions automatically detect the backend and optimize the comparison strategy accordingly, as the sketch below shows.
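
For instance, a minimal Polars sketch (assuming Polars is installed alongside a Fugue version that supports it); the backend is inferred from the input types with no extra configuration:

```python
import polars as pl
import datacompy

df1 = pl.DataFrame({'id': [1, 2, 3], 'value': [10, 20, 30]})
df2 = pl.DataFrame({'id': [1, 2, 3], 'value': [10, 20, 31]})

# Backend detection is automatic; expect False since one value differs
print(datacompy.is_match(df1, df2, join_columns=['id']))
```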