# Distributed Computing

Distributed training and prediction with Dask, for machine learning workloads that span multiple machines. LightGBM's Dask integration mirrors the standard LightGBM estimators (regressor, classifier, ranker) and adds automatic data distribution and parallel processing across Dask workers.

## Capabilities

### Distributed Regression

Distributed version of `LGBMRegressor` for large datasets partitioned across multiple Dask workers.

```python { .api }
class DaskLGBMRegressor:
    """
    Distributed LightGBM regressor using Dask for scalable regression tasks.

    All parameters from LGBMRegressor are supported, plus Dask-specific options.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Initialize DaskLGBMRegressor.

        Parameters inherit from LGBMRegressor, plus:
        - client: dask.distributed.Client or None - Dask client for distributed computing
        """

    def fit(self, X, y, sample_weight=None, init_score=None, eval_set=None,
            eval_names=None, eval_sample_weight=None, eval_init_score=None,
            eval_metric=None, feature_name='auto', categorical_feature='auto',
            early_stopping_rounds=None, verbose=True, log_evaluation=None,
            callbacks=None, client=None, **kwargs):
        """
        Fit distributed regression model.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed training features
        - y: dask.array.Array or dask.dataframe.Series - Distributed training targets
        - sample_weight: dask.array.Array or None - Distributed sample weights
        - init_score: dask.array.Array or None - Distributed initial scores
        - eval_set: list of (X, y) tuples with Dask collections - Distributed validation sets
        - eval_names: list of strings - Names for evaluation sets
        - eval_sample_weight: list of dask arrays - Sample weights for evaluation sets
        - eval_init_score: list of dask arrays - Initial scores for evaluation sets
        - eval_metric: str, list of str, or None - Evaluation metrics
        - feature_name: list of strings or 'auto' - Feature names
        - categorical_feature: list of strings/ints or 'auto' - Categorical features
        - early_stopping_rounds: int or None - Early stopping rounds
        - verbose: bool or int - Controls verbosity
        - log_evaluation: bool, int, or None - Evaluation logging frequency
        - callbacks: list of callback functions - Custom callbacks
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - self: Returns self
        """

    def predict(self, X, num_iteration=None, client=None, **kwargs):
        """
        Make distributed predictions.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - dask.array.Array: Distributed prediction results
        """

    def score(self, X, y, sample_weight=None, client=None):
        """
        Return distributed R² coefficient of determination.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Test samples
        - y: dask.array.Array or dask.dataframe.Series - True values for X
        - sample_weight: dask.array.Array or None - Sample weights
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - float: R² score computed across distributed data
        """
```
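
The `score` method is documented as returning the R² coefficient of determination, defined as `1 - SS_res / SS_tot`. As a reference for what that number means, the pure-Python sketch below computes it on small in-memory lists. `r2_score_local` is a hypothetical helper written for illustration; it is not part of LightGBM or Dask and says nothing about how the distributed computation is carried out.

```python
from typing import Sequence


def r2_score_local(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """R^2 = 1 - SS_res / SS_tot for small in-memory sequences (illustrative only)."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("y_true and y_pred must be non-empty and equally long")
    mean_true = sum(y_true) / len(y_true)
    # Residual sum of squares: squared error of the predictions
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    # Total sum of squares: squared deviation of the targets from their mean
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    if ss_tot == 0:
        raise ValueError("R^2 is undefined when y_true is constant")
    return 1.0 - ss_res / ss_tot
```

A perfect prediction gives 1.0, and always predicting the mean of `y_true` gives 0.0; a model that does worse than the mean produces a negative score.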

### Distributed Classification

Distributed version of `LGBMClassifier` supporting both binary and multiclass classification across multiple workers.

```python { .api }
class DaskLGBMClassifier:
    """
    Distributed LightGBM classifier using Dask for scalable classification tasks.

    All parameters from LGBMClassifier are supported, plus Dask-specific options.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Initialize DaskLGBMClassifier.

        Parameters inherit from LGBMClassifier, plus:
        - client: dask.distributed.Client or None - Dask client for distributed computing
        """

    def fit(self, X, y, sample_weight=None, init_score=None, eval_set=None,
            eval_names=None, eval_sample_weight=None, eval_init_score=None,
            eval_metric=None, feature_name='auto', categorical_feature='auto',
            early_stopping_rounds=None, verbose=True, log_evaluation=None,
            callbacks=None, client=None, **kwargs):
        """
        Fit distributed classification model.

        Parameters follow the same pattern as DaskLGBMRegressor.fit(), but for classification data.

        Returns:
        - self: Returns self
        """

    def predict(self, X, num_iteration=None, client=None, **kwargs):
        """
        Make distributed class predictions.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - dask.array.Array: Distributed class predictions
        """

    def predict_proba(self, X, num_iteration=None, client=None, **kwargs):
        """
        Make distributed probability predictions.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - dask.array.Array: Distributed class probabilities, shape (n_samples, n_classes)
        """

    def score(self, X, y, sample_weight=None, client=None):
        """
        Return distributed classification accuracy.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Test samples
        - y: dask.array.Array or dask.dataframe.Series - True labels for X
        - sample_weight: dask.array.Array or None - Sample weights
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - float: Accuracy score computed across distributed data
        """

    @property
    def classes_(self):
        """Get unique class labels."""

    @property
    def n_classes_(self):
        """Get number of classes."""
```
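
`predict_proba` returns one probability per class for each row, while `predict` returns a single label. In the non-distributed `LGBMClassifier`, the label is the entry of `classes_` whose probability is largest; the sketch below assumes the Dask estimator follows the same rule. `labels_from_probabilities` is a hypothetical helper operating on plain Python lists, shown only to make the relationship concrete.

```python
from typing import List, Sequence


def labels_from_probabilities(proba: Sequence[Sequence[float]],
                              classes: Sequence) -> List:
    """Pick, for each row, the class label whose probability is largest."""
    labels = []
    for row in proba:
        if len(row) != len(classes):
            raise ValueError("each row needs one probability per class")
        # Index of the largest probability (first one wins on ties)
        best_index = max(range(len(row)), key=row.__getitem__)
        labels.append(classes[best_index])
    return labels
```

For a three-class problem, a row such as `[0.1, 0.7, 0.2]` maps to the second entry of `classes`.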

### Distributed Ranking

Distributed version of `LGBMRanker` for learning-to-rank tasks on large-scale datasets.

```python { .api }
class DaskLGBMRanker:
    """
    Distributed LightGBM ranker using Dask for scalable ranking tasks.

    All parameters from LGBMRanker are supported, plus Dask-specific options.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Initialize DaskLGBMRanker.

        Parameters inherit from LGBMRanker, plus:
        - client: dask.distributed.Client or None - Dask client for distributed computing
        """

    def fit(self, X, y, group=None, sample_weight=None, init_score=None,
            eval_set=None, eval_names=None, eval_sample_weight=None,
            eval_init_score=None, eval_group=None, eval_metric=None,
            feature_name='auto', categorical_feature='auto',
            early_stopping_rounds=None, verbose=True, log_evaluation=None,
            callbacks=None, client=None, **kwargs):
        """
        Fit distributed ranking model.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed training features
        - y: dask.array.Array or dask.dataframe.Series - Distributed ranking scores
        - group: dask.array.Array - Distributed group/query sizes for ranking
        - sample_weight: dask.array.Array or None - Distributed sample weights
        - init_score: dask.array.Array or None - Distributed initial scores
        - eval_set: list of (X, y) tuples with Dask collections - Distributed validation sets
        - eval_names: list of strings - Names for evaluation sets
        - eval_sample_weight: list of dask arrays - Sample weights for evaluation sets
        - eval_init_score: list of dask arrays - Initial scores for evaluation sets
        - eval_group: list of dask arrays - Group sizes for evaluation sets
        - eval_metric: str, list of str, or None - Evaluation metrics
        - feature_name: list of strings or 'auto' - Feature names
        - categorical_feature: list of strings/ints or 'auto' - Categorical features
        - early_stopping_rounds: int or None - Early stopping rounds
        - verbose: bool or int - Controls verbosity
        - log_evaluation: bool, int, or None - Evaluation logging frequency
        - callbacks: list of callback functions - Custom callbacks
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - self: Returns self
        """

    def predict(self, X, num_iteration=None, client=None, **kwargs):
        """
        Make distributed ranking predictions.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - dask.array.Array: Distributed ranking scores
        """

    def score(self, X, y, sample_weight=None, client=None):
        """
        Return distributed ranking evaluation score.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Test samples
        - y: dask.array.Array or dask.dataframe.Series - True ranking scores
        - sample_weight: dask.array.Array or None - Sample weights
        - client: dask.distributed.Client or None - Dask client

        Returns:
        - float: Ranking score computed across distributed data
        """
```
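
The `group` argument holds query sizes rather than per-row query IDs, so the sizes sum to the number of rows in `X`. When the rows are partitioned, a reasonable precaution is to keep every query inside a single partition. The sketch below works under that assumption, which should be checked against the LightGBM version in use. `pack_groups_into_partitions` is a hypothetical pure-Python helper, not a LightGBM or Dask function.

```python
from typing import List, Sequence


def pack_groups_into_partitions(group_sizes: Sequence[int],
                                max_rows: int) -> List[List[int]]:
    """Greedily pack consecutive query groups into partitions of at most max_rows rows.

    Groups are never split, and their original order is preserved.
    """
    partitions: List[List[int]] = []
    current: List[int] = []
    current_rows = 0
    for size in group_sizes:
        if size <= 0:
            raise ValueError("group sizes must be positive")
        if size > max_rows:
            raise ValueError(f"group of {size} rows exceeds max_rows={max_rows}")
        # Start a new partition when the next group would overflow this one
        if current and current_rows + size > max_rows:
            partitions.append(current)
            current, current_rows = [], 0
        current.append(size)
        current_rows += size
    if current:
        partitions.append(current)
    return partitions
```

The row count of each partition, `[sum(p) for p in partitions]`, can then serve as the row-chunk sizes for `X` and `y`, so that features, labels, and group sizes are split at the same query boundaries.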

## Usage Examples

### Distributed Regression Example

```python
import dask.array as da
from dask.distributed import Client
import lightgbm as lgb

# Connect to an already-running Dask scheduler
client = Client('localhost:8786')

# Create a large distributed dataset
n_samples = 1_000_000
n_features = 100

# Generate data in chunks of 10,000 rows
X = da.random.random((n_samples, n_features), chunks=(10000, n_features))
y = da.random.random(n_samples, chunks=10000)

# Split into train/test (first 80% of rows for training)
train_size = int(0.8 * n_samples)
X_train = X[:train_size]
X_test = X[train_size:]
y_train = y[:train_size]
y_test = y[train_size:]

# Initialize distributed regressor
regressor = lgb.DaskLGBMRegressor(
    objective='regression',
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Fit model on distributed data
regressor.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    eval_names=['test'],
    early_stopping_rounds=10,
    verbose=True
)

# Make distributed predictions (a lazy Dask array; call .compute() to materialize)
predictions = regressor.predict(X_test)

# Score the model on the held-out data
r2_score = regressor.score(X_test, y_test)
print(f"Distributed R² Score: {r2_score:.4f}")

# Close client
client.close()
```

### Distributed Classification Example

```python
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
import lightgbm as lgb

# Connect to a running Dask scheduler
client = Client('localhost:8786')

# Create distributed classification data
n_samples = 500_000
n_features = 50
n_classes = 3

X = da.random.random((n_samples, n_features), chunks=(5000, n_features))
y = da.random.randint(0, n_classes, size=n_samples, chunks=5000)

# Convert to a Dask DataFrame with named feature columns
df = dd.from_dask_array(X, columns=[f'feature_{i}' for i in range(n_features)])
df['target'] = y

# Split data 80/20. Dask DataFrames do not support positional row slicing
# with .iloc, so use random_split instead.
train_df, test_df = df.random_split([0.8, 0.2], random_state=42)

X_train = train_df.drop('target', axis=1)
y_train = train_df['target']
X_test = test_df.drop('target', axis=1)
y_test = test_df['target']

# Initialize distributed classifier
classifier = lgb.DaskLGBMClassifier(
    objective='multiclass',
    num_class=n_classes,
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Fit model
classifier.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    eval_names=['test'],
    eval_metric='multi_logloss',
    early_stopping_rounds=10,
    verbose=True
)

# Make predictions (both results are lazy Dask collections)
class_predictions = classifier.predict(X_test)
class_probabilities = classifier.predict_proba(X_test)

# Evaluate
accuracy = classifier.score(X_test, y_test)
print(f"Distributed Accuracy: {accuracy:.4f}")
print(f"Number of classes: {classifier.n_classes_}")
print(f"Class labels: {classifier.classes_}")

client.close()
```

### Distributed Ranking Example

```python
import dask.array as da
from dask.distributed import Client
import lightgbm as lgb
import numpy as np

# Connect to a running Dask scheduler
client = Client('localhost:8786')

# Create distributed ranking data
n_samples = 100_000
n_features = 20
n_queries = 1000
docs_per_query = n_samples // n_queries  # 100 documents per query

X = da.random.random((n_samples, n_features), chunks=(1000, n_features))
# Ranking objectives expect integer relevance labels (here 0-4)
y = da.random.randint(0, 5, size=n_samples, chunks=1000)

# Group sizes: one entry per query, summing to n_samples.
# Fixed-size queries keep the example simple. Chunks of 10 queries cover
# 1,000 rows each, so group chunks line up with the chunks of X and y
# (assumed here to be what the Dask estimator needs).
query_sizes = da.from_array(np.full(n_queries, docs_per_query), chunks=10)

# Initialize distributed ranker
ranker = lgb.DaskLGBMRanker(
    objective='rank_xendcg',
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Fit ranking model
ranker.fit(X, y, group=query_sizes)

# Make ranking predictions
ranking_scores = ranker.predict(X)

print(f"Ranking scores computed for {n_samples} samples")
print(f"Sample ranking scores: {ranking_scores[:10].compute()}")

client.close()
```

### Advanced Distributed Setup

```python
from dask.distributed import Client, LocalCluster
import lightgbm as lgb
import dask.array as da

# Create local cluster with specific configuration
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='4GB',
    dashboard_address=':8787'
)
client = Client(cluster)

# Create a large dataset (about 8 GB as float64) spread across the workers
X = da.random.random((10_000_000, 100), chunks=(50_000, 100))
y = da.random.random(10_000_000, chunks=50_000)

# Use distributed regressor with advanced parameters
regressor = lgb.DaskLGBMRegressor(
    objective='regression',
    n_estimators=200,
    learning_rate=0.05,
    num_leaves=63,
    max_depth=7,
    min_child_samples=100,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_alpha=0.1,
    reg_lambda=0.1,
    n_jobs=-1,  # Use all available cores on each worker
    client=client
)

# Keep a reference to the dict so the recorded metrics can be read after fitting
evals_result = {}

# Fit with validation and callbacks
regressor.fit(
    X, y,
    # Last 100,000 rows (two chunks); note these rows are also in the training data
    eval_set=[(X[-100000:], y[-100000:])],
    eval_names=['validation'],
    eval_metric='rmse',
    early_stopping_rounds=20,
    verbose=True,
    callbacks=[
        lgb.log_evaluation(10),
        lgb.record_evaluation(evals_result)
    ]
)

# Feature importances from the fitted model
importance = regressor.feature_importances_
print(f"First 5 feature importances: {importance[:5]}")

# Clean up
client.close()
cluster.close()
```

## Performance Considerations

### Data Chunking

- Choose chunk sizes that balance memory usage and computation efficiency: very small chunks create many tasks to schedule, while very large chunks can exhaust worker memory
- Typical chunk sizes: 10,000-100,000 samples per chunk
- Consider network bandwidth when chunks are transferred between workers
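
To make the chunk-size guidance concrete, the sketch below converts between rows per chunk and bytes per chunk for dense numeric data. Both functions are hypothetical helpers, and the 64 MiB default budget is an arbitrary illustrative value rather than a LightGBM or Dask recommendation.

```python
def rows_per_chunk(n_features: int,
                   target_chunk_bytes: int = 64 * 1024 * 1024,
                   bytes_per_value: int = 8) -> int:
    """Number of rows that fit in one dense chunk of roughly target_chunk_bytes."""
    if n_features <= 0 or target_chunk_bytes <= 0 or bytes_per_value <= 0:
        raise ValueError("all arguments must be positive")
    bytes_per_row = n_features * bytes_per_value
    # Always return at least one row, even for very wide data
    return max(1, target_chunk_bytes // bytes_per_row)


def chunk_megabytes(n_rows: int, n_features: int, bytes_per_value: int = 8) -> float:
    """Size of a dense chunk in megabytes (1 MB = 10**6 bytes)."""
    return n_rows * n_features * bytes_per_value / 1e6
```

With 100 float64 features, chunks of 10,000 to 100,000 rows occupy roughly 8 to 80 MB each, which is the range `chunk_megabytes` reports for those inputs.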

### Worker Configuration

- Allocate sufficient memory per worker to handle chunk processing
- Use multiple threads per worker for CPU-intensive operations
- Consider I/O bandwidth when reading distributed data
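
A quick back-of-the-envelope check can show whether a dataset plausibly fits on a given set of workers. The sketch below assumes dense data spread evenly across workers and reserves part of each worker's memory limit for everything other than the raw data. The helper names and the `usable_fraction` default of 0.5 are illustrative assumptions, not values taken from LightGBM or Dask.

```python
def per_worker_gigabytes(n_rows: int, n_features: int, n_workers: int,
                         bytes_per_value: int = 8) -> float:
    """Dense data held by each worker, in GB, assuming an even split."""
    if n_workers <= 0:
        raise ValueError("n_workers must be positive")
    total_bytes = n_rows * n_features * bytes_per_value
    return total_bytes / n_workers / 1e9


def fits_in_workers(n_rows: int, n_features: int, n_workers: int,
                    memory_limit_gb: float, usable_fraction: float = 0.5) -> bool:
    """True if each worker's share fits in the usable part of its memory limit."""
    share = per_worker_gigabytes(n_rows, n_features, n_workers)
    return share <= memory_limit_gb * usable_fraction
```

For example, 10,000,000 rows of 100 float64 features come to 8 GB in total, or 2 GB per worker on four workers. Under the assumed 50% usable fraction, that just fits a 4 GB memory limit, while the same data on two workers would not.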
508
509
### Network Optimization
510
511
- Minimize data movement by co-locating related data chunks
512
- Use compression for network transfers when bandwidth is limited
513
- Consider data locality when scheduling computations
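
Whether compression pays off depends on how compressible the transferred data is. The sketch below uses the standard-library `zlib` module purely to illustrate that point; it is not how Dask or LightGBM move data between workers, and `compression_ratio` is a hypothetical helper.

```python
import random
import zlib


def compression_ratio(payload: bytes, level: int = 6) -> float:
    """Compressed size divided by original size (smaller means better compression)."""
    if not payload:
        raise ValueError("payload must not be empty")
    return len(zlib.compress(payload, level)) / len(payload)


# Two payloads of equal size with very different structure
rng = random.Random(0)
repetitive = bytes(100_000)  # 100,000 zero bytes
noisy = bytes(rng.getrandbits(8) for _ in range(100_000))  # pseudo-random bytes

print(f"repetitive: {compression_ratio(repetitive):.4f}")
print(f"noisy:      {compression_ratio(noisy):.4f}")
```

Highly repetitive data shrinks to a small fraction of its size, while random-looking data barely shrinks at all, so the CPU time spent compressing it is wasted. Measuring a sample of real feature data this way gives a rough idea of which case applies.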