# Distributed DataFrame Comparison

Fugue-powered distributed comparison functions that work across multiple backends, including Spark, Dask, DuckDB, Ray, and Arrow, enabling scalable comparison of large datasets through a unified interface.

## Capabilities

### Distributed Match Checking

Check if DataFrames match across distributed computing backends with automatic parallelization and optimization.

```python { .api }
def is_match(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False
) -> bool:
    """
    Check if DataFrames match using distributed computing.

    Parameters:
    - df1: First DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
    - df2: Second DataFrame (supports Pandas, Spark, Dask, Arrow, etc.)
    - join_columns: Column(s) to join dataframes on
    - abs_tol: Absolute tolerance for numeric comparisons
    - rel_tol: Relative tolerance for numeric comparisons
    - df1_name: Display name for first DataFrame
    - df2_name: Display name for second DataFrame
    - ignore_spaces: Strip whitespace from string columns
    - ignore_case: Ignore case in string comparisons
    - cast_column_names_lower: Convert column names to lowercase
    - parallelism: Number of parallel partitions
    - strict_schema: Enforce strict schema matching

    Returns:
    True if DataFrames match, False otherwise
    """
```

### Distributed Row Analysis

Analyze row-level relationships between DataFrames using distributed processing.

```python { .api }
def all_rows_overlap(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False
) -> bool:
    """
    Check if all rows are present in both DataFrames.

    Returns:
    True if all rows overlap, False otherwise
    """

def count_matching_rows(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    parallelism: int | None = None,
    strict_schema: bool = False
) -> int:
    """
    Count the number of matching rows between DataFrames.

    Returns:
    Number of matching rows
    """
```
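
A minimal sketch of both functions on small pandas frames; the data and the expected results noted in the comments are illustrative, not taken from the library's own documentation:

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({'id': [1, 2, 3], 'value': [10, 20, 30]})
df2 = pd.DataFrame({'id': [2, 3, 4], 'value': [20, 30, 40]})

# Expect False: id=1 exists only in df1 and id=4 only in df2
print(datacompy.all_rows_overlap(df1, df2, join_columns='id'))

# Expect 2: the rows with id=2 and id=3 match on all compared columns
print(datacompy.count_matching_rows(df1, df2, join_columns='id'))
```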

### Distributed Column Analysis

Analyze column structure and relationships using distributed processing.

```python { .api }
def unq_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
    """
    Get columns that are unique to df1.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame

    Returns:
    OrderedSet of column names unique to df1
    """

def intersect_columns(df1: AnyDataFrame, df2: AnyDataFrame) -> OrderedSet[str]:
    """
    Get columns that are shared between DataFrames.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame

    Returns:
    OrderedSet of shared column names
    """

def all_columns_match(df1: AnyDataFrame, df2: AnyDataFrame) -> bool:
    """
    Check if all columns match between DataFrames.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame

    Returns:
    True if all columns match, False otherwise
    """
```
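
A minimal sketch of the three helpers on small pandas frames (illustrative data; the expected outputs in the comments are my reading of the signatures above):

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({'id': [1], 'a': [1], 'b': [2]})
df2 = pd.DataFrame({'id': [1], 'b': [2], 'c': [3]})

print(datacompy.unq_columns(df1, df2))        # expect {'a'}: only in df1
print(datacompy.unq_columns(df2, df1))        # expect {'c'}: only in df2
print(datacompy.intersect_columns(df1, df2))  # expect {'id', 'b'}: shared
print(datacompy.all_columns_match(df1, df2))  # expect False: column sets differ
```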

### Distributed Report Generation

Generate comprehensive comparison reports using distributed computing with customizable output.

```python { .api }
def report(
    df1: AnyDataFrame,
    df2: AnyDataFrame,
    join_columns: str | List[str],
    abs_tol: float = 0,
    rel_tol: float = 0,
    df1_name: str = "df1",
    df2_name: str = "df2",
    ignore_spaces: bool = False,
    ignore_case: bool = False,
    cast_column_names_lower: bool = True,
    sample_count: int = 10,
    column_count: int = 10,
    html_file: str | None = None,
    parallelism: int | None = None
) -> str:
    """
    Generate comprehensive comparison report using distributed computing.

    Parameters:
    - df1: First DataFrame
    - df2: Second DataFrame
    - join_columns: Column(s) to join dataframes on
    - abs_tol: Absolute tolerance for numeric comparisons
    - rel_tol: Relative tolerance for numeric comparisons
    - df1_name: Display name for first DataFrame
    - df2_name: Display name for second DataFrame
    - ignore_spaces: Strip whitespace from string columns
    - ignore_case: Ignore case in string comparisons
    - cast_column_names_lower: Convert column names to lowercase
    - sample_count: Number of sample mismatches to include
    - column_count: Number of columns to include in detailed stats
    - html_file: Path to save HTML report (optional)
    - parallelism: Number of parallel partitions

    Returns:
    Formatted comparison report string
    """
```
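
The usage examples below only print the text report, so here is a short sketch of the optional HTML output; the output path is hypothetical:

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({'id': [1, 2], 'value': [1.0, 2.0]})
df2 = pd.DataFrame({'id': [1, 2], 'value': [1.0, 2.5]})

# Returns the text report and, because html_file is set,
# also writes an HTML version to the given (hypothetical) path
text = datacompy.report(
    df1, df2,
    join_columns='id',
    sample_count=5,
    html_file='comparison_report.html'
)
print(text)
```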

## Types

```python { .api }
# Fugue AnyDataFrame type supports multiple backends
AnyDataFrame = Union[
    pd.DataFrame,              # Pandas
    pyspark.sql.DataFrame,     # Spark
    dask.dataframe.DataFrame,  # Dask
    pyarrow.Table,             # Arrow
    # And other Fugue-supported backends
]

class _StrictSchemaError(Exception):
    """Raised when strict schema validation fails."""
```

## Constants

```python { .api }
HASH_COL: str = "__datacompy__hash__"  # Internal hash column name
```
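
The name suggests the distributed comparison adds an internal hash column to the joined data, which implies input frames should not already contain a column with this name. A small defensive check built on that assumption (`check_no_reserved_columns` is a hypothetical helper, not part of the library):

```python
import datacompy

def check_no_reserved_columns(df) -> None:
    # Hypothetical guard: fail fast if the reserved internal name is taken.
    # Assumes HASH_COL is importable from the top-level package, as this
    # page's Constants section implies.
    if datacompy.HASH_COL in df.columns:
        raise ValueError(f"column name {datacompy.HASH_COL!r} is reserved")
```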

## Usage Examples

### Basic Distributed Comparison

```python
import pandas as pd
import dask.dataframe as dd
import datacompy

# Create DataFrames (Pandas and Dask)
df1_pandas = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'value': [10, 20, 30, 40, 50]
})

df2_dask = dd.from_pandas(pd.DataFrame({
    'id': [1, 2, 3, 4, 6],
    'value': [10, 20, 30, 40, 60]
}), npartitions=2)

# Distributed comparison across different backends
matches = datacompy.is_match(
    df1_pandas, df2_dask,
    join_columns=['id'],
    parallelism=4
)

print(f"DataFrames match: {matches}")
```

### Cross-Backend Comparison with DuckDB

```python
import pandas as pd
import duckdb
import datacompy

# Create a pandas DataFrame
df1 = pd.DataFrame({
    'id': [1, 2, 3, 4],
    'amount': [100.0, 200.0, 300.0, 400.0]
})

# Create a DuckDB connection and table; DuckDB can query the local
# pandas variable df1 directly via its replacement scans
conn = duckdb.connect()
conn.execute("CREATE TABLE df2 AS SELECT * FROM df1")
conn.execute("UPDATE df2 SET amount = amount * 1.1 WHERE id > 2")

# Fetch the DuckDB table back as a pandas DataFrame
df2 = conn.execute("SELECT * FROM df2").df()

# Generate a distributed comparison report
report_text = datacompy.report(
    df1, df2,
    join_columns=['id'],
    abs_tol=0.1,
    parallelism=2
)

print(report_text)
```

### Large-Scale Comparison with Ray

```python
import pandas as pd
import ray
import datacompy

# Initialize Ray
ray.init()

# Create large DataFrames
df1 = pd.DataFrame({
    'id': range(1000000),
    'value': range(1000000)
})

df2 = df1.copy()
df2.loc[df2['id'] % 1000 == 0, 'value'] += 1  # Introduce some differences

# Convert to Ray datasets for distributed processing
ray_df1 = ray.data.from_pandas(df1)
ray_df2 = ray.data.from_pandas(df2)

# Distributed comparison
matches = datacompy.is_match(
    ray_df1, ray_df2,
    join_columns=['id'],
    parallelism=8
)

matching_count = datacompy.count_matching_rows(
    ray_df1, ray_df2,
    join_columns=['id'],
    parallelism=8
)

print(f"DataFrames match: {matches}")
print(f"Matching rows: {matching_count}")

ray.shutdown()
```

### Schema Validation

```python
import pandas as pd
import datacompy

df1 = pd.DataFrame({
    'id': [1, 2, 3],
    'value': [1.0, 2.0, 3.0]  # float type
})

df2 = pd.DataFrame({
    'id': [1, 2, 3],
    'value': [1, 2, 3]  # int type
})

# Strict schema comparison (will raise error if schemas don't match)
try:
    result = datacompy.is_match(
        df1, df2,
        join_columns=['id'],
        strict_schema=True
    )
except datacompy._StrictSchemaError as e:
    print(f"Schema mismatch: {e}")

# Flexible schema comparison (will attempt type coercion)
result = datacompy.is_match(
    df1, df2,
    join_columns=['id'],
    strict_schema=False
)
print(f"Flexible comparison result: {result}")
```

### Custom Parallelism Control

```python
import pandas as pd
import dask.dataframe as dd
import datacompy

# Create large Dask DataFrames
df1 = dd.from_pandas(pd.DataFrame({
    'id': range(100000),
    'value': range(100000)
}), npartitions=10)

df2 = dd.from_pandas(pd.DataFrame({
    'id': range(100000),
    'value': range(100000)
}), npartitions=10)

# Control parallelism explicitly
result = datacompy.is_match(
    df1, df2,
    join_columns=['id'],
    parallelism=16  # Use 16 parallel partitions
)

# Generate report with controlled parallelism
report_text = datacompy.report(
    df1, df2,
    join_columns=['id'],
    parallelism=8,
    sample_count=50
)
```

## Supported Backends

The distributed comparison functions work with any DataFrame backend supported by Fugue:

- **Pandas**: Local processing
- **Spark**: Distributed Spark cluster processing
- **Dask**: Distributed Dask processing
- **Ray**: Distributed Ray processing
- **DuckDB**: High-performance analytical processing
- **Arrow**: In-memory columnar processing
- **Polars**: High-performance local processing

The functions automatically detect the backend and optimize the comparison strategy accordingly.
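
For instance, Polars frames should flow through the same unified interface. A minimal sketch, assuming Fugue's Polars support is installed alongside the library (data and expected result are illustrative):

```python
import polars as pl
import datacompy

df1 = pl.DataFrame({'id': [1, 2, 3], 'value': [1.0, 2.0, 3.0]})
df2 = pl.DataFrame({'id': [1, 2, 3], 'value': [1.0, 2.0, 3.1]})

# Same call as with pandas or Dask; Fugue dispatches on the frame type.
# Expect False: id=3 differs by 0.1, which exceeds abs_tol=0.05
matches = datacompy.is_match(df1, df2, join_columns='id', abs_tol=0.05)
print(matches)
```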