# Distributed Computing

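Distributed training and prediction using Dask for scalable machine learning across multiple machines. LightGBM's Dask estimators expose the same scikit-learn-style interface as the standard LightGBM models. Each Dask worker trains on the data partitions it holds, and the workers communicate to build a single shared model. Predictions are computed in parallel over the partitions of the input. The training data must fit in the combined memory of the workers.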

## Capabilities

### Distributed Regression

Distributed version of LGBMRegressor that can handle large datasets split across multiple Dask workers.

```python { .api }
class DaskLGBMRegressor:
    """
    Distributed LightGBM regressor using Dask for scalable regression tasks.

    All parameters from LGBMRegressor are supported, plus Dask-specific options.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Initialize DaskLGBMRegressor.

        Parameters inherit from LGBMRegressor, plus:
        - client: dask.distributed.Client or None - Dask client used for training
          and prediction; if None, the default client is looked up at runtime
        """

    def fit(self, X, y, sample_weight=None, init_score=None, eval_set=None,
            eval_names=None, eval_sample_weight=None, eval_init_score=None,
            eval_metric=None, **kwargs):
        """
        Fit distributed regression model.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed training features
        - y: dask.array.Array or dask.dataframe.Series - Distributed training targets
        - sample_weight: dask.array.Array or None - Distributed sample weights
        - init_score: dask.array.Array or None - Distributed initial scores
        - eval_set: list of (X, y) tuples with Dask collections - Distributed validation sets
        - eval_names: list of strings - Names for evaluation sets
        - eval_sample_weight: list of dask arrays - Sample weights for evaluation sets
        - eval_init_score: list of dask arrays - Initial scores for evaluation sets
        - eval_metric: str, list of str, or None - Evaluation metrics
        - **kwargs: Other parameters passed through to LGBMRegressor.fit() on each
          worker (e.g. feature_name, categorical_feature, callbacks)

        Returns:
        - self: Returns self
        """

    def predict(self, X, raw_score=False, start_iteration=0, num_iteration=None,
                pred_leaf=False, pred_contrib=False, **kwargs):
        """
        Make distributed predictions using the estimator's Dask client.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - raw_score, start_iteration, pred_leaf, pred_contrib: Same meaning as in
          LGBMRegressor.predict()

        Returns:
        - dask.array.Array: Distributed prediction results
        """

    def score(self, X, y, sample_weight=None):
        """
        Return the R² coefficient of determination (inherited from scikit-learn).

        Predictions are computed on the cluster, but scikit-learn then collects the
        predictions and y on the client, so both must fit in client memory.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Test samples
        - y: dask.array.Array or dask.dataframe.Series - True values for X
        - sample_weight: array-like or None - Sample weights

        Returns:
        - float: R² score
        """
```

### Distributed Classification

Distributed version of LGBMClassifier supporting both binary and multiclass classification across multiple workers.

```python { .api }
class DaskLGBMClassifier:
    """
    Distributed LightGBM classifier using Dask for scalable classification tasks.

    All parameters from LGBMClassifier are supported, plus Dask-specific options.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Initialize DaskLGBMClassifier.

        Parameters inherit from LGBMClassifier, plus:
        - client: dask.distributed.Client or None - Dask client used for training
          and prediction; if None, the default client is looked up at runtime
        """

    def fit(self, X, y, sample_weight=None, init_score=None, eval_set=None,
            eval_names=None, eval_sample_weight=None, eval_class_weight=None,
            eval_init_score=None, eval_metric=None, **kwargs):
        """
        Fit distributed classification model.

        Parameters follow the same pattern as DaskLGBMRegressor.fit(), with class
        labels in y, plus:
        - eval_class_weight: list of dict/str or None - Class weights for evaluation sets

        Returns:
        - self: Returns self
        """

    def predict(self, X, raw_score=False, start_iteration=0, num_iteration=None,
                pred_leaf=False, pred_contrib=False, **kwargs):
        """
        Make distributed class predictions using the estimator's Dask client.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - raw_score, start_iteration, pred_leaf, pred_contrib: Same meaning as in
          LGBMClassifier.predict()

        Returns:
        - dask.array.Array: Distributed class predictions
        """

    def predict_proba(self, X, raw_score=False, start_iteration=0, num_iteration=None,
                      pred_leaf=False, pred_contrib=False, **kwargs):
        """
        Make distributed probability predictions.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - raw_score, start_iteration, pred_leaf, pred_contrib: Same meaning as in
          LGBMClassifier.predict_proba()

        Returns:
        - dask.array.Array: Distributed class probabilities, shape (n_samples, n_classes)
        """

    def score(self, X, y, sample_weight=None):
        """
        Return classification accuracy (inherited from scikit-learn).

        Predictions are computed on the cluster, but scikit-learn then collects the
        predictions and y on the client, so both must fit in client memory.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Test samples
        - y: dask.array.Array or dask.dataframe.Series - True labels for X
        - sample_weight: array-like or None - Sample weights

        Returns:
        - float: Accuracy score
        """

    @property
    def classes_(self):
        """Get unique class labels."""

    @property
    def n_classes_(self):
        """Get number of classes."""
```

### Distributed Ranking

Distributed version of LGBMRanker for learning-to-rank tasks on large-scale datasets.

```python { .api }
class DaskLGBMRanker:
    """
    Distributed LightGBM ranker using Dask for scalable ranking tasks.

    All parameters from LGBMRanker are supported, plus Dask-specific options.
    LGBMRanker does not define a score() method. Evaluate ranking quality through
    eval_set with a ranking metric such as 'ndcg'.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Initialize DaskLGBMRanker.

        Parameters inherit from LGBMRanker, plus:
        - client: dask.distributed.Client or None - Dask client used for training
          and prediction; if None, the default client is looked up at runtime
        """

    def fit(self, X, y, sample_weight=None, init_score=None, group=None,
            eval_set=None, eval_names=None, eval_sample_weight=None,
            eval_init_score=None, eval_group=None, eval_metric=None,
            eval_at=(1, 2, 3, 4, 5), **kwargs):
        """
        Fit distributed ranking model.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed training features
        - y: dask.array.Array or dask.dataframe.Series - Distributed relevance labels
          (non-negative integers; larger means more relevant)
        - sample_weight: dask.array.Array or None - Distributed sample weights
        - init_score: dask.array.Array or None - Distributed initial scores
        - group: dask.array.Array or dask.dataframe.Series - Query sizes; sum(group)
          equals the number of rows, and each chunk of group must cover exactly the
          rows in the matching chunk of X
        - eval_set: list of (X, y) tuples with Dask collections - Distributed validation sets
        - eval_names: list of strings - Names for evaluation sets
        - eval_sample_weight: list of dask arrays - Sample weights for evaluation sets
        - eval_init_score: list of dask arrays - Initial scores for evaluation sets
        - eval_group: list of dask arrays - Query sizes for evaluation sets
        - eval_metric: str, list of str, or None - Evaluation metrics
        - eval_at: list or tuple of int - Evaluation positions for ranking metrics
          such as NDCG
        - **kwargs: Other parameters passed through to LGBMRanker.fit() on each
          worker (e.g. feature_name, categorical_feature, callbacks)

        Returns:
        - self: Returns self
        """

    def predict(self, X, raw_score=False, start_iteration=0, num_iteration=None,
                pred_leaf=False, pred_contrib=False, **kwargs):
        """
        Make distributed ranking predictions using the estimator's Dask client.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - Distributed input features
        - num_iteration: int or None - Limit number of iterations for prediction
        - raw_score, start_iteration, pred_leaf, pred_contrib: Same meaning as in
          LGBMRanker.predict()

        Returns:
        - dask.array.Array: Distributed ranking scores (higher means more relevant)
        """
```
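
`fit()` pairs each chunk of `group` with the chunk of `X` at the same position, so the query sizes in a `group` chunk have to add up to the row count of the matching `X` chunk. A quick local check on chunk sizes can catch misalignment before training starts. The helper below is a hypothetical sketch (`check_group_chunks` is not part of LightGBM). It works on plain Python sizes, for example `X.chunks[0]` for the rows and the computed contents of each `group` chunk.

```python
from typing import List, Sequence


def check_group_chunks(row_chunks: Sequence[int],
                       group_chunks: Sequence[Sequence[int]]) -> List[str]:
    """Hypothetical pre-flight check pairing X row chunks with group chunks.

    row_chunks   -- number of rows in each chunk of X (e.g. X.chunks[0])
    group_chunks -- query sizes stored in each chunk of `group`
    Returns human-readable problems; an empty list means the chunks line up.
    """
    # Chunks are paired by position, so the counts must match first
    if len(row_chunks) != len(group_chunks):
        return [f"X has {len(row_chunks)} chunks but group has {len(group_chunks)}"]

    problems = []
    for i, (n_rows, sizes) in enumerate(zip(row_chunks, group_chunks)):
        total = sum(sizes)
        # Every query in a chunk must live entirely inside the matching X chunk
        if total != n_rows:
            problems.append(f"chunk {i}: query sizes sum to {total}, X chunk has {n_rows} rows")
    return problems


if __name__ == "__main__":
    # Two X chunks of 1,000 rows; the second group chunk is 50 rows short
    print(check_group_chunks([1000, 1000], [[400, 600], [500, 450]]))
```

For a Dask array `group`, `dask.compute(*group.to_delayed())` gathers the per-chunk query sizes. These are small, so only they travel to the client.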

## Usage Examples

### Distributed Regression Example

```python
import dask.array as da
from dask.distributed import Client
import lightgbm as lgb

# Connect to an already-running Dask scheduler (adjust the address for your cluster)
client = Client('localhost:8786')

# Create a large distributed dataset
n_samples = 1_000_000
n_features = 100

# Generate data in row chunks; all features of a row stay in the same chunk
X = da.random.random((n_samples, n_features), chunks=(10_000, n_features))
y = da.random.random(n_samples, chunks=10_000)

# Split into train/test on a chunk boundary
train_size = int(0.8 * n_samples)
X_train = X[:train_size]
X_test = X[train_size:]
y_train = y[:train_size]
y_test = y[train_size:]

# Initialize distributed regressor
regressor = lgb.DaskLGBMRegressor(
    objective='regression',
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Fit model on distributed data, tracking the metric on the test set
regressor.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    eval_names=['test']
)

# Make distributed predictions (a lazy dask array; call .compute() to materialize)
predictions = regressor.predict(X_test)

# score() comes from scikit-learn and collects predictions and labels on the client
r2 = regressor.score(X_test, y_test)
print(f"R² Score: {r2:.4f}")

# Close client
client.close()
```

### Distributed Classification Example

```python
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
import lightgbm as lgb

# Connect to an already-running Dask scheduler
client = Client('localhost:8786')

# Create distributed classification data
n_samples = 500_000
n_features = 50
n_classes = 3

X = da.random.random((n_samples, n_features), chunks=(5_000, n_features))
y = da.random.randint(0, n_classes, size=n_samples, chunks=5_000)

# Wrap as a Dask DataFrame with named feature columns and a Dask Series of labels
X_df = dd.from_dask_array(X, columns=[f'feature_{i}' for i in range(n_features)])
y_s = dd.from_dask_array(y, columns='target')

# Dask DataFrames do not support positional row slicing with .iloc,
# so split on partition boundaries (100 partitions of 5,000 rows each)
n_train_parts = int(0.8 * X_df.npartitions)
X_train = X_df.partitions[:n_train_parts]
y_train = y_s.partitions[:n_train_parts]
X_test = X_df.partitions[n_train_parts:]
y_test = y_s.partitions[n_train_parts:]

# Initialize distributed classifier
classifier = lgb.DaskLGBMClassifier(
    objective='multiclass',
    num_class=n_classes,
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Fit model
classifier.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    eval_names=['test'],
    eval_metric='multi_logloss'
)

# Make predictions (lazy dask arrays)
class_predictions = classifier.predict(X_test)
class_probabilities = classifier.predict_proba(X_test)

# Evaluate; score() collects predictions and labels on the client
accuracy = classifier.score(X_test, y_test)
print(f"Accuracy: {accuracy:.4f}")
print(f"Number of classes: {classifier.n_classes_}")
print(f"Class labels: {classifier.classes_}")

client.close()
```

### Distributed Ranking Example

```python
import dask.array as da
import numpy as np
from dask.distributed import Client
import lightgbm as lgb

# Connect to an already-running Dask scheduler
client = Client('localhost:8786')

# Create distributed ranking data
n_samples = 100_000
n_features = 20
rows_per_chunk = 1_000

X = da.random.random((n_samples, n_features), chunks=(rows_per_chunk, n_features))
# Integer relevance labels: 0 (irrelevant) to 4 (highly relevant)
y = da.random.randint(0, 5, size=n_samples, chunks=rows_per_chunk)

# Query sizes: each 1,000-row chunk of X holds 10 complete queries of varying
# length, and each chunk of `group` must sum to the rows of the matching X chunk
sizes_per_chunk = np.array([50, 150, 100, 75, 125, 100, 100, 100, 100, 100])
assert sizes_per_chunk.sum() == rows_per_chunk
n_chunks = n_samples // rows_per_chunk
query_sizes = da.from_array(np.tile(sizes_per_chunk, n_chunks),
                            chunks=len(sizes_per_chunk))

# Initialize distributed ranker
ranker = lgb.DaskLGBMRanker(
    objective='rank_xendcg',
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Fit ranking model
ranker.fit(X, y, group=query_sizes)

# Make ranking predictions
ranking_scores = ranker.predict(X)

print(f"Ranking scores computed for {n_samples} samples")
print(f"Sample ranking scores: {ranking_scores[:10].compute()}")

client.close()
```

### Advanced Distributed Setup

```python
from dask.distributed import Client, LocalCluster
import dask.array as da
import lightgbm as lgb
import numpy as np

# Create a local cluster: 4 workers x 2 threads, 4 GB memory limit each
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,  # LightGBM's Dask integration takes its per-worker thread count from here
    memory_limit='4GB',
    dashboard_address=':8787'
)
client = Client(cluster)

# ~3.2 GB of float64 data: more than one worker's memory limit, but training
# data must still fit in the workers' combined memory, with room for copies
n_rows, n_valid = 4_000_000, 100_000
X = da.random.random((n_rows, 100), chunks=(50_000, 100))
y = da.random.random(n_rows, chunks=50_000)

# Hold out the last 100,000 rows (two chunks) for validation, without overlap
X_train, X_valid = X[:-n_valid], X[-n_valid:]
y_train, y_valid = y[:-n_valid], y[-n_valid:]

# Use distributed regressor with advanced parameters
regressor = lgb.DaskLGBMRegressor(
    objective='regression',
    n_estimators=200,
    learning_rate=0.05,
    num_leaves=63,
    max_depth=7,
    min_child_samples=100,
    subsample=0.8,
    subsample_freq=1,  # bagging (subsample) only takes effect when subsample_freq > 0
    colsample_bytree=0.8,
    reg_alpha=0.1,
    reg_lambda=0.1,
    client=client
)

# Fit with validation and a logging callback
regressor.fit(
    X_train, y_train,
    eval_set=[(X_valid, y_valid)],
    eval_names=['validation'],
    eval_metric='rmse',
    # Callbacks are passed through to fit() on each worker, so this log
    # is produced by the worker processes
    callbacks=[lgb.log_evaluation(10)]
)

# Feature importances of the trained model, ranked from highest to lowest
importance = regressor.feature_importances_
top5 = np.argsort(importance)[::-1][:5]
print(f"Top 5 features by importance: {top5.tolist()}")
print(f"Their importances: {importance[top5].tolist()}")

# Clean up
client.close()
cluster.close()
```

## Performance Considerations

### Data Chunking

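- Chunk `X` along rows only and keep every feature of a row in the same chunk (for example `X.rechunk({1: -1})`), because `lightgbm.dask` splits work by row partitions
- Give `y`, `sample_weight`, and `init_score` the same row chunks as `X`, and split `group` so that each of its chunks covers exactly the rows of the matching `X` chunk
- Choose chunk sizes that balance memory per task against scheduling overhead; 10,000 to 100,000 rows per chunk is a common starting range, depending on the number of features
- Larger chunks mean more data per transfer when chunks move between workers, so weigh chunk size against network bandwidth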
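
The chunk-size trade-off can be turned into a rough rows-per-chunk figure from the feature count and the bytes per value. The sketch below is a hypothetical helper (`chunk_plan` is not a Dask or LightGBM function). Its 100 MB target per chunk is an assumed starting point, not a LightGBM requirement.

```python
import math
from typing import Tuple


def chunk_plan(n_rows: int, n_features: int, target_chunk_mb: float = 100.0,
               bytes_per_value: int = 8) -> Tuple[int, int]:
    """Return (rows per chunk, number of chunks) for roughly target_chunk_mb chunks.

    Assumes a dense matrix with all features in one column chunk; 1 MB = 2**20 bytes.
    """
    bytes_per_row = n_features * bytes_per_value
    rows = max(1, int(target_chunk_mb * 2**20 // bytes_per_row))
    rows = min(rows, n_rows)  # never plan a chunk larger than the data
    return rows, math.ceil(n_rows / rows)


if __name__ == "__main__":
    # 1M rows x 100 float64 features (800 bytes per row)
    print(chunk_plan(1_000_000, 100))                       # ~100 MB chunks
    print(chunk_plan(1_000_000, 100, target_chunk_mb=10))   # ~10 MB chunks
```

Apply a plan with `X.rechunk({0: rows, 1: -1})`. If the plan yields fewer chunks than workers, some workers hold no training data, so smaller chunks can be the better choice on larger clusters.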

### Worker Configuration

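- Give each worker enough memory for its share of the training data plus working copies, because `lightgbm.dask` holds all training partitions in worker memory during `fit()`
- Set parallelism through the cluster (for example `threads_per_worker`): LightGBM's Dask integration derives its per-worker thread count from the Dask worker rather than from `n_jobs`
- Consider I/O bandwidth when workers read distributed data from storage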
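
A back-of-the-envelope check helps size workers before launching a job. The sketch below is hypothetical (`worker_memory_check` is not a Dask or LightGBM API). It relies on two assumptions to tune for your workload. `overhead_factor` leaves room for copies made while assembling the training data on each worker. `usable_fraction` keeps headroom below the worker's `memory_limit`, because Dask workers start spilling and pausing before they reach it.

```python
import math
from typing import Tuple


def worker_memory_check(n_rows: int, n_features: int, n_workers: int,
                        memory_limit_bytes: int, bytes_per_value: int = 8,
                        overhead_factor: float = 2.0,
                        usable_fraction: float = 0.6) -> Tuple[int, int, bool]:
    """Estimate per-worker memory for dense training data spread evenly over workers.

    Returns (estimated bytes needed per worker, usable bytes per worker, fits?).
    overhead_factor and usable_fraction are assumptions, not LightGBM constants.
    """
    raw_bytes = n_rows * n_features * bytes_per_value
    needed = math.ceil(raw_bytes * overhead_factor / n_workers)
    usable = int(memory_limit_bytes * usable_fraction)
    return needed, usable, needed <= usable


if __name__ == "__main__":
    # 10M x 100 float64 values on four workers with memory_limit='4GB' (4e9 bytes)
    print(worker_memory_check(10_000_000, 100, 4, 4_000_000_000))
```

Note that Dask reads `'4GB'` as 4 x 10^9 bytes and `'4GiB'` as 4 x 2^30 bytes, so pass the limit in the same units Dask uses.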

### Network Optimization

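- Minimize data movement: LightGBM trains each worker on the partitions already stored there, so spreading the training data across all workers before `fit()` balances the load and avoids extra transfers
- Enable compression for network transfers (Dask's `distributed.comm.compression` setting) when bandwidth is limited and the data compresses well
- Consider data locality when scheduling the computations that produce the training data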
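
Whether compression pays off depends on the data. Dense random floats barely shrink, while low-cardinality or repetitive columns can compress well. The sketch below estimates the ratio for a sample chunk with the standard-library `zlib` module. `zlib` is only a stand-in for the codec your Dask deployment is configured to use (Dask supports codecs such as lz4), and `compression_ratio` is a hypothetical helper.

```python
import zlib

import numpy as np


def compression_ratio(chunk: np.ndarray, level: int = 1) -> float:
    """Raw size divided by zlib-compressed size for one chunk's bytes."""
    raw = np.ascontiguousarray(chunk).tobytes()
    return len(raw) / len(zlib.compress(raw, level))


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    dense = rng.random((10_000, 20))  # similar to da.random.random data
    categorical = rng.integers(0, 4, size=(10_000, 20)).astype(np.float64)
    print(f"dense floats:    {compression_ratio(dense):.2f}x")
    print(f"low-cardinality: {compression_ratio(categorical):.2f}x")
```

A ratio close to 1 suggests compression would mostly add CPU cost. A large ratio suggests it may be worth enabling on a bandwidth-limited network.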