LightGBM is a gradient boosting framework that uses tree-based learning algorithms. It is designed to be distributed and efficient, offering faster training speed, lower memory usage, better accuracy, and support for parallel, distributed, and GPU learning.
—
Distributed training and prediction using Dask for scalable machine learning across multiple machines. LightGBM's Dask integration provides all the functionality of standard LightGBM models with automatic data distribution and parallel processing capabilities.
Distributed version of LGBMRegressor that can handle large datasets split across multiple Dask workers.
class DaskLGBMRegressor:
    """
    Dask-enabled LightGBM regressor for scalable, distributed regression.

    Every parameter accepted by LGBMRegressor is supported here as well,
    plus Dask-specific options such as ``client``.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Build a DaskLGBMRegressor.

        Parameters mirror LGBMRegressor, with one addition:
        - client: dask.distributed.Client or None - client used for
          distributed computation
        """

    def fit(self, X, y, sample_weight=None, init_score=None, eval_set=None,
            eval_names=None, eval_sample_weight=None, eval_init_score=None,
            eval_metric=None, feature_name='auto', categorical_feature='auto',
            early_stopping_rounds=None, verbose=True, log_evaluation=None,
            callbacks=None, client=None, **kwargs):
        """
        Train the regression model on distributed data.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - training features
        - y: dask.array.Array or dask.dataframe.Series - training targets
        - sample_weight: dask.array.Array or None - per-sample weights
        - init_score: dask.array.Array or None - initial scores
        - eval_set: list of (X, y) tuples of Dask collections - validation data
        - eval_names: list of str - names of the validation sets
        - eval_sample_weight: list of dask arrays - validation sample weights
        - eval_init_score: list of dask arrays - validation initial scores
        - eval_metric: str, list of str, or None - evaluation metrics
        - feature_name: list of str or 'auto' - feature names
        - categorical_feature: list of str/int or 'auto' - categorical features
        - early_stopping_rounds: int or None - rounds before early stopping
        - verbose: bool or int - verbosity level
        - log_evaluation: bool, int, or None - evaluation logging frequency
        - callbacks: list of callables - custom callbacks
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - self
        """

    def predict(self, X, num_iteration=None, client=None, **kwargs):
        """
        Compute predictions on distributed input.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - input features
        - num_iteration: int or None - cap on boosting iterations used
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - dask.array.Array: distributed predictions
        """

    def score(self, X, y, sample_weight=None, client=None):
        """
        Compute the R² coefficient of determination on distributed data.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - test samples
        - y: dask.array.Array or dask.dataframe.Series - true values for X
        - sample_weight: dask.array.Array or None - sample weights
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - float: R² score aggregated across the distributed data
        """

# Distributed version of LGBMClassifier supporting both binary and multiclass classification across multiple workers.
class DaskLGBMClassifier:
    """
    Dask-enabled LightGBM classifier for scalable, distributed classification.

    Every parameter accepted by LGBMClassifier is supported here as well,
    plus Dask-specific options such as ``client``.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Build a DaskLGBMClassifier.

        Parameters mirror LGBMClassifier, with one addition:
        - client: dask.distributed.Client or None - client used for
          distributed computation
        """

    def fit(self, X, y, sample_weight=None, init_score=None, eval_set=None,
            eval_names=None, eval_sample_weight=None, eval_init_score=None,
            eval_metric=None, feature_name='auto', categorical_feature='auto',
            early_stopping_rounds=None, verbose=True, log_evaluation=None,
            callbacks=None, client=None, **kwargs):
        """
        Train the classification model on distributed data.

        Parameters follow the same pattern as DaskLGBMRegressor.fit(),
        applied to classification data.
        Returns:
        - self
        """

    def predict(self, X, num_iteration=None, client=None, **kwargs):
        """
        Predict class labels on distributed input.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - input features
        - num_iteration: int or None - cap on boosting iterations used
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - dask.array.Array: distributed class predictions
        """

    def predict_proba(self, X, num_iteration=None, client=None, **kwargs):
        """
        Predict class probabilities on distributed input.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - input features
        - num_iteration: int or None - cap on boosting iterations used
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - dask.array.Array: distributed probabilities, shape (n_samples, n_classes)
        """

    def score(self, X, y, sample_weight=None, client=None):
        """
        Compute classification accuracy on distributed data.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - test samples
        - y: dask.array.Array or dask.dataframe.Series - true labels for X
        - sample_weight: dask.array.Array or None - sample weights
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - float: accuracy aggregated across the distributed data
        """

    @property
    def classes_(self):
        """Unique class labels."""

    @property
    def n_classes_(self):
        """Number of distinct classes."""

# Distributed version of LGBMRanker for learning-to-rank tasks on large-scale datasets.
class DaskLGBMRanker:
    """
    Dask-enabled LightGBM ranker for scalable learning-to-rank tasks.

    Every parameter accepted by LGBMRanker is supported here as well,
    plus Dask-specific options such as ``client``.
    """

    def __init__(self, boosting_type='gbdt', num_leaves=31, max_depth=-1,
                 learning_rate=0.1, n_estimators=100, subsample_for_bin=200000,
                 objective=None, class_weight=None, min_split_gain=0.,
                 min_child_weight=1e-3, min_child_samples=20, subsample=1.,
                 subsample_freq=0, colsample_bytree=1., reg_alpha=0.,
                 reg_lambda=0., random_state=None, n_jobs=None,
                 importance_type='split', client=None, **kwargs):
        """
        Build a DaskLGBMRanker.

        Parameters mirror LGBMRanker, with one addition:
        - client: dask.distributed.Client or None - client used for
          distributed computation
        """

    def fit(self, X, y, group=None, sample_weight=None, init_score=None,
            eval_set=None, eval_names=None, eval_sample_weight=None,
            eval_init_score=None, eval_group=None, eval_metric=None,
            feature_name='auto', categorical_feature='auto',
            early_stopping_rounds=None, verbose=True, log_evaluation=None,
            callbacks=None, client=None, **kwargs):
        """
        Train the ranking model on distributed data.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - training features
        - y: dask.array.Array or dask.dataframe.Series - relevance scores
        - group: dask.array.Array - per-query group sizes
        - sample_weight: dask.array.Array or None - per-sample weights
        - init_score: dask.array.Array or None - initial scores
        - eval_set: list of (X, y) tuples of Dask collections - validation data
        - eval_names: list of str - names of the validation sets
        - eval_sample_weight: list of dask arrays - validation sample weights
        - eval_init_score: list of dask arrays - validation initial scores
        - eval_group: list of dask arrays - validation group sizes
        - eval_metric: str, list of str, or None - evaluation metrics
        - feature_name: list of str or 'auto' - feature names
        - categorical_feature: list of str/int or 'auto' - categorical features
        - early_stopping_rounds: int or None - rounds before early stopping
        - verbose: bool or int - verbosity level
        - log_evaluation: bool, int, or None - evaluation logging frequency
        - callbacks: list of callables - custom callbacks
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - self
        """

    def predict(self, X, num_iteration=None, client=None, **kwargs):
        """
        Compute ranking scores on distributed input.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - input features
        - num_iteration: int or None - cap on boosting iterations used
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - dask.array.Array: distributed ranking scores
        """

    def score(self, X, y, sample_weight=None, client=None):
        """
        Compute the ranking evaluation score on distributed data.

        Parameters:
        - X: dask.array.Array or dask.dataframe.DataFrame - test samples
        - y: dask.array.Array or dask.dataframe.Series - true relevance scores
        - sample_weight: dask.array.Array or None - sample weights
        - client: dask.distributed.Client or None - Dask client
        Returns:
        - float: ranking score aggregated across the distributed data
        """

import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
import lightgbm as lgb
import numpy as np

# Connect to a running Dask scheduler.
client = Client('localhost:8786')

# Build a large synthetic regression dataset as chunked Dask arrays.
n_samples = 1_000_000
n_features = 100
X = da.random.random((n_samples, n_features), chunks=(10000, n_features))
y = da.random.random(n_samples, chunks=10000)

# Hold out the final 20% of rows for evaluation.
train_size = int(0.8 * n_samples)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Distributed regressor, configured like an ordinary LGBMRegressor.
model = lgb.DaskLGBMRegressor(
    objective='regression',
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Train on the distributed partitions while monitoring the held-out split.
model.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    eval_names=['test'],
    early_stopping_rounds=10,
    verbose=True
)

# Predictions stay distributed until explicitly computed.
preds = model.predict(X_test)

# R² aggregated across all partitions.
r2 = model.score(X_test, y_test)
print(f"Distributed R² Score: {r2:.4f}")

# Release the scheduler connection.
client.close()

import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
import lightgbm as lgb
import numpy as np

# Connect to a Dask scheduler backed by multiple workers.
client = Client('localhost:8786')

# Synthetic multiclass dataset as chunked Dask arrays.
n_samples = 500_000
n_features = 50
n_classes = 3
X = da.random.random((n_samples, n_features), chunks=(5000, n_features))
y = da.random.randint(0, n_classes, size=n_samples, chunks=5000)

# FIX: dask.dataframe does not implement positional ROW slicing with
# .iloc (only column selection is supported), so the original
# `df.iloc[:int(0.8 * n_samples)]` raises NotImplementedError.
# Split the arrays first, then convert each split to a DataFrame.
feature_names = [f'feature_{i}' for i in range(n_features)]
train_size = int(0.8 * n_samples)
X_train = dd.from_dask_array(X[:train_size], columns=feature_names)
X_test = dd.from_dask_array(X[train_size:], columns=feature_names)
y_train = dd.from_dask_array(y[:train_size])
y_test = dd.from_dask_array(y[train_size:])

# Distributed multiclass classifier.
classifier = lgb.DaskLGBMClassifier(
    objective='multiclass',
    num_class=n_classes,
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Train with a held-out evaluation set.
classifier.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    eval_names=['test'],
    eval_metric='multi_logloss',
    early_stopping_rounds=10,
    verbose=True
)

# Distributed label and probability predictions.
class_predictions = classifier.predict(X_test)
class_probabilities = classifier.predict_proba(X_test)

# Accuracy aggregated across partitions.
accuracy = classifier.score(X_test, y_test)
print(f"Distributed Accuracy: {accuracy:.4f}")
print(f"Number of classes: {classifier.n_classes_}")
print(f"Class labels: {classifier.classes_}")

client.close()

import dask.array as da
from dask.distributed import Client
import lightgbm as lgb
import numpy as np

# Connect to a running Dask scheduler.
client = Client('localhost:8786')

# Synthetic learning-to-rank dataset.
n_samples = 100_000
n_features = 20
n_queries = 1000
X = da.random.random((n_samples, n_features), chunks=(1000, n_features))
y = da.random.random(n_samples, chunks=1000)  # Relevance scores

# FIX: LightGBM requires sum(group) to equal the number of rows in X.
# The original drew random group sizes and filtered them by cumulative
# sum, which almost never totals exactly n_samples, so fit() would fail.
# Instead, draw n_queries - 1 distinct cut points in (0, n_samples) and
# take consecutive differences: this yields n_queries variable-sized
# groups that are guaranteed to sum to n_samples.
rng = np.random.default_rng(42)
cuts = np.sort(rng.choice(np.arange(1, n_samples), size=n_queries - 1, replace=False))
group_sizes = np.diff(np.concatenate(([0], cuts, [n_samples])))
query_sizes = da.from_array(group_sizes, chunks=100)

# Distributed ranker with the XE-NDCG objective.
ranker = lgb.DaskLGBMRanker(
    objective='rank_xendcg',
    n_estimators=100,
    learning_rate=0.1,
    num_leaves=31,
    client=client
)

# Fit the ranking model; group sizes describe each query's documents.
ranker.fit(X, y, group=query_sizes)

# Distributed ranking predictions.
ranking_scores = ranker.predict(X)
print(f"Ranking scores computed for {n_samples} samples")
print(f"Sample ranking scores: {ranking_scores[:10].compute()}")

client.close()

from dask.distributed import Client, LocalCluster
import lightgbm as lgb
import dask.array as da
import numpy as np

# Local cluster with an explicit resource configuration.
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit='4GB',
    dashboard_address=':8787'
)
client = Client(cluster)

# A dataset far larger than a single machine's memory.
X = da.random.random((10_000_000, 100), chunks=(50_000, 100))
y = da.random.random(10_000_000, chunks=50_000)

# Distributed regressor with regularization and sampling parameters.
regressor = lgb.DaskLGBMRegressor(
    objective='regression',
    n_estimators=200,
    learning_rate=0.05,
    num_leaves=63,
    max_depth=7,
    min_child_samples=100,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_alpha=0.1,
    reg_lambda=0.1,
    n_jobs=-1,  # Use all available cores on each worker
    client=client
)

# FIX: the original passed `lgb.record_evaluation({})` with an inline
# dict literal, so the recorded evaluation history was unreachable.
# Bind the dict to a name so results can be inspected after training.
eval_history = {}

# NOTE(review): the evaluation slice below is drawn from the training
# data itself, so the reported metric is optimistic — acceptable for a
# demo, not for model selection.
regressor.fit(
    X, y,
    eval_set=[(X[-100000:], y[-100000:])],  # Use last chunk for validation
    eval_names=['validation'],
    eval_metric='rmse',
    early_stopping_rounds=20,
    verbose=True,
    callbacks=[
        lgb.log_evaluation(10),
        lgb.record_evaluation(eval_history)
    ]
)

# FIX: `importance[:5]` printed the FIRST five features, not the
# largest. Sort descending to report the actual top five importances.
importance = regressor.feature_importances_
top5 = np.argsort(importance)[::-1][:5]
print(f"Top 5 feature importances: {importance[top5]}")
print(f"Recorded eval metrics: {list(eval_history.get('validation', {}))}")

# Clean up
client.close()
cluster.close()

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-lightgbm