CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-xgboost

XGBoost is an optimized distributed gradient boosting library designed to be highly efficient, flexible, and portable.

Overview
Eval results
Files

distributed-computing.mddocs/

Distributed Computing

XGBoost provides comprehensive distributed computing capabilities for training and prediction across multiple workers and computing environments. This includes integration with Dask, Spark, and collective communication protocols.

Capabilities

Collective Communication

Low-level distributed communication primitives for custom distributed setups.

import xgboost.collective as collective

def init(config=None):
    """Set up the collective communication layer for this process.

    Args:
        config: Optional configuration mapping or ``Config`` instance.

    Returns:
        None
    """

def finalize():
    """Tear down the collective communication layer."""

def get_rank():
    """Report this process's rank within the collective group.

    Returns:
        int: Zero-based rank of the current process.
    """

def get_world_size():
    """Report how many processes participate in the collective group.

    Returns:
        int: Total process count (the "world size").
    """

def is_distributed():
    """Tell whether this process is part of a distributed run.

    Returns:
        bool: True when running in distributed mode.
    """

def broadcast(data, root):
    """Send ``data`` from the root process to every other process.

    Args:
        data: Payload to distribute.
        root: Rank of the process that supplies the payload.

    Returns:
        The broadcast payload, identical on every rank.
    """

def allreduce(data, op):
    """Combine ``data`` from every process with a reduction operator.

    Args:
        data: Local contribution to the reduction.
        op: Operator to apply (``Op.SUM``, ``Op.MAX``, ``Op.MIN``,
            or ``Op.BITWISE_OR``).

    Returns:
        The combined result, identical on every rank.
    """

def communicator_print(message):
    """Print a message through the communicator (rank-aware output).

    Args:
        message: Text to emit.

    Returns:
        None
    """

def get_processor_name():
    """Look up the name of the processor hosting this process.

    Returns:
        str: Processor (host) name.
    """

def signal_error(error_message):
    """Propagate an error notification to every process in the group.

    Args:
        error_message: Description of the failure being signalled.

    Returns:
        None
    """

class Op:
    """Reduction operators accepted by :func:`allreduce`."""

    MAX = "max"
    MIN = "min"
    SUM = "sum"
    BITWISE_OR = "bitwise_or"

class Config:
    """Settings describing one participant in a collective group."""

    def __init__(
        self,
        rank=0,
        world_size=1,
        tracker_uri=None,
        user_name=None,
        timeout=300,
        retry=3
    ):
        """Build a collective-communication configuration.

        Args:
            rank: Rank of this process within the group.
            world_size: Number of processes participating.
            tracker_uri: URI of the coordinating tracker, if any.
            user_name: Identifier for the submitting user.
            timeout: Seconds to wait for communication before failing.
            retry: How many times a failed operation is retried.
        """

class CommunicatorContext:
    """``with``-block helper that brackets collective communication."""

    def __init__(self, **kwargs):
        """Record configuration for the managed communicator.

        Args:
            **kwargs: Configuration parameters for the communicator.
        """

    def __enter__(self):
        """Bring up communication and hand back this context object."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Tear down communication when the ``with`` block ends."""

Dask Integration

Distributed training and prediction using Dask for Python-native scaling.

from xgboost.dask import (
    DaskDMatrix,
    DaskQuantileDMatrix,
    DaskXGBRegressor,
    DaskXGBClassifier,
    DaskXGBRanker,
    train,
    predict
)

class DaskDMatrix:
    """Partition-aware counterpart of ``DMatrix`` built from Dask inputs."""

    def __init__(
        self,
        client,
        data,
        label=None,
        weight=None,
        base_margin=None,
        missing=None,
        silent=False,
        feature_names=None,
        feature_types=None,
        group=None,
        qid=None,
        label_lower_bound=None,
        label_upper_bound=None,
        feature_weights=None,
        enable_categorical=False
    ):
        """Wrap Dask collections for distributed XGBoost consumption.

        Args:
            client: Dask client coordinating the workers.
            data: Dask array or DataFrame holding the features.
            (remaining arguments mirror :class:`DMatrix`)
        """

class DaskXGBRegressor:
    """Scikit-learn style regressor whose training is sharded over Dask."""

    def __init__(
        self,
        n_estimators=100,
        max_depth=None,
        learning_rate=None,
        verbosity=None,
        objective=None,
        booster=None,
        tree_method=None,
        n_jobs=None,
        **kwargs
    ):
        """Create the regressor; arguments mirror ``XGBRegressor``."""

    def fit(
        self,
        X,
        y,
        sample_weight=None,
        base_margin=None,
        eval_set=None,
        sample_weight_eval_set=None,
        base_margin_eval_set=None,
        eval_metric=None,
        early_stopping_rounds=None,
        verbose=True,
        xgb_model=None,
        feature_weights=None,
        callbacks=None
    ):
        """Train the model on Dask collections ``X`` and ``y``."""

    def predict(
        self,
        X,
        output_margin=False,
        validate_features=True,
        base_margin=None,
        iteration_range=None
    ):
        """Produce predictions for Dask input ``X``."""

class DaskXGBClassifier:
    """Classification twin of ``DaskXGBRegressor``; shares its interface."""

    def predict_proba(self, X, **kwargs):
        """Return per-class probability estimates for ``X``."""

def train(
    client,
    params,
    dtrain,
    num_boost_round=10,
    evals=None,
    obj=None,
    maximize=None,
    early_stopping_rounds=None,
    evals_result=None,
    verbose_eval=True,
    xgb_model=None,
    callbacks=None,
    custom_metric=None
):
    """Run boosting rounds across the workers of a Dask cluster.

    Args:
        client: Dask client coordinating the computation.
        (remaining arguments mirror ``xgb.train``)

    Returns:
        The trained Booster.
    """

def predict(client, model, data, **kwargs):
    """Score ``data`` in parallel on a Dask cluster.

    Args:
        client: Dask client coordinating the computation.
        model: Trained Booster or sklearn-style estimator.
        data: DaskDMatrix, Dask array, or Dask DataFrame to score.

    Returns:
        Predictions as a Dask array.
    """

Spark Integration

Integration with Apache Spark for JVM-based distributed computing.

from xgboost.spark import (
    SparkXGBRegressor,
    SparkXGBClassifier,
    SparkXGBRanker,
    SparkXGBRegressorModel,
    SparkXGBClassifierModel,
    SparkXGBRankerModel
)

class SparkXGBRegressor:
    """Spark ML estimator that trains an XGBoost regressor on executors."""

    def __init__(
        self,
        max_depth=6,
        learning_rate=0.3,
        n_estimators=100,
        verbosity=1,
        objective='reg:squarederror',
        booster='gbtree',
        tree_method='auto',
        n_jobs=1,
        gamma=0,
        min_child_weight=1,
        max_delta_step=0,
        subsample=1,
        colsample_bytree=1,
        colsample_bylevel=1,
        colsample_bynode=1,
        reg_alpha=0,
        reg_lambda=1,
        scale_pos_weight=1,
        base_score=0.5,
        random_state=0,
        missing=float('nan'),
        num_workers=1,
        use_gpu=False,
        **kwargs
    ):
        """Configure the estimator.

        Args:
            num_workers: Number of Spark tasks used for training.
            use_gpu: Train on GPUs when True.
            (remaining arguments mirror ``XGBRegressor``)
        """

    def fit(self, dataset):
        """Train on a Spark DataFrame of features and labels.

        Args:
            dataset: Spark DataFrame holding features and labels.

        Returns:
            A fitted ``SparkXGBRegressorModel``.
        """

class SparkXGBClassifier:
    """Classification twin of ``SparkXGBRegressor``; shares its interface."""

    def fit(self, dataset):
        """Train a classifier on a Spark DataFrame."""

class SparkXGBRegressorModel:
    """Fitted regression model produced by ``SparkXGBRegressor.fit``."""

    def transform(self, dataset):
        """Append prediction columns to ``dataset``.

        Args:
            dataset: Input Spark DataFrame.

        Returns:
            Spark DataFrame extended with predictions.
        """

class SparkXGBClassifierModel:
    """Fitted classification model; interface mirrors the regressor model."""

    def transform(self, dataset):
        """Append class predictions and probability columns to ``dataset``."""

Distributed Training Utilities

class RabitTracker:
    """Coordinator process that wires distributed workers together."""

    def __init__(
        self,
        hostIP=None,
        nslave=None,
        port=None,
        port_end=None,
        timeout=None
    ):
        """Configure the tracker.

        Args:
            hostIP: IP address the tracker binds to.
            nslave: Expected number of worker processes.
            port: First port to try when binding.
            port_end: Last port to try when binding.
            timeout: Seconds to wait for worker connections.
        """

    def start(self, nslave):
        """Launch the tracker and wait for ``nslave`` workers to join."""

    def stop(self):
        """Shut the tracker down."""

    def get_worker_envs(self):
        """Return the environment variables workers need to connect."""

    @property
    def slave_env(self):
        """Environment variables handed to worker processes."""

Usage Examples

Dask Distributed Training

# Example: end-to-end distributed training on a Dask cluster.
# NOTE(review): requires a reachable Dask scheduler at the given address;
# the estimator presumably picks up the active client implicitly — confirm.
import dask.array as da
from dask.distributed import Client
from xgboost.dask import DaskXGBRegressor

# Start Dask client
client = Client('scheduler-address:8786')

# Create distributed data (10 chunks of 1000 rows x 10 features)
X = da.random.random((10000, 10), chunks=(1000, 10))
y = da.random.random(10000, chunks=1000)

# Train distributed model
model = DaskXGBRegressor(n_estimators=100, max_depth=3)
model.fit(X, y)

# Predict — returns a lazy Dask array, computed on demand
predictions = model.predict(X)

Spark Training

# Example: training through the Spark ML estimator API.
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBRegressor

# Initialize Spark
spark = SparkSession.builder.appName("XGBoost").getOrCreate()

# Load data as Spark DataFrame (libsvm format: label plus sparse features)
df = spark.read.format("libsvm").load("data.txt")

# Train model across 4 Spark tasks
# NOTE(review): the cluster must be able to schedule num_workers tasks
# concurrently — confirm against the executor configuration.
regressor = SparkXGBRegressor(
    num_workers=4,
    max_depth=3,
    n_estimators=100
)

model = regressor.fit(df)

# Make predictions (adds prediction columns to the DataFrame)
predictions = model.transform(df)

Collective Communication

# Example: low-level collective primitives inside one worker process.
import xgboost.collective as collective

# Initialize collective communication
# NOTE(review): with no config argument, initialization presumably reads
# its settings from the environment — confirm before relying on this.
collective.init()

# Get process information
rank = collective.get_rank()
world_size = collective.get_world_size()

# Broadcast data from rank 0 (non-root ranks pass a placeholder)
if rank == 0:
    data = [1, 2, 3, 4, 5]
else:
    data = None

data = collective.broadcast(data, root=0)

# All-reduce sum across processes
# Each rank contributes the sum of its own disjoint slice of integers.
local_sum = sum(range(rank * 10, (rank + 1) * 10))
global_sum = collective.allreduce(local_sum, collective.Op.SUM)

# Finalize
collective.finalize()

Install with Tessl CLI

npx tessl i tessl/pypi-xgboost

docs

callbacks.md

configuration.md

core-api.md

distributed-computing.md

index.md

sklearn-interface.md

visualization.md

tile.json