0
# Distributed Computing
1
2
Native support for distributed training across Dask and Spark ecosystems, enabling scalable machine learning on large datasets and compute clusters. XGBoost provides seamless integration with popular distributed computing frameworks while maintaining high performance and fault tolerance.
3
4
## Capabilities
5
6
### Dask Integration
7
8
XGBoost's Dask integration enables distributed training and prediction using Dask's flexible task scheduling and data structures. It supports both Dask DataFrame and Dask Array inputs, with automatic data partitioning and worker coordination.
9
10
```python { .api }
11
import xgboost.dask as dxgb
12
13
def dxgb.train(client, params, dtrain, num_boost_round=10, evals=(),
14
obj=None, maximize=None, early_stopping_rounds=None,
15
evals_result=None, verbose_eval=True, xgb_model=None,
16
callbacks=None):
17
"""
18
Train XGBoost model using Dask distributed computing.
19
20
Parameters:
21
- client: Dask client for distributed computation (dask.distributed.Client)
22
- params: Training parameters (same as xgb.train) (dict)
23
- dtrain: Training data; must be a Dask-backed matrix, not a plain DMatrix (DaskDMatrix or DaskQuantileDMatrix)
24
- num_boost_round: Number of boosting rounds (int)
25
- evals: Evaluation sets as list of (DMatrix, name) tuples (list)
26
- obj: Custom objective function (callable, optional)
27
- maximize: Whether to maximize metric (bool, optional)
28
- early_stopping_rounds: Early stopping rounds (int, optional)
29
- evals_result: Dict to store evaluation results (dict, optional)
30
- verbose_eval: Verbosity for evaluation (bool or int)
31
- xgb_model: Existing model to continue training (Booster, optional)
32
- callbacks: Training callbacks (list, optional)
33
34
Returns: dict - Contains 'booster' (trained model) and 'history' (training log)
35
"""
36
37
def dxgb.predict(client, model, data, output_margin=False, missing=float('nan'),
38
pred_leaf=False, pred_contribs=False, approx_contribs=False,
39
pred_interactions=False, validate_features=True,
40
iteration_range=(0, 0), strict_shape=False):
41
"""
42
Run distributed prediction using Dask.
43
44
Parameters:
45
- client: Dask client (dask.distributed.Client)
46
- model: Trained XGBoost model (Booster)
47
- data: Input data (DaskDMatrix, dask.DataFrame, or dask.Array)
48
- output_margin: Whether to output margin values (bool)
49
- missing: Value to treat as missing (float)
50
- pred_leaf: Whether to predict leaf indices (bool)
51
- pred_contribs: Whether to predict contributions (bool)
52
- approx_contribs: Whether to use approximate contributions (bool)
53
- pred_interactions: Whether to predict interactions (bool)
54
- validate_features: Whether to validate features (bool)
55
- iteration_range: Range of trees to use (tuple)
56
- strict_shape: Whether to enforce strict shape (bool)
57
58
Returns: dask.Array or dask.DataFrame - Distributed predictions
59
"""
60
61
def dxgb.inplace_predict(client, model, data, iteration_range=(0, 0),
62
predict_type='value', missing=float('nan'),
63
validate_features=True, base_margin=None,
64
strict_shape=False):
65
"""
66
Inplace distributed prediction using Dask.
67
68
Parameters:
69
- client: Dask client (dask.distributed.Client)
70
- model: Trained XGBoost model (Booster)
71
- data: Input data (dask.DataFrame or dask.Array)
72
- iteration_range: Range of trees to use (tuple)
73
- predict_type: Type of prediction ('value', 'margin', etc.) (str)
74
- missing: Value to treat as missing (float)
75
- validate_features: Whether to validate features (bool)
76
- base_margin: Base prediction margins (array-like, optional)
77
- strict_shape: Whether to enforce strict shape (bool)
78
79
Returns: dask.Array - Distributed predictions
80
"""
81
82
class dxgb.DaskDMatrix:
83
def __init__(self, client, data, label=None, *, weight=None,
84
base_margin=None, missing=None, silent=False,
85
feature_names=None, feature_types=None, group=None,
86
qid=None, label_lower_bound=None, label_upper_bound=None,
87
feature_weights=None, enable_categorical=False):
88
"""
89
DMatrix holding references to Dask DataFrame or Array.
90
91
Parameters:
92
- client: Dask distributed client (dask.distributed.Client)
93
- data: Input data (dask.DataFrame, dask.Array, or list of such objects)
94
- label: Target values (dask.DataFrame, dask.Array, or list)
95
- weight: Instance weights (dask.DataFrame, dask.Array, or list)
96
- base_margin: Base prediction margins (dask.DataFrame, dask.Array, or list)
97
- missing: Value to treat as missing (float)
98
- silent: Whether to suppress loading messages (bool)
99
- feature_names: Feature names (list of str)
100
- feature_types: Feature types (list of str)
101
- group: Group sizes for ranking (dask.Array or list)
102
- qid: Query IDs for ranking (dask.Array or list)
103
- label_lower_bound: Lower bounds for ranking (dask.Array or list)
104
- label_upper_bound: Upper bounds for ranking (dask.Array or list)
105
- feature_weights: Feature weights (dask.Array or list)
106
- enable_categorical: Enable categorical features (bool)
107
"""
108
109
class dxgb.DaskQuantileDMatrix:
110
def __init__(self, client, data, label=None, *, weight=None,
111
base_margin=None, missing=None, silent=False,
112
feature_names=None, feature_types=None, group=None,
113
qid=None, label_lower_bound=None, label_upper_bound=None,
114
feature_weights=None, ref=None, enable_categorical=False,
115
max_bin=256):
116
"""
117
Distributed QuantileDMatrix for memory-efficient training.
118
119
Parameters: Same as DaskDMatrix with additional:
120
- ref: Training DaskQuantileDMatrix whose quantile bins are reused when constructing a validation or test matrix (DaskQuantileDMatrix, optional)
121
- max_bin: Maximum number of bins for quantization (int)
122
"""
123
124
class dxgb.DaskXGBRegressor:
125
def __init__(self, *, max_depth=6, learning_rate=0.3, n_estimators=100,
126
verbosity=1, objective=None, booster='gbtree',
127
tree_method='auto', n_jobs=None, gamma=0, min_child_weight=1,
128
max_delta_step=0, subsample=1, colsample_bytree=1,
129
colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,
130
reg_lambda=1, scale_pos_weight=1, base_score=None,
131
random_state=None, missing=float('nan'), num_parallel_tree=1,
132
monotone_constraints=None, interaction_constraints=None,
133
importance_type='gain', **kwargs):
134
"""Dask-distributed XGBoost regressor with scikit-learn API."""
135
136
def fit(self, X, y, *, sample_weight=None, base_margin=None,
137
eval_set=None, verbose=True, xgb_model=None,
138
sample_weight_eval_set=None, base_margin_eval_set=None,
139
feature_weights=None):
140
"""Fit distributed regressor. Requires active Dask client."""
141
142
def predict(self, X, *, output_margin=False, validate_features=True,
143
base_margin=None, iteration_range=None):
144
"""Distributed prediction returning dask.Array."""
145
146
class dxgb.DaskXGBClassifier:
147
def __init__(self, *, max_depth=6, learning_rate=0.3, n_estimators=100,
148
verbosity=1, objective=None, booster='gbtree',
149
tree_method='auto', n_jobs=None, gamma=0, min_child_weight=1,
150
max_delta_step=0, subsample=1, colsample_bytree=1,
151
colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,
152
reg_lambda=1, scale_pos_weight=1, base_score=None,
153
random_state=None, missing=float('nan'), num_parallel_tree=1,
154
monotone_constraints=None, interaction_constraints=None,
155
importance_type='gain', **kwargs):
156
"""Dask-distributed XGBoost classifier with scikit-learn API."""
157
158
def predict_proba(self, X, *, validate_features=True, base_margin=None,
159
iteration_range=None):
160
"""Predict class probabilities using distributed computation."""
161
162
class dxgb.DaskXGBRanker:
163
def __init__(self, *, max_depth=6, learning_rate=0.3, n_estimators=100,
164
verbosity=1, objective='rank:ndcg', booster='gbtree',
165
tree_method='auto', n_jobs=None, gamma=0, min_child_weight=1,
166
max_delta_step=0, subsample=1, colsample_bytree=1,
167
colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,
168
reg_lambda=1, scale_pos_weight=1, base_score=None,
169
random_state=None, missing=float('nan'), num_parallel_tree=1,
170
monotone_constraints=None, interaction_constraints=None,
171
importance_type='gain', **kwargs):
172
"""Dask-distributed XGBoost ranker for learning-to-rank tasks."""
173
174
class dxgb.DaskXGBRFRegressor:
175
"""Dask-distributed XGBoost random forest regressor."""
176
177
class dxgb.DaskXGBRFClassifier:
178
"""Dask-distributed XGBoost random forest classifier."""
179
180
class dxgb.CommunicatorContext:
181
def __init__(self, **args):
182
"""
183
Dask-specific communicator context manager.
184
185
Parameters:
186
- **args: Arguments for communicator setup
187
"""
188
```
189
190
### Spark Integration
191
192
XGBoost integrates with Apache Spark (PySpark) for large-scale distributed machine learning in Spark environments. It supports Spark DataFrames and MLlib pipeline integration.
193
194
```python { .api }
195
from xgboost import spark as spark_xgb
196
197
class spark_xgb.SparkXGBRegressor:
198
def __init__(self, *, features_col='features', label_col='label',
199
prediction_col='prediction', max_depth=6, learning_rate=0.3,
200
n_estimators=100, verbosity=1, objective=None,
201
booster='gbtree', tree_method='auto', gamma=0,
202
min_child_weight=1, max_delta_step=0, subsample=1,
203
colsample_bytree=1, colsample_bylevel=1, colsample_bynode=1,
204
reg_alpha=0, reg_lambda=1, scale_pos_weight=1,
205
base_score=None, random_state=None, missing=float('nan'),
206
num_parallel_tree=1, **kwargs):
207
"""
208
PySpark XGBoost regressor integrated with MLlib pipelines.
209
210
Parameters:
211
- features_col: Features column name (str)
212
- label_col: Label column name (str)
213
- prediction_col: Prediction column name (str)
214
- Other parameters: Same as XGBRegressor
215
"""
216
217
def fit(self, dataset):
218
"""
219
Fit the regressor on Spark DataFrame.
220
221
Parameters:
222
- dataset: Training data (pyspark.sql.DataFrame)
223
224
Returns: SparkXGBRegressorModel
225
"""
226
227
class spark_xgb.SparkXGBRegressorModel:
228
def transform(self, dataset):
229
"""
230
Transform Spark DataFrame with predictions.
231
232
Parameters:
233
- dataset: Input data (pyspark.sql.DataFrame)
234
235
Returns: pyspark.sql.DataFrame - DataFrame with predictions
236
"""
237
238
class spark_xgb.SparkXGBClassifier:
239
def __init__(self, *, features_col='features', label_col='label',
240
prediction_col='prediction', probability_col='probability',
241
raw_prediction_col='rawPrediction', max_depth=6,
242
learning_rate=0.3, n_estimators=100, verbosity=1,
243
objective=None, booster='gbtree', tree_method='auto',
244
gamma=0, min_child_weight=1, max_delta_step=0, subsample=1,
245
colsample_bytree=1, colsample_bylevel=1, colsample_bynode=1,
246
reg_alpha=0, reg_lambda=1, scale_pos_weight=1,
247
base_score=None, random_state=None, missing=float('nan'),
248
num_parallel_tree=1, **kwargs):
249
"""
250
PySpark XGBoost classifier integrated with MLlib pipelines.
251
252
Parameters:
253
- probability_col: Probability column name (str)
254
- raw_prediction_col: Raw prediction column name (str)
255
- Other parameters: Same as SparkXGBRegressor
256
"""
257
258
class spark_xgb.SparkXGBClassifierModel:
259
def transform(self, dataset):
260
"""Transform with class predictions and probabilities."""
261
262
class spark_xgb.SparkXGBRanker:
263
def __init__(self, *, features_col='features', label_col='label',
264
prediction_col='prediction', group_col=None,
265
max_depth=6, learning_rate=0.3, n_estimators=100,
266
verbosity=1, objective='rank:ndcg', booster='gbtree',
267
tree_method='auto', gamma=0, min_child_weight=1,
268
max_delta_step=0, subsample=1, colsample_bytree=1,
269
colsample_bylevel=1, colsample_bynode=1, reg_alpha=0,
270
reg_lambda=1, scale_pos_weight=1, base_score=None,
271
random_state=None, missing=float('nan'),
272
num_parallel_tree=1, **kwargs):
273
"""
274
PySpark XGBoost ranker for learning-to-rank tasks.
275
276
Parameters:
277
- group_col: Group column name for ranking (str)
278
- Other parameters: Same as SparkXGBRegressor
279
"""
280
281
class spark_xgb.SparkXGBRankerModel:
282
"""Trained Spark XGBoost ranker model."""
283
```
284
285
## Usage Examples
286
287
### Dask Distributed Training
288
289
```python
290
import dask
291
import dask.dataframe as dd
292
import dask.array as da
293
from dask.distributed import Client
294
import xgboost.dask as dxgb
295
import numpy as np
296
from sklearn.datasets import make_classification
297
298
# Start Dask client
299
client = Client('localhost:8786') # Connect to Dask scheduler
300
# Or start local cluster: client = Client(processes=True, n_workers=4, threads_per_worker=2)
301
302
# Create large dataset
303
X, y = make_classification(n_samples=100000, n_features=100, n_classes=2,
304
n_informative=50, random_state=42)
305
306
# Convert to Dask arrays for distributed processing
307
X_da = da.from_array(X, chunks=(10000, 100))
308
y_da = da.from_array(y, chunks=(10000,))
309
310
# Create DaskDMatrix
311
dtrain = dxgb.DaskDMatrix(client, X_da, y_da)
312
313
# Training parameters
314
params = {
315
'objective': 'binary:logistic',
316
'eval_metric': 'logloss',
317
'max_depth': 6,
318
'learning_rate': 0.1,
319
'subsample': 0.8,
320
'colsample_bytree': 0.8,
321
'tree_method': 'hist', # Recommended for distributed training
322
'random_state': 42
323
}
324
325
# Distributed training
326
output = dxgb.train(
327
client=client,
328
params=params,
329
dtrain=dtrain,
330
num_boost_round=100,
331
evals=[(dtrain, 'train')],
332
early_stopping_rounds=10,
333
verbose_eval=10
334
)
335
336
# Extract the trained model and the evaluation log
model = output['booster']
history = output['history']

# `history` is nested as {dataset name: {metric name: [value per round]}},
# so len(history) would count evaluation sets, not boosting rounds.
# Count the rounds from the metric list recorded for the 'train' set.
rounds_completed = len(history['train']['logloss'])
print(f"Training completed with {rounds_completed} iterations")
341
342
# Distributed prediction
343
predictions = dxgb.predict(client, model, X_da)
344
print(f"Predictions shape: {predictions.shape}")
345
346
# Compute predictions (trigger computation)
347
pred_values = predictions.compute()
348
print(f"Computed predictions shape: {pred_values.shape}")
349
350
# Close client
351
client.close()
352
```
353
354
### Dask with DataFrames
355
356
```python
357
import pandas as pd
358
import dask.dataframe as dd
359
from dask.distributed import Client
360
import xgboost.dask as dxgb
361
362
# Start Dask client
363
client = Client(processes=True, n_workers=2, threads_per_worker=2)
364
365
# Create or load large dataset as Dask DataFrame
# In practice, you'd load from files: dd.read_csv('large_dataset.csv')
# X and y are not defined anywhere else in this example, so generate the
# sample arrays here to keep the snippet runnable on its own.
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=100000, n_features=100, n_classes=2,
                           n_informative=50, random_state=42)

df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
df['target'] = y
369
370
# Convert to Dask DataFrame
371
ddf = dd.from_pandas(df, npartitions=8)
372
373
# Separate features and target
374
feature_cols = [f'feature_{i}' for i in range(X.shape[1])]
375
X_dask = ddf[feature_cols]
376
y_dask = ddf['target']
377
378
# Create DaskDMatrix from DataFrame
379
dtrain = dxgb.DaskDMatrix(client, X_dask, y_dask)
380
381
# Training parameters optimized for distributed training
382
params = {
383
'objective': 'binary:logistic',
384
'eval_metric': ['logloss', 'auc'],
385
'max_depth': 8,
386
'learning_rate': 0.1,
387
'subsample': 0.8,
388
'colsample_bytree': 0.8,
389
'tree_method': 'hist',
390
'max_bin': 256,
391
'random_state': 42
392
}
393
394
# Distributed training with evaluation
395
output = dxgb.train(
396
client=client,
397
params=params,
398
dtrain=dtrain,
399
num_boost_round=200,
400
evals=[(dtrain, 'train')],
401
early_stopping_rounds=20,
402
verbose_eval=25
403
)
404
405
model = output['booster']
406
407
# Distributed prediction on new data
408
test_ddf = ddf.sample(frac=0.2) # Sample for testing
409
test_X = test_ddf[feature_cols]
410
411
predictions = dxgb.predict(client, model, test_X)
412
pred_df = test_ddf.assign(predictions=predictions)
413
414
# Compute and display results
415
result = pred_df.compute()
416
print(f"Test predictions sample:\n{result[['target', 'predictions']].head()}")
417
418
client.close()
419
```
420
421
### Dask Scikit-learn Interface
422
423
```python
424
from dask.distributed import Client
425
import xgboost.dask as dxgb
426
import dask.array as da
427
428
# Start client
429
client = Client(processes=True, n_workers=4)
430
431
# Create distributed data
432
X_da = da.random.random((50000, 50), chunks=(5000, 50))
433
y_da = da.random.randint(0, 2, 50000, chunks=(5000,))
434
435
# Split data
436
train_size = int(0.8 * len(X_da))
437
X_train, X_test = X_da[:train_size], X_da[train_size:]
438
y_train, y_test = y_da[:train_size], y_da[train_size:]
439
440
# Distributed classifier
# Early stopping is configured on the estimator itself: fit() (see the
# signature documented above) has no early_stopping_rounds argument.
dask_clf = dxgb.DaskXGBClassifier(
    objective='binary:logistic',
    max_depth=6,
    learning_rate=0.1,
    n_estimators=100,
    tree_method='hist',
    random_state=42,
    early_stopping_rounds=10
)

# Fit with distributed data; eval_set provides the validation data that
# early stopping monitors
dask_clf.fit(X_train, y_train,
             eval_set=[(X_test, y_test)],
             verbose=False)
455
456
# Distributed prediction
457
y_pred = dask_clf.predict(X_test)
458
y_pred_proba = dask_clf.predict_proba(X_test)
459
460
print(f"Predictions computed: {y_pred.compute().shape}")
461
print(f"Probabilities computed: {y_pred_proba.compute().shape}")
462
463
# Feature importance (read from the fitted estimator's booster on the client)
464
importance = dask_clf.feature_importances_
465
print(f"Feature importance shape: {importance.shape}")
466
467
client.close()
468
```
469
470
### Spark Distributed Training
471
472
```python
473
from pyspark.sql import SparkSession
474
from pyspark.ml import Pipeline
475
from pyspark.ml.feature import VectorAssembler
476
from pyspark.ml.evaluation import BinaryClassificationEvaluator
477
from xgboost.spark import SparkXGBClassifier
478
import numpy as np
479
import pandas as pd
480
481
# Initialize Spark session
482
spark = SparkSession.builder \
483
.appName("XGBoost Distributed Training") \
484
.config("spark.sql.adaptive.enabled", "true") \
485
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
486
.getOrCreate()
487
488
# Create sample data
# make_classification is not among the imports at the top of this example,
# so bring it into scope here.
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=100000, n_features=20, n_classes=2,
                           n_informative=15, random_state=42)
491
492
# Create pandas DataFrame
493
df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(20)])
494
df['label'] = y
495
496
# Convert to Spark DataFrame
497
spark_df = spark.createDataFrame(df)
498
499
# Prepare features using VectorAssembler
500
feature_cols = [f'feature_{i}' for i in range(20)]
501
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
502
503
# Create XGBoost classifier for Spark
504
xgb_classifier = SparkXGBClassifier(
505
features_col='features',
506
label_col='label',
507
prediction_col='prediction',
508
probability_col='probability',
509
max_depth=6,
510
learning_rate=0.1,
511
n_estimators=100,
512
subsample=0.8,
513
colsample_bytree=0.8,
514
tree_method='hist',
515
objective='binary:logistic',
516
eval_metric='logloss',
517
random_state=42
518
)
519
520
# Create ML pipeline
521
pipeline = Pipeline(stages=[assembler, xgb_classifier])
522
523
# Split data
524
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=42)
525
526
# Train model
527
model = pipeline.fit(train_df)
528
529
# Make predictions
530
predictions = model.transform(test_df)
531
532
# Show results
533
predictions.select('label', 'prediction', 'probability').show(10, truncate=False)
534
535
# Evaluate model
# BinaryClassificationEvaluator computes area under ROC from continuous
# scores. Point it at the classifier's raw prediction column (default name
# 'rawPrediction') rather than the hard 0/1 labels in 'prediction', which
# would collapse the ROC curve to a single operating point.
evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction')
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")
539
540
# Stop Spark
541
spark.stop()
542
```
543
544
### Distributed Cross-Validation with Dask
545
546
```python
547
from dask.distributed import Client
548
import xgboost.dask as dxgb
549
import dask.array as da
550
from sklearn.model_selection import KFold
551
import numpy as np
552
553
# Setup
554
client = Client(processes=True, n_workers=4)
555
556
# Create data
557
X_da = da.random.random((20000, 30), chunks=(2000, 30))
558
y_da = da.random.randint(0, 2, 20000, chunks=(2000,))
559
560
# Parameters for cross-validation
561
params = {
562
'objective': 'binary:logistic',
563
'eval_metric': 'auc',
564
'max_depth': 6,
565
'learning_rate': 0.1,
566
'tree_method': 'hist',
567
'random_state': 42
568
}
569
570
# Manual cross-validation with Dask
571
def distributed_cv(client, X, y, params, n_splits=5, n_rounds=100):
    """Perform distributed k-fold cross-validation.

    Rows are cut into ``n_splits`` contiguous folds; the last fold absorbs
    any remainder. For each fold a model is trained on the remaining rows,
    with early stopping (10 rounds) and the held-out fold passed as the last
    evaluation set.

    Parameters:
    - client: Dask client used for training (dask.distributed.Client)
    - X: Feature matrix with a known number of rows (dask.Array)
    - y: Labels aligned row-for-row with X (dask.Array)
    - params: Training parameters forwarded to dxgb.train (dict)
    - n_splits: Number of folds, between 2 and the number of rows (int)
    - n_rounds: Maximum number of boosting rounds per fold (int)

    Returns: list - ``best_score`` of each fold's booster, in fold order

    Raises: ValueError - if n_splits is below 2 or exceeds the number of rows
    """
    n_samples = len(X)
    if not 2 <= n_splits <= n_samples:
        raise ValueError(
            f"n_splits must be between 2 and the number of rows ({n_samples}), "
            f"got {n_splits}"
        )

    fold_size = n_samples // n_splits
    scores = []

    for fold in range(n_splits):
        print(f"Training fold {fold + 1}/{n_splits}")

        # Validation fold boundaries; the final fold runs to the end so that
        # no rows are dropped when n_samples is not divisible by n_splits.
        val_start = fold * fold_size
        val_end = n_samples if fold == n_splits - 1 else val_start + fold_size

        # Plain slices keep the Dask graph small; materializing one Python
        # index per row for fancy indexing is unnecessary for contiguous folds.
        X_val_fold = X[val_start:val_end]
        y_val_fold = y[val_start:val_end]
        X_train_fold = _rows_outside(X, val_start, val_end)
        y_train_fold = _rows_outside(y, val_start, val_end)

        # Create DMatrix objects
        dtrain_fold = dxgb.DaskDMatrix(client, X_train_fold, y_train_fold)
        dval_fold = dxgb.DaskDMatrix(client, X_val_fold, y_val_fold)

        # Train model
        output = dxgb.train(
            client=client,
            params=params,
            dtrain=dtrain_fold,
            num_boost_round=n_rounds,
            evals=[(dtrain_fold, 'train'), (dval_fold, 'val')],
            early_stopping_rounds=10,
            verbose_eval=False
        )

        # Get best score
        model = output['booster']
        val_score = model.best_score
        scores.append(val_score)
        print(f"Fold {fold + 1} AUC: {val_score:.4f}")

    return scores


def _rows_outside(arr, start, stop):
    """Return the rows of *arr* outside [start, stop), in their original order."""
    # Skip an empty leading/trailing slice (first and last fold) so that no
    # zero-length chunk is carried into the training data.
    parts = [part for part in (arr[:start], arr[stop:]) if part.shape[0] > 0]
    return parts[0] if len(parts) == 1 else da.concatenate(parts)
616
617
# Run distributed CV
618
cv_scores = distributed_cv(client, X_da, y_da, params, n_splits=5)
619
620
print(f"\nCross-validation results:")
621
print(f"Mean AUC: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")
622
print(f"Individual scores: {[f'{score:.4f}' for score in cv_scores]}")
623
624
client.close()
625
```
626
627
### Large-Scale Feature Engineering with Dask
628
629
```python
630
from dask.distributed import Client
631
import dask.dataframe as dd
632
import dask.array as da
633
import xgboost.dask as dxgb
634
import pandas as pd
635
636
# Setup
637
client = Client(processes=True, n_workers=4, memory_limit='2GB')
638
639
# Simulate large dataset loading
# In practice: dd.read_csv('huge_dataset.csv', blocksize='100MB')
import numpy as np  # used for the synthetic columns; not imported at the top of this example

n_rows = 1000000
large_df = pd.DataFrame({
    'feature_1': np.random.randn(n_rows),
    'feature_2': np.random.randn(n_rows),
    'feature_3': np.random.randn(n_rows),
    'categorical_1': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
    'categorical_2': np.random.choice(['X', 'Y', 'Z'], n_rows),
    'target': np.random.randint(0, 2, n_rows)
})
649
650
# Convert to Dask DataFrame with appropriate partitioning
651
ddf = dd.from_pandas(large_df, npartitions=100)
652
653
# Distributed feature engineering
654
def engineer_features(df):
    """Return a copy of *df* with engineered feature columns added.

    Added columns:
    - feature_1_squared: feature_1 ** 2
    - feature_2_log: log(|feature_2| + 1)
    - feature_interaction: feature_1 * feature_2
    - cat_1_encoded: categorical_1 mapped A/B/C/D -> 0/1/2/3
    - cat_2_encoded: categorical_2 mapped X/Y/Z -> 0/1/2

    Parameters:
    - df: Frame with feature_1, feature_2, categorical_1 and categorical_2
      columns (dask.DataFrame; a pandas.DataFrame also works)

    Returns: a new frame of the same kind; the input frame is not modified
    """
    import numpy as np  # local import: this example does not import numpy at the top

    return df.assign(
        # Numerical transformations
        feature_1_squared=df['feature_1'] ** 2,
        # dask.dataframe has no module-level log/abs functions; Series.abs()
        # plus a NumPy ufunc stays lazy and element-wise on a Dask Series.
        feature_2_log=np.log1p(df['feature_2'].abs()),
        feature_interaction=df['feature_1'] * df['feature_2'],
        # Categorical encoding (simple label encoding for demo)
        cat_1_encoded=df['categorical_1'].map({'A': 0, 'B': 1, 'C': 2, 'D': 3}),
        cat_2_encoded=df['categorical_2'].map({'X': 0, 'Y': 1, 'Z': 2}),
    )
666
667
# Apply feature engineering
668
engineered_df = engineer_features(ddf)
669
670
# Select features for training
671
feature_cols = ['feature_1', 'feature_2', 'feature_3', 'feature_1_squared',
672
'feature_2_log', 'feature_interaction', 'cat_1_encoded', 'cat_2_encoded']
673
674
X_features = engineered_df[feature_cols]
675
y_target = engineered_df['target']
676
677
# Create train/test split
# Dask DataFrames do not support positional row slicing (`.iloc[:n]` only
# selects columns), so split the rows with random_split and pick the feature
# and target columns from each part afterwards. Splitting the full frame once
# keeps features and labels aligned.
train_part, test_part = engineered_df.random_split([0.8, 0.2], random_state=42)

X_train = train_part[feature_cols]
X_test = test_part[feature_cols]
y_train = train_part['target']
y_test = test_part['target']
685
686
# Create DaskDMatrix
687
dtrain = dxgb.DaskDMatrix(client, X_train, y_train)
688
dtest = dxgb.DaskDMatrix(client, X_test, y_test)
689
690
# Training parameters
691
params = {
692
'objective': 'binary:logistic',
693
'eval_metric': ['logloss', 'auc'],
694
'max_depth': 8,
695
'learning_rate': 0.1,
696
'subsample': 0.8,
697
'colsample_bytree': 0.8,
698
'tree_method': 'hist',
699
'max_bin': 256,
700
'random_state': 42
701
}
702
703
# Distributed training
704
print("Starting distributed training on engineered features...")
705
output = dxgb.train(
706
client=client,
707
params=params,
708
dtrain=dtrain,
709
num_boost_round=100,
710
evals=[(dtrain, 'train'), (dtest, 'test')],
711
early_stopping_rounds=10,
712
verbose_eval=10
713
)
714
715
model = output['booster']
716
print(f"Training completed at iteration {model.best_iteration}")
717
print(f"Best test score: {model.best_score:.4f}")
718
719
# Feature importance analysis
720
feature_importance = model.get_score(importance_type='gain')
721
print(f"\nTop 5 most important features:")
722
sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
723
for feature, importance in sorted_features[:5]:
724
print(f"{feature}: {importance:.4f}")
725
726
client.close()
727
```