or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-data-models.md, distributed-computing.md, index.md, sklearn-interface.md, training-evaluation.md, utilities.md

docs/distributed-computing.md

0

# Distributed Computing

1

2

Native support for distributed training across the Dask and Spark ecosystems, enabling scalable machine learning on large datasets and compute clusters. XGBoost integrates with these frameworks so that they handle data loading and partitioning, while XGBoost coordinates training across workers and produces the same Booster model as single-node training.

3

4

## Capabilities

5

6

### Dask Integration

7

8

XGBoost's Dask integration enables distributed training and prediction using Dask's flexible task scheduling and data structures. Supports both Dask DataFrame and Dask Array inputs with automatic data partitioning and worker coordination.

9

10

```python { .api }

11

import xgboost.dask as dxgb

12

13

def dxgb.train(client, params, dtrain, num_boost_round=10, evals=(),

14

obj=None, maximize=None, early_stopping_rounds=None,

15

evals_result=None, verbose_eval=True, xgb_model=None,

16

callbacks=None):

17

"""

18

Train XGBoost model using Dask distributed computing.

19

20

Parameters:

21

- client: Dask client for distributed computation (dask.distributed.Client)

22

- params: Training parameters (same as xgb.train) (dict)

23

- dtrain: Training DMatrix or DaskDMatrix (DMatrix or DaskDMatrix)

24

- num_boost_round: Number of boosting rounds (int)

25

- evals: Evaluation sets as list of (DMatrix, name) tuples (list)

26

- obj: Custom objective function (callable, optional)

27

- maximize: Whether to maximize metric (bool, optional)

28

- early_stopping_rounds: Early stopping rounds (int, optional)

29

- evals_result: Dict to store evaluation results (dict, optional)

30

- verbose_eval: Verbosity for evaluation (bool or int)

31

- xgb_model: Existing model to continue training (Booster, optional)

32

- callbacks: Training callbacks (list, optional)

33

34

Returns: dict - Contains 'booster' (trained model) and 'history' (training log)

35

"""

36

37

def dxgb.predict(client, model, data, output_margin=False, missing=float('nan'),

38

pred_leaf=False, pred_contribs=False, approx_contribs=False,

39

pred_interactions=False, validate_features=True,

40

iteration_range=(0, 0), strict_shape=False):

41

"""

42

Run distributed prediction using Dask.

43

44

Parameters:

45

- client: Dask client (dask.distributed.Client)

46

- model: Trained XGBoost model (Booster)

47

- data: Input data (DaskDMatrix, dask.DataFrame, or dask.Array)

48

- output_margin: Whether to output margin values (bool)

49

- missing: Value to treat as missing (float)

50

- pred_leaf: Whether to predict leaf indices (bool)

51

- pred_contribs: Whether to predict contributions (bool)

52

- approx_contribs: Whether to use approximate contributions (bool)

53

- pred_interactions: Whether to predict interactions (bool)

54

- validate_features: Whether to validate features (bool)

55

- iteration_range: Range of trees to use (tuple)

56

- strict_shape: Whether to enforce strict shape (bool)

57

58

Returns: dask.Array or dask.DataFrame - Distributed predictions

59

"""

60

61

def dxgb.inplace_predict(client, model, data, iteration_range=(0, 0),

62

predict_type='value', missing=float('nan'),

63

validate_features=True, base_margin=None,

64

strict_shape=False):

65

"""

66

Inplace distributed prediction using Dask.

67

68

Parameters:

69

- client: Dask client (dask.distributed.Client)

70

- model: Trained XGBoost model (Booster)

71

- data: Input data (dask.DataFrame or dask.Array)

72

- iteration_range: Range of trees to use (tuple)

73

- predict_type: Type of prediction ('value', 'margin', etc.) (str)

74

- missing: Value to treat as missing (float)

75

- validate_features: Whether to validate features (bool)

76

- base_margin: Base prediction margins (array-like, optional)

77

- strict_shape: Whether to enforce strict shape (bool)

78

79

Returns: dask.Array - Distributed predictions

80

"""

81

82

class dxgb.DaskDMatrix:

83

def __init__(self, client, data, label=None, *, weight=None,

84

base_margin=None, missing=None, silent=False,

85

feature_names=None, feature_types=None, group=None,

86

qid=None, label_lower_bound=None, label_upper_bound=None,

87

feature_weights=None, enable_categorical=False):

88

"""

89

DMatrix holding references to Dask DataFrame or Array.

90

91

Parameters:

92

- client: Dask distributed client (dask.distributed.Client)

93

- data: Input data (dask.DataFrame, dask.Array, or list of such objects)

94

- label: Target values (dask.DataFrame, dask.Array, or list)

95

- weight: Instance weights (dask.DataFrame, dask.Array, or list)

96

- base_margin: Base prediction margins (dask.DataFrame, dask.Array, or list)

97

- missing: Value to treat as missing (float)

98

- silent: Whether to suppress loading messages (bool)

99

- feature_names: Feature names (list of str)

100

- feature_types: Feature types (list of str)

101

- group: Group sizes for ranking (dask.Array or list)

102

- qid: Query IDs for ranking (dask.Array or list)

103

- label_lower_bound: Lower bounds for ranking (dask.Array or list)

104

- label_upper_bound: Upper bounds for ranking (dask.Array or list)

105

- feature_weights: Feature weights (dask.Array or list)

106

- enable_categorical: Enable categorical features (bool)

107

"""

108

109

class dxgb.DaskQuantileDMatrix:

110

def __init__(self, client, data, label=None, *, weight=None,

111

base_margin=None, missing=None, silent=False,

112

feature_names=None, feature_types=None, group=None,

113

qid=None, label_lower_bound=None, label_upper_bound=None,

114

feature_weights=None, ref=None, enable_categorical=False,

115

max_bin=256):

116

"""

117

Distributed QuantileDMatrix for memory-efficient training.

118

119

Parameters: Same as DaskDMatrix with additional:

120

- ref: Reference QuantileDMatrix for validation (DaskQuantileDMatrix)

121

- max_bin: Maximum number of bins for quantization (int)

122

"""

123

124

class dxgb.DaskXGBRegressor:

125

def __init__(self, *, max_depth=6, learning_rate=0.3, n_estimators=100,

126

verbosity=1, objective=None, booster='gbtree',

127

tree_method='auto', n_jobs=None, gamma=0, min_child_weight=1,

128

max_delta_step=0, subsample=1, colsample_bytree=1,

129

colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,

130

reg_lambda=1, scale_pos_weight=1, base_score=None,

131

random_state=None, missing=float('nan'), num_parallel_tree=1,

132

monotone_constraints=None, interaction_constraints=None,

133

importance_type='gain', **kwargs):

134

"""Dask-distributed XGBoost regressor with scikit-learn API."""

135

136

def fit(self, X, y, *, sample_weight=None, base_margin=None,

137

eval_set=None, verbose=True, xgb_model=None,

138

sample_weight_eval_set=None, base_margin_eval_set=None,

139

feature_weights=None):

140

"""Fit distributed regressor. Requires active Dask client."""

141

142

def predict(self, X, *, output_margin=False, validate_features=True,

143

base_margin=None, iteration_range=None):

144

"""Distributed prediction returning dask.Array."""

145

146

class dxgb.DaskXGBClassifier:

147

def __init__(self, *, max_depth=6, learning_rate=0.3, n_estimators=100,

148

verbosity=1, objective=None, booster='gbtree',

149

tree_method='auto', n_jobs=None, gamma=0, min_child_weight=1,

150

max_delta_step=0, subsample=1, colsample_bytree=1,

151

colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,

152

reg_lambda=1, scale_pos_weight=1, base_score=None,

153

random_state=None, missing=float('nan'), num_parallel_tree=1,

154

monotone_constraints=None, interaction_constraints=None,

155

importance_type='gain', **kwargs):

156

"""Dask-distributed XGBoost classifier with scikit-learn API."""

157

158

def predict_proba(self, X, *, validate_features=True, base_margin=None,

159

iteration_range=None):

160

"""Predict class probabilities using distributed computation."""

161

162

class dxgb.DaskXGBRanker:

163

def __init__(self, *, max_depth=6, learning_rate=0.3, n_estimators=100,

164

verbosity=1, objective='rank:ndcg', booster='gbtree',

165

tree_method='auto', n_jobs=None, gamma=0, min_child_weight=1,

166

max_delta_step=0, subsample=1, colsample_bytree=1,

167

colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,

168

reg_lambda=1, scale_pos_weight=1, base_score=None,

169

random_state=None, missing=float('nan'), num_parallel_tree=1,

170

monotone_constraints=None, interaction_constraints=None,

171

importance_type='gain', **kwargs):

172

"""Dask-distributed XGBoost ranker for learning-to-rank tasks."""

173

174

class dxgb.DaskXGBRFRegressor:

175

"""Dask-distributed XGBoost random forest regressor."""

176

177

class dxgb.DaskXGBRFClassifier:

178

"""Dask-distributed XGBoost random forest classifier."""

179

180

class dxgb.CommunicatorContext:

181

def __init__(self, **args):

182

"""

183

Dask-specific communicator context manager.

184

185

Parameters:

186

- **args: Arguments for communicator setup

187

"""

188

```

189

190

### Spark Integration

191

192

XGBoost integration with Apache Spark (PySpark) for large-scale distributed machine learning in Spark environments. Supports Spark DataFrames and MLlib pipeline integration.

193

194

```python { .api }

195

from xgboost import spark as spark_xgb

196

197

class spark_xgb.SparkXGBRegressor:

198

def __init__(self, *, features_col='features', label_col='label',

199

prediction_col='prediction', max_depth=6, learning_rate=0.3,

200

n_estimators=100, verbosity=1, objective=None,

201

booster='gbtree', tree_method='auto', gamma=0,

202

min_child_weight=1, max_delta_step=0, subsample=1,

203

colsample_bytree=1, colsample_bylevel=1, colsample_bynode=1,

204

reg_alpha=0, reg_lambda=1, scale_pos_weight=1,

205

base_score=None, random_state=None, missing=float('nan'),

206

num_parallel_tree=1, **kwargs):

207

"""

208

PySpark XGBoost regressor integrated with MLlib pipelines.

209

210

Parameters:

211

- features_col: Features column name (str)

212

- label_col: Label column name (str)

213

- prediction_col: Prediction column name (str)

214

- Other parameters: Same as XGBRegressor

215

"""

216

217

def fit(self, dataset):

218

"""

219

Fit the regressor on Spark DataFrame.

220

221

Parameters:

222

- dataset: Training data (pyspark.sql.DataFrame)

223

224

Returns: SparkXGBRegressorModel

225

"""

226

227

class spark_xgb.SparkXGBRegressorModel:

228

def transform(self, dataset):

229

"""

230

Transform Spark DataFrame with predictions.

231

232

Parameters:

233

- dataset: Input data (pyspark.sql.DataFrame)

234

235

Returns: pyspark.sql.DataFrame - DataFrame with predictions

236

"""

237

238

class spark_xgb.SparkXGBClassifier:

239

def __init__(self, *, features_col='features', label_col='label',

240

prediction_col='prediction', probability_col='probability',

241

raw_prediction_col='rawPrediction', max_depth=6,

242

learning_rate=0.3, n_estimators=100, verbosity=1,

243

objective=None, booster='gbtree', tree_method='auto',

244

gamma=0, min_child_weight=1, max_delta_step=0, subsample=1,

245

colsample_bytree=1, colsample_bylevel=1, colsample_bynode=1,

246

reg_alpha=0, reg_lambda=1, scale_pos_weight=1,

247

base_score=None, random_state=None, missing=float('nan'),

248

num_parallel_tree=1, **kwargs):

249

"""

250

PySpark XGBoost classifier integrated with MLlib pipelines.

251

252

Parameters:

253

- probability_col: Probability column name (str)

254

- raw_prediction_col: Raw prediction column name (str)

255

- Other parameters: Same as SparkXGBRegressor

256

"""

257

258

class spark_xgb.SparkXGBClassifierModel:

259

def transform(self, dataset):

260

"""Transform with class predictions and probabilities."""

261

262

class spark_xgb.SparkXGBRanker:

263

def __init__(self, *, features_col='features', label_col='label',

264

prediction_col='prediction', group_col=None,

265

max_depth=6, learning_rate=0.3, n_estimators=100,

266

verbosity=1, objective='rank:ndcg', booster='gbtree',

267

tree_method='auto', gamma=0, min_child_weight=1,

268

max_delta_step=0, subsample=1, colsample_bytree=1,

269

colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,

270

reg_lambda=1, scale_pos_weight=1, base_score=None,

271

random_state=None, missing=float('nan'),

272

num_parallel_tree=1, **kwargs):

273

"""

274

PySpark XGBoost ranker for learning-to-rank tasks.

275

276

Parameters:

277

- group_col: Group column name for ranking (str)

278

- Other parameters: Same as SparkXGBRegressor

279

"""

280

281

class spark_xgb.SparkXGBRankerModel:

282

"""Trained Spark XGBoost ranker model."""

283

```

284

285

## Usage Examples

286

287

### Dask Distributed Training

288

289

```python

290

import dask.array as da
from dask.distributed import Client
import xgboost.dask as dxgb
from sklearn.datasets import make_classification

# Connect to a running Dask scheduler.
client = Client('localhost:8786')
# Or start a local cluster instead:
# client = Client(processes=True, n_workers=4, threads_per_worker=2)

# Create a synthetic dataset (generated in memory, distributed below).
X, y = make_classification(n_samples=100000, n_features=100, n_classes=2,
                           n_informative=50, random_state=42)

# Convert to Dask arrays; each 10,000-row chunk becomes one partition.
X_da = da.from_array(X, chunks=(10000, 100))
y_da = da.from_array(y, chunks=(10000,))

# Create DaskDMatrix
dtrain = dxgb.DaskDMatrix(client, X_da, y_da)

# Training parameters
params = {
    'objective': 'binary:logistic',
    'eval_metric': 'logloss',
    'max_depth': 6,
    'learning_rate': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'tree_method': 'hist',  # Recommended for distributed training
    'random_state': 42,
}

# Distributed training
output = dxgb.train(
    client=client,
    params=params,
    dtrain=dtrain,
    num_boost_round=100,
    evals=[(dtrain, 'train')],
    early_stopping_rounds=10,
    verbose_eval=10,
)

# Extract trained model and evaluation log.
model = output['booster']
history = output['history']

# history is nested as {eval_name: {metric_name: [value per round]}}, so
# len(history) is the number of eval sets; count rounds from one metric list.
n_rounds = len(history['train']['logloss'])
print(f"Training completed with {n_rounds} iterations")

# Distributed prediction (returns a lazy dask.Array)
predictions = dxgb.predict(client, model, X_da)
print(f"Predictions shape: {predictions.shape}")

# Compute predictions (trigger computation)
pred_values = predictions.compute()
print(f"Computed predictions shape: {pred_values.shape}")

# Close client
client.close()

352

```

353

354

### Dask with DataFrames

355

356

```python

357

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
import xgboost.dask as dxgb
from sklearn.datasets import make_classification

# Start Dask client
client = Client(processes=True, n_workers=2, threads_per_worker=2)

# Create or load a large dataset as a Dask DataFrame.
# In practice, you'd load from files: dd.read_csv('large_dataset.csv')
X, y = make_classification(n_samples=100000, n_features=100, n_classes=2,
                           n_informative=50, random_state=42)
feature_cols = [f'feature_{i}' for i in range(X.shape[1])]
df = pd.DataFrame(X, columns=feature_cols)
df['target'] = y

# Convert to Dask DataFrame
ddf = dd.from_pandas(df, npartitions=8)

# Hold out 20% of the rows so evaluation and early stopping use data the
# model was not trained on (sampling from the training frame would leak).
train_ddf, test_ddf = ddf.random_split([0.8, 0.2], random_state=42)

# Separate features and target
X_train = train_ddf[feature_cols]
y_train = train_ddf['target']
test_X = test_ddf[feature_cols]
test_y = test_ddf['target']

# Create DaskDMatrix objects from DataFrames
dtrain = dxgb.DaskDMatrix(client, X_train, y_train)
dvalid = dxgb.DaskDMatrix(client, test_X, test_y)

# Training parameters optimized for distributed training
params = {
    'objective': 'binary:logistic',
    'eval_metric': ['logloss', 'auc'],
    'max_depth': 8,
    'learning_rate': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'tree_method': 'hist',
    'max_bin': 256,
    'random_state': 42,
}

# Distributed training; early stopping monitors the last eval set ('valid').
output = dxgb.train(
    client=client,
    params=params,
    dtrain=dtrain,
    num_boost_round=200,
    evals=[(dtrain, 'train'), (dvalid, 'valid')],
    early_stopping_rounds=20,
    verbose_eval=25,
)

model = output['booster']

# Distributed prediction on the held-out rows
predictions = dxgb.predict(client, model, test_X)
pred_df = test_ddf.assign(predictions=predictions)

# Compute and display results
result = pred_df.compute()
print(f"Test predictions sample:\n{result[['target', 'predictions']].head()}")

client.close()

419

```

420

421

### Dask Scikit-learn Interface

422

423

```python

424

from dask.distributed import Client
import xgboost.dask as dxgb
import dask.array as da

# Start client (it becomes the default client used by the estimator)
client = Client(processes=True, n_workers=4)

# Create distributed data
X_da = da.random.random((50000, 50), chunks=(5000, 50))
y_da = da.random.randint(0, 2, 50000, chunks=(5000,))

# Split data (80/20, aligned to chunk boundaries here)
train_size = int(0.8 * len(X_da))
X_train, X_test = X_da[:train_size], X_da[train_size:]
y_train, y_test = y_da[:train_size], y_da[train_size:]

# Distributed classifier.
# early_stopping_rounds is set on the estimator: passing it to fit() was
# deprecated and is no longer accepted in XGBoost 2.0.
dask_clf = dxgb.DaskXGBClassifier(
    objective='binary:logistic',
    max_depth=6,
    learning_rate=0.1,
    n_estimators=100,
    tree_method='hist',
    early_stopping_rounds=10,
    random_state=42,
)

# Fit with distributed data; eval_set drives early stopping
dask_clf.fit(X_train, y_train,
             eval_set=[(X_test, y_test)],
             verbose=False)

# Distributed prediction (lazy dask arrays)
y_pred = dask_clf.predict(X_test)
y_pred_proba = dask_clf.predict_proba(X_test)

print(f"Predictions computed: {y_pred.compute().shape}")
print(f"Probabilities computed: {y_pred_proba.compute().shape}")

# Feature importance (computed locally from the trained booster)
importance = dask_clf.feature_importances_
print(f"Feature importance shape: {importance.shape}")

client.close()

468

```

469

470

### Spark Distributed Training

471

472

```python

473

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from xgboost.spark import SparkXGBClassifier
from sklearn.datasets import make_classification
import pandas as pd

# Initialize Spark session
spark = (
    SparkSession.builder
    .appName("XGBoost Distributed Training")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Create sample data
X, y = make_classification(n_samples=100000, n_features=20, n_classes=2,
                           n_informative=15, random_state=42)

# Create pandas DataFrame
feature_cols = [f'feature_{i}' for i in range(20)]
df = pd.DataFrame(X, columns=feature_cols)
df['label'] = y

# Convert to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Prepare features using VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Create XGBoost classifier for Spark
xgb_classifier = SparkXGBClassifier(
    features_col='features',
    label_col='label',
    prediction_col='prediction',
    probability_col='probability',
    num_workers=2,  # Spark tasks used for training; the default of 1 is not distributed
    max_depth=6,
    learning_rate=0.1,
    n_estimators=100,
    subsample=0.8,
    colsample_bytree=0.8,
    tree_method='hist',
    objective='binary:logistic',
    eval_metric='logloss',
    random_state=42,
)

# Create ML pipeline
pipeline = Pipeline(stages=[assembler, xgb_classifier])

# Split data
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)

# Show results
predictions.select('label', 'prediction', 'probability').show(10, truncate=False)

# Evaluate model. AUC needs a continuous score: the hard 0/1 'prediction'
# column would collapse the ROC curve to a single threshold.
evaluator = BinaryClassificationEvaluator(
    labelCol='label',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")

# Stop Spark
spark.stop()

542

```

543

544

### Distributed Cross-Validation with Dask

545

546

```python

547

from dask.distributed import Client
import xgboost.dask as dxgb
import dask.array as da
import numpy as np

# Setup
client = Client(processes=True, n_workers=4)

# Create data
X_da = da.random.random((20000, 30), chunks=(2000, 30))
y_da = da.random.randint(0, 2, 20000, chunks=(2000,))

# Parameters for cross-validation
params = {
    'objective': 'binary:logistic',
    'eval_metric': 'auc',
    'max_depth': 6,
    'learning_rate': 0.1,
    'tree_method': 'hist',
    'random_state': 42,
}


def distributed_cv(client, X, y, params, n_splits=5, n_rounds=100):
    """Perform contiguous k-fold cross-validation on Dask arrays.

    Each fold holds out one contiguous block of rows for validation and
    trains on the remaining rows. Rows are not shuffled, so shuffle the data
    first if it is ordered.

    Parameters:
    - client: Dask client (dask.distributed.Client)
    - X: Feature matrix (dask.Array)
    - y: Labels with the same number of rows as X (dask.Array)
    - params: XGBoost training parameters (dict)
    - n_splits: Number of folds (int, at least 2)
    - n_rounds: Maximum boosting rounds per fold (int)

    Returns: list of per-fold best validation scores (the last metric in
    params['eval_metric'], measured on the validation block).

    Raises: ValueError if n_splits < 2 (no rows would remain for training).
    """
    if n_splits < 2:
        raise ValueError("n_splits must be at least 2")

    n_samples = X.shape[0]
    fold_size = n_samples // n_splits
    scores = []

    for fold in range(n_splits):
        print(f"Training fold {fold + 1}/{n_splits}")

        # Validation block; the last fold absorbs any remainder rows.
        val_start = fold * fold_size
        val_end = (fold + 1) * fold_size if fold < n_splits - 1 else n_samples

        # Build the training set from the contiguous ranges around the
        # validation block instead of fancy-indexing with Python lists of
        # row numbers. Empty ranges (first/last fold) are skipped so no
        # zero-length partitions reach DaskDMatrix.
        train_slices = [
            s for s in (slice(0, val_start), slice(val_end, n_samples))
            if s.start < s.stop
        ]
        X_train_fold = da.concatenate([X[s] for s in train_slices])
        y_train_fold = da.concatenate([y[s] for s in train_slices])
        X_val_fold = X[val_start:val_end]
        y_val_fold = y[val_start:val_end]

        # Create DMatrix objects
        dtrain_fold = dxgb.DaskDMatrix(client, X_train_fold, y_train_fold)
        dval_fold = dxgb.DaskDMatrix(client, X_val_fold, y_val_fold)

        # Train model; early stopping monitors the last eval set ('val').
        output = dxgb.train(
            client=client,
            params=params,
            dtrain=dtrain_fold,
            num_boost_round=n_rounds,
            evals=[(dtrain_fold, 'train'), (dval_fold, 'val')],
            early_stopping_rounds=10,
            verbose_eval=False,
        )

        # Get best validation score recorded by early stopping
        model = output['booster']
        val_score = model.best_score
        scores.append(val_score)
        print(f"Fold {fold + 1} AUC: {val_score:.4f}")

    return scores


# Run distributed CV
cv_scores = distributed_cv(client, X_da, y_da, params, n_splits=5)

print("\nCross-validation results:")
print(f"Mean AUC: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")
print(f"Individual scores: {[f'{score:.4f}' for score in cv_scores]}")

client.close()

625

```

626

627

### Large-Scale Feature Engineering with Dask

628

629

```python

630

from dask.distributed import Client
import dask.dataframe as dd
import xgboost.dask as dxgb
import numpy as np
import pandas as pd

# Setup
client = Client(processes=True, n_workers=4, memory_limit='2GB')

# Simulate large dataset loading
# In practice: dd.read_csv('huge_dataset.csv', blocksize='100MB')
n_rows = 1000000
large_df = pd.DataFrame({
    'feature_1': np.random.randn(n_rows),
    'feature_2': np.random.randn(n_rows),
    'feature_3': np.random.randn(n_rows),
    'categorical_1': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
    'categorical_2': np.random.choice(['X', 'Y', 'Z'], n_rows),
    'target': np.random.randint(0, 2, n_rows),
})

# Convert to Dask DataFrame with appropriate partitioning
ddf = dd.from_pandas(large_df, npartitions=100)


def engineer_features(df):
    """Return a copy of *df* with engineered numeric and encoded columns.

    The input frame is left unmodified. Adds:
    - feature_1_squared: feature_1 ** 2
    - feature_2_log: log(|feature_2| + 1)
    - feature_interaction: feature_1 * feature_2
    - cat_1_encoded / cat_2_encoded: simple label encoding of the
      categorical columns (demo only)
    """
    return df.assign(
        feature_1_squared=df['feature_1'] ** 2,
        # dask.dataframe has no log/abs functions; NumPy ufuncs and the
        # Series.abs() method apply lazily to Dask Series.
        feature_2_log=np.log1p(df['feature_2'].abs()),
        feature_interaction=df['feature_1'] * df['feature_2'],
        cat_1_encoded=df['categorical_1'].map({'A': 0, 'B': 1, 'C': 2, 'D': 3}),
        cat_2_encoded=df['categorical_2'].map({'X': 0, 'Y': 1, 'Z': 2}),
    )


# Apply feature engineering (lazy)
engineered_df = engineer_features(ddf)

# Select features for training
feature_cols = ['feature_1', 'feature_2', 'feature_3', 'feature_1_squared',
                'feature_2_log', 'feature_interaction', 'cat_1_encoded', 'cat_2_encoded']

# Create train/test split. Dask DataFrames do not support row-wise .iloc
# slicing (and len() would force a full compute), so split rows
# pseudo-randomly instead.
train_df, test_df = engineered_df.random_split([0.8, 0.2], random_state=42)

X_train = train_df[feature_cols]
y_train = train_df['target']
X_test = test_df[feature_cols]
y_test = test_df['target']

# Create DaskDMatrix
dtrain = dxgb.DaskDMatrix(client, X_train, y_train)
dtest = dxgb.DaskDMatrix(client, X_test, y_test)

# Training parameters
params = {
    'objective': 'binary:logistic',
    'eval_metric': ['logloss', 'auc'],
    'max_depth': 8,
    'learning_rate': 0.1,
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    'tree_method': 'hist',
    'max_bin': 256,
    'random_state': 42,
}

# Distributed training; early stopping monitors the last metric ('auc')
# on the last eval set ('test').
print("Starting distributed training on engineered features...")
output = dxgb.train(
    client=client,
    params=params,
    dtrain=dtrain,
    num_boost_round=100,
    evals=[(dtrain, 'train'), (dtest, 'test')],
    early_stopping_rounds=10,
    verbose_eval=10,
)

model = output['booster']
print(f"Training completed at iteration {model.best_iteration}")
print(f"Best test score: {model.best_score:.4f}")

# Feature importance analysis
feature_importance = model.get_score(importance_type='gain')
print(f"\nTop 5 most important features:")
sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
for feature, importance in sorted_features[:5]:
    print(f"{feature}: {importance:.4f}")

client.close()

727

```