XGBoost is an optimized distributed gradient boosting library designed to be highly efficient, flexible, and portable.
XGBoost provides comprehensive distributed computing capabilities for training and prediction across multiple workers and computing environments. This includes integration with Dask, Spark, and collective communication protocols.
Low-level distributed communication primitives for custom distributed setups.
import xgboost.collective as collective
def init(config=None):
    """
    Initialize collective communication.

    Parameters:
    - config: Configuration dictionary or Config object

    Returns:
    None
    """
def finalize():
    """Finalize collective communication."""
def get_rank():
    """
    Get rank of current process.

    Returns:
    int: Process rank
    """
def get_world_size():
    """
    Get total number of processes.

    Returns:
    int: World size
    """
def is_distributed():
    """
    Check if running in distributed mode.

    Returns:
    bool: True if distributed
    """
def broadcast(data, root):
    """
    Broadcast data from root to all processes.

    Parameters:
    - data: Data to broadcast
    - root: Root process rank

    Returns:
    Broadcasted data
    """
def allreduce(data, op):
    """
    All-reduce operation across processes.

    Parameters:
    - data: Data to reduce
    - op: Reduction operation (Op.SUM, Op.MAX, Op.MIN, Op.BITWISE_OR)

    Returns:
    Reduced data
    """
def communicator_print(message):
    """
    Distributed-aware printing function.

    Parameters:
    - message: Message to print

    Returns:
    None
    """
def get_processor_name():
    """
    Get processor name for current process.

    Returns:
    str: Processor name
    """
def signal_error(error_message):
    """
    Signal error across all processes.

    Parameters:
    - error_message: Error message to signal

    Returns:
    None
    """
class Op:
    """Enumeration for collective operation types."""

    MAX = "max"
    MIN = "min"
    SUM = "sum"
    BITWISE_OR = "bitwise_or"
class Config:
    """Configuration for collective communication."""

    def __init__(
        self,
        rank=0,
        world_size=1,
        tracker_uri=None,
        user_name=None,
        timeout=300,
        retry=3,
    ):
        """
        Initialize collective config.

        Parameters:
        - rank: Process rank
        - world_size: Total number of processes
        - tracker_uri: Tracker URI for coordination
        - user_name: User name for identification
        - timeout: Communication timeout in seconds
        - retry: Number of retries for failed operations
        """
class CommunicatorContext:
    """Context manager for collective operations."""

    def __init__(self, **kwargs):
        """
        Initialize communicator context.

        Parameters:
        - **kwargs: Configuration parameters
        """

    def __enter__(self):
        """Enter context and initialize communication."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context and finalize communication."""

# Distributed training and prediction using Dask for Python-native scaling.
from xgboost.dask import (
DaskDMatrix,
DaskQuantileDMatrix,
DaskXGBRegressor,
DaskXGBClassifier,
DaskXGBRanker,
train,
predict
)
class DaskDMatrix:
    """Dask-compatible DMatrix built from distributed data."""

    def __init__(
        self,
        client,
        data,
        label=None,
        weight=None,
        base_margin=None,
        missing=None,
        silent=False,
        feature_names=None,
        feature_types=None,
        group=None,
        qid=None,
        label_lower_bound=None,
        label_upper_bound=None,
        feature_weights=None,
        enable_categorical=False,
    ):
        """
        Dask-compatible DMatrix.

        Parameters:
        - client: Dask client
        - data: Dask array or DataFrame
        - (other parameters same as DMatrix)
        """
class DaskXGBRegressor:
    """Dask XGBoost regressor; parameters same as XGBRegressor."""

    def __init__(
        self,
        n_estimators=100,
        max_depth=None,
        learning_rate=None,
        verbosity=None,
        objective=None,
        booster=None,
        tree_method=None,
        n_jobs=None,
        **kwargs,
    ):
        """
        Dask XGBoost regressor.

        Parameters same as XGBRegressor.
        """

    def fit(
        self,
        X,
        y,
        sample_weight=None,
        base_margin=None,
        eval_set=None,
        sample_weight_eval_set=None,
        base_margin_eval_set=None,
        eval_metric=None,
        early_stopping_rounds=None,
        verbose=True,
        xgb_model=None,
        feature_weights=None,
        callbacks=None,
    ):
        """Fit Dask XGBoost model."""

    def predict(
        self,
        X,
        output_margin=False,
        validate_features=True,
        base_margin=None,
        iteration_range=None,
    ):
        """Predict using Dask XGBoost model."""
class DaskXGBClassifier:
    """Dask XGBoost classifier with same interface as DaskXGBRegressor."""

    def predict_proba(self, X, **kwargs):
        """Predict class probabilities."""
def train(
    client,
    params,
    dtrain,
    num_boost_round=10,
    evals=None,
    obj=None,
    maximize=None,
    early_stopping_rounds=None,
    evals_result=None,
    verbose_eval=True,
    xgb_model=None,
    callbacks=None,
    custom_metric=None,
):
    """
    Distributed training with Dask.

    Parameters:
    - client: Dask client
    - (other parameters same as xgb.train)

    Returns:
    Trained Booster
    """
def predict(client, model, data, **kwargs):
    """
    Distributed prediction with Dask.

    Parameters:
    - client: Dask client
    - model: Trained model (Booster or sklearn estimator)
    - data: Input data (DaskDMatrix or Dask array/DataFrame)

    Returns:
    Predictions as Dask array
    """

# Integration with Apache Spark for JVM-based distributed computing.
from xgboost.spark import (
SparkXGBRegressor,
SparkXGBClassifier,
SparkXGBRanker,
SparkXGBRegressorModel,
SparkXGBClassifierModel,
SparkXGBRankerModel
)
class SparkXGBRegressor:
    """Spark XGBoost regressor estimator."""

    def __init__(
        self,
        max_depth=6,
        learning_rate=0.3,
        n_estimators=100,
        verbosity=1,
        objective='reg:squarederror',
        booster='gbtree',
        tree_method='auto',
        n_jobs=1,
        gamma=0,
        min_child_weight=1,
        max_delta_step=0,
        subsample=1,
        colsample_bytree=1,
        colsample_bylevel=1,
        colsample_bynode=1,
        reg_alpha=0,
        reg_lambda=1,
        scale_pos_weight=1,
        base_score=0.5,
        random_state=0,
        missing=float('nan'),
        num_workers=1,
        use_gpu=False,
        **kwargs,
    ):
        """
        Spark XGBoost regressor.

        Parameters:
        - num_workers: Number of Spark workers
        - use_gpu: Whether to use GPU training
        - (other parameters same as XGBRegressor)
        """

    def fit(self, dataset):
        """
        Fit model on Spark DataFrame.

        Parameters:
        - dataset: Spark DataFrame with features and labels

        Returns:
        SparkXGBRegressorModel
        """
class SparkXGBClassifier:
    """Spark XGBoost classifier with same interface as SparkXGBRegressor."""

    def fit(self, dataset):
        """Fit classifier on Spark DataFrame."""
class SparkXGBRegressorModel:
    """Fitted Spark XGBoost regressor model."""

    def transform(self, dataset):
        """
        Transform Spark DataFrame with predictions.

        Parameters:
        - dataset: Input Spark DataFrame

        Returns:
        Spark DataFrame with predictions
        """
class SparkXGBClassifierModel:
    """Spark XGBoost classifier model with same interface as regressor model."""

    def transform(self, dataset):
        """Transform with class predictions and probabilities."""

class RabitTracker:
    """Rabit tracker for distributed training coordination."""

    def __init__(
        self,
        hostIP=None,
        nslave=None,
        port=None,
        port_end=None,
        timeout=None,
    ):
        """
        Rabit tracker for distributed training coordination.

        Parameters:
        - hostIP: Host IP address
        - nslave: Number of slave workers
        - port: Starting port number
        - port_end: Ending port number
        - timeout: Connection timeout
        """

    def start(self, nslave):
        """Start tracker with specified number of slaves."""

    def stop(self):
        """Stop tracker."""

    def get_worker_envs(self):
        """Get environment variables for workers."""

    @property
    def slave_env(self):
        """Environment variables for slave processes."""

import dask.array as da
from dask.distributed import Client
from xgboost.dask import DaskXGBRegressor

# Start Dask client
client = Client('scheduler-address:8786')

# Create distributed data
X = da.random.random((10000, 10), chunks=(1000, 10))
y = da.random.random(10000, chunks=1000)

# Train distributed model
model = DaskXGBRegressor(n_estimators=100, max_depth=3)
model.fit(X, y)

# Predict
predictions = model.predict(X)

from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBRegressor

# Initialize Spark
spark = SparkSession.builder.appName("XGBoost").getOrCreate()

# Load data as Spark DataFrame
df = spark.read.format("libsvm").load("data.txt")

# Train model
regressor = SparkXGBRegressor(
    num_workers=4,
    max_depth=3,
    n_estimators=100,
)
model = regressor.fit(df)

# Make predictions
predictions = model.transform(df)

import xgboost.collective as collective
# Initialize collective communication
collective.init()

# Get process information
rank = collective.get_rank()
world_size = collective.get_world_size()

# Broadcast data from rank 0
if rank == 0:
    data = [1, 2, 3, 4, 5]
else:
    data = None
data = collective.broadcast(data, root=0)

# All-reduce sum across processes
local_sum = sum(range(rank * 10, (rank + 1) * 10))
global_sum = collective.allreduce(local_sum, collective.Op.SUM)

# Finalize
collective.finalize()

# Install with Tessl CLI
npx tessl i tessl/pypi-xgboost