XGBoost Python Package (CPU only) - A minimal installation with no support for GPU algorithms or federated learning, providing optimized distributed gradient boosting for machine learning
Utility functions for model interpretation, configuration management, visualization, and distributed communication. These tools help understand model behavior, manage XGBoost settings, create visual insights, and coordinate distributed training.
Comprehensive visualization tools for understanding model behavior, feature importance, and decision tree structure. These functions integrate with matplotlib and graphviz for publication-quality plots.
def plot_importance(booster, ax=None, height=0.2, xlim=None, ylim=None,
                    title='Feature importance', xlabel='F score',
                    ylabel='Features', fmap='', importance_type='weight',
                    max_num_features=None, grid=True, show_values=True,
                    values_format='{v}', **kwargs):
    """
    Plot feature importance of fitted trees as a horizontal bar chart.

    Requires matplotlib to be installed.

    Parameters:
    - booster: Trained XGBoost model (Booster)
    - ax: Target matplotlib axes; a new figure/axes is created when None
      (matplotlib.axes.Axes, optional)
    - height: Bar height for the horizontal bar plot (float)
    - xlim: X-axis limits as (xmin, xmax) (tuple, optional)
    - ylim: Y-axis limits as (ymin, ymax) (tuple, optional)
    - title: Plot title (str)
    - xlabel: X-axis label (str)
    - ylabel: Y-axis label (str)
    - fmap: Path to a feature map file used to resolve feature names (str)
    - importance_type: How importance is measured (str)
      Options: 'weight', 'gain', 'cover', 'total_gain', 'total_cover'
      'weight' counts how often a feature is used to split, 'gain'/'cover'
      average the gain/coverage of its splits, and 'total_*' sum them.
    - max_num_features: If set, display only the top N features (int, optional)
    - grid: Whether to show grid lines (bool)
    - show_values: Whether to annotate each bar with its importance value (bool)
    - values_format: Format string for bar annotations; '{v}' is replaced
      by the importance value, e.g. '{v:.3f}' (str)
    - **kwargs: Additional arguments forwarded to matplotlib.pyplot.barh
      (e.g. color, edgecolor)
    Returns: matplotlib.axes.Axes - The axes the plot was drawn on
    """
def plot_tree(booster, fmap='', num_trees=0, rankdir=None, ax=None,
              tree_idx=0, show_info=None, precision=None, **kwargs):
    """
    Plot a single specified tree using matplotlib.

    Parameters:
    - booster: Trained XGBoost model (Booster)
    - fmap: Path to a feature map file used to resolve feature names (str)
    - num_trees: Tree index to plot (deprecated, use tree_idx instead) (int)
    - rankdir: Direction of plot layout ('UT', 'LR', 'BT', 'RL') (str, optional)
    - ax: Target matplotlib axes; a new one is created when None
      (matplotlib.axes.Axes, optional)
    - tree_idx: Index of the tree to plot (int)
    - show_info: Information to show in nodes (list of str, optional)
      Options include: 'split', 'gain', 'cover', 'weight'
    - precision: Number of decimal places for floating point values (int, optional)
    - **kwargs: Additional arguments for the graphviz layout
    Returns: matplotlib.axes.Axes - The axes the tree was drawn on
    """
def to_graphviz(booster, fmap='', num_trees=0, rankdir=None,
                yes_color=None, no_color=None, condition_node_params=None,
                leaf_node_params=None, **kwargs):
    """
    Convert a specified tree to a graphviz instance for advanced
    visualization (rendering to PNG/PDF, notebook display, custom styling).

    Parameters:
    - booster: Trained XGBoost model (Booster)
    - fmap: Path to a feature map file used to resolve feature names (str)
    - num_trees: Tree index (deprecated, use tree_idx in kwargs) (int)
    - rankdir: Direction of tree layout ('UT', 'LR', 'BT', 'RL') (str, optional)
    - yes_color: Edge color for 'yes' branches (str, optional)
    - no_color: Edge color for 'no' branches (str, optional)
    - condition_node_params: Graphviz attributes for condition (split) nodes
      (dict, optional)
    - leaf_node_params: Graphviz attributes for leaf nodes (dict, optional)
    - **kwargs: Additional parameters including:
      - tree_idx: Index of tree to visualize (int)
      - with_stats: Whether to include node statistics (bool)
    Returns: graphviz.Source - Graphviz source object for rendering
"""
Global configuration management for XGBoost behavior, including device selection, verbosity levels, and algorithm parameters that affect all XGBoost operations.
def set_config(**new_config):
    """
    Set global XGBoost configuration parameters.

    Settings apply to all subsequent XGBoost operations; use
    config_context() instead for temporary changes.

    Parameters:
    - **new_config: Configuration parameters as keyword arguments
      Common parameters:
      - verbosity: Global verbosity level (int, 0-3)
        0=silent, 1=warning, 2=info, 3=debug
      - use_rmm: Whether to use the RMM memory allocator (bool)
      - nthread: Global number of threads (int)
      - device: Global device setting ('cpu', 'cuda', 'gpu') (str)

    Example configurations:
        set_config(verbosity=2, device='cpu', nthread=4)
        set_config(use_rmm=True)  # For GPU memory management
    """
def get_config():
    """
    Get the current global XGBoost configuration values.

    Returns: dict - All current configuration parameters.
    Keys include: 'verbosity', 'use_rmm', 'nthread', 'device', etc.
    """
def config_context(**new_config):
    """
    Context manager for temporary XGBoost configuration changes.

    Parameters:
    - **new_config: Temporary configuration parameters (same keys as
      set_config, e.g. 'verbosity', 'device', 'nthread')
    Returns: Context manager that restores the previous configuration on exit

    Usage:
        with config_context(verbosity=0, device='cpu'):
            # XGBoost operations with temporary config
            model = xgb.train(params, dtrain, num_boost_round=100)
        # Previous configuration restored automatically
"""
Low-level distributed communication primitives for custom distributed training setups. These functions enable coordination between multiple workers in distributed environments.
import xgboost.collective as collective
def collective.init(config):
    """
    Initialize the collective communication library.

    Must be called before any other collective operation; pair with
    collective.finalize(), or use CommunicatorContext to manage both.

    Parameters:
    - config: Configuration for collective communication (collective.Config)
    """
def collective.finalize():
    """Finalize collective communication and clean up resources."""
def collective.get_rank():
    """
    Get the rank (ID) of the current process in the distributed setup.

    Returns: int - Process rank (0-based indexing)
    """
def collective.get_world_size():
    """
    Get the total number of workers in the distributed setup.

    Returns: int - Total number of processes
    """
def collective.is_distributed():
    """
    Check whether this process is running in distributed mode.

    Returns: bool - True if distributed, False if single process
    """
def collective.communicator_print(msg):
    """
    Print a message through the communicator, tagged with rank information.

    Parameters:
    - msg: Message to print (str)
    """
def collective.get_processor_name():
    """
    Get the name of the processor/node this process is running on.

    Returns: str - Processor/hostname identifier
    """
def collective.broadcast(data, root):
    """
    Broadcast an object from the root process to all other processes.

    Every rank must make this call; non-root ranks typically pass None
    as data and receive the root's value back.

    Parameters:
    - data: Data to broadcast (any serializable object)
    - root: Rank of the root process (int)
    Returns: object - Broadcast data (same on all processes)
    """
def collective.allreduce(data, op):
    """
    Perform an allreduce (reduction across all processes).

    Parameters:
    - data: Data to reduce (numeric array-like)
    - op: Reduction operation (collective.Op)
      Options: Op.MAX, Op.MIN, Op.SUM, Op.BITWISE_AND, Op.BITWISE_OR, Op.BITWISE_XOR
    Returns: object - Reduced result (same on all processes)
    """
def collective.signal_error(msg):
    """
    Signal an error condition to terminate all processes.

    Parameters:
    - msg: Error message (str)
    """
class collective.Config:
    def __init__(self, *, retry=3, timeout=300, tracker_host_ip=None,
                 tracker_port=0, tracker_timeout=30):
        """
        Configuration for collective communication.

        Parameters:
        - retry: Number of connection retries (int)
        - timeout: Communication timeout in seconds (int)
        - tracker_host_ip: IP address of the tracker (str, optional)
        - tracker_port: Port number for the tracker (int)
          # presumably 0 means auto-assign, matching RabitTracker -- verify
        - tracker_timeout: Tracker connection timeout in seconds (int)
        """
class collective.CommunicatorContext:
    def __init__(self, **kwargs):
        """
        Context manager handling collective communicator setup on entry
        and cleanup on exit.

        Parameters:
        - **kwargs: Arguments passed through to collective.init()
        """
class collective.Op:
    """Enumeration of reduction operations for collective.allreduce()."""
    MAX = 0          # Maximum value across processes
    MIN = 1          # Minimum value across processes
    SUM = 2          # Sum of values across processes
    BITWISE_AND = 3  # Bitwise AND
    BITWISE_OR = 4   # Bitwise OR
    BITWISE_XOR = 5  # Bitwise XOR
High-level coordination utilities for distributed training setups, including worker synchronization and fault tolerance.
from xgboost.tracker import RabitTracker
class RabitTracker:
    def __init__(self, n_workers, host_ip=None, port=0, *, sortby='process',
                 timeout=3600):
        """
        Tracker that coordinates collective communication between workers.

        Parameters:
        - n_workers: Number of worker processes expected to connect (int)
        - host_ip: Host IP address for the tracker (str, optional)
          If None, uses the local machine's IP
        - port: Port number for the tracker (int, 0 for auto-assignment)
        - sortby: Method for sorting workers ('process' or 'ip') (str)
        - timeout: Maximum time to wait for workers, in seconds (int)
        """
    def start(self):
        """
        Start the tracker server.

        Returns: dict - Connection information including:
        - 'host_ip': Tracker IP address
        - 'port': Tracker port number
        """
    def wait_for(self, timeout=None):
        """
        Wait for all workers to connect and complete training.

        Parameters:
        - timeout: Maximum wait time in seconds (int, optional)
          If None, uses the timeout given to __init__
        Returns: bool - True if all workers completed successfully
        """
    def worker_args(self):
        """
        Get environment arguments for worker processes.

        Returns: dict - Environment variables for workers including:
        - 'DMLC_TRACKER_URI': Tracker URI
        - 'DMLC_TRACKER_PORT': Tracker port
        - 'DMLC_TASK_ID': Task ID (set per worker)
        """
def build_info():
    """
    Get build information for the installed XGBoost binary.

    Useful for checking at runtime which optional capabilities (CUDA,
    NCCL, OpenMP, ...) this installation was compiled with.

    Returns: dict - Build configuration including:
    - 'USE_CUDA': Whether CUDA support is compiled in
    - 'USE_NCCL': Whether NCCL support is available
    - 'COMPILER': Compiler used for building
    - 'BUILD_WITH_SHARED_PTR': Shared pointer support
    - And other compilation flags
    """
import xgboost as xgb
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification

# Create and train a model on synthetic binary-classification data.
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                           random_state=42)
feature_names = [f'feature_{i}' for i in range(20)]
dtrain = xgb.DMatrix(X, label=y, feature_names=feature_names)
params = {'objective': 'binary:logistic', 'max_depth': 6, 'learning_rate': 0.1}
model = xgb.train(params, dtrain, num_boost_round=100)

# Plot feature importance with each of the four metrics in a 2x2 grid.
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# Weight-based importance (frequency of splits)
xgb.plot_importance(model, ax=axes[0,0], importance_type='weight',
                    max_num_features=10, title='Importance by Weight')
# Gain-based importance (average gain of splits)
xgb.plot_importance(model, ax=axes[0,1], importance_type='gain',
                    max_num_features=10, title='Importance by Gain')
# Cover-based importance (average coverage of splits)
xgb.plot_importance(model, ax=axes[1,0], importance_type='cover',
                    max_num_features=10, title='Importance by Cover')
# Total gain importance
xgb.plot_importance(model, ax=axes[1,1], importance_type='total_gain',
                    max_num_features=10, title='Importance by Total Gain')
plt.tight_layout()
plt.show()

# Customized importance plot: extra kwargs (color, edgecolor) are
# forwarded to matplotlib's barh.
plt.figure(figsize=(10, 8))
xgb.plot_importance(model,
                    height=0.5,
                    importance_type='gain',
                    max_num_features=15,
                    title='Top 15 Features by Information Gain',
                    xlabel='Information Gain',
                    grid=True,
                    show_values=True,
                    values_format='{v:.3f}',
                    color='skyblue',
                    edgecolor='navy')
plt.show()
import xgboost as xgb
import matplotlib.pyplot as plt

# Train a small, shallow model for visualization.
# Bug fix: feature_names[:5] supplies only 5 names, so the matrix must be
# restricted to 5 columns as well -- X[:100] alone has 20 columns and
# DMatrix rejects a feature_names list whose length differs from the
# number of columns.
dtrain = xgb.DMatrix(X[:100, :5], label=y[:100], feature_names=feature_names[:5])
simple_model = xgb.train({'max_depth': 3, 'objective': 'binary:logistic'},
                         dtrain, num_boost_round=3)

# Plot the three boosted trees side by side.
fig, axes = plt.subplots(1, 3, figsize=(20, 6))
for i in range(3):
    xgb.plot_tree(simple_model, ax=axes[i], tree_idx=i,
                  show_info=['split', 'gain'],
                  precision=2)
    axes[i].set_title(f'Tree {i}')
plt.tight_layout()
plt.show()

# Create graphviz visualization for high-quality output
graphviz_tree = xgb.to_graphviz(simple_model, tree_idx=0,
                                rankdir='TB',  # Top to bottom
                                yes_color='lightblue',
                                no_color='lightcoral',
                                condition_node_params={'shape': 'box', 'style': 'filled'},
                                leaf_node_params={'shape': 'ellipse', 'style': 'filled'})
# Save as PDF or PNG
graphviz_tree.render('xgb_tree_visualization', format='png', cleanup=True)
print("Tree visualization saved as 'xgb_tree_visualization.png'")
# Display in Jupyter notebook
# graphviz_tree.view()
import xgboost as xgb
# Check current configuration
current_config = xgb.get_config()
print("Current XGBoost configuration:")
for key, value in current_config.items():
    print(f"  {key}: {value}")

# Set global configuration
xgb.set_config(verbosity=2,   # More verbose output
               nthread=4,     # Use 4 threads globally
               device='cpu')  # Force CPU usage
print(f"\nUpdated verbosity: {xgb.get_config()['verbosity']}")

# Use configuration context for temporary changes
print("\nTraining with temporary quiet configuration:")
with xgb.config_context(verbosity=0):  # Silent mode
    quiet_model = xgb.train({'objective': 'binary:logistic'}, dtrain,
                            num_boost_round=10)
    print("Model trained silently")
print("Back to previous verbosity level")

# Configuration for GPU training (if available)
try:
    with xgb.config_context(device='cuda'):
        # Fix: 'gpu_hist' is deprecated in XGBoost >= 2.0; with the device
        # set to 'cuda' via the config context, plain 'hist' runs on GPU.
        gpu_params = {'objective': 'binary:logistic', 'tree_method': 'hist'}
        gpu_model = xgb.train(gpu_params, dtrain, num_boost_round=10)
        print("GPU training completed")
except Exception as e:
    # Expected path for this CPU-only package build.
    print(f"GPU training not available: {e}")

# Reset to default configuration
xgb.set_config(verbosity=1, device='cpu')
import xgboost as xgb
from xgboost import collective
import numpy as np

# Example of basic collective operations (typically run across multiple processes)
def distributed_example():
    """Example showing collective communication primitives."""
    # Initialize collective communication; the context manager also
    # finalizes it on exit.
    config = collective.Config(timeout=300, retry=3)
    with collective.CommunicatorContext(config=config):
        rank = collective.get_rank()
        world_size = collective.get_world_size()
        print(f"Process {rank} of {world_size}")
        print(f"Running on: {collective.get_processor_name()}")
        # Example data, different on each process.
        local_data = np.array([rank + 1, rank * 2])
        # Broadcast data from rank 0 to all processes; non-root ranks
        # pass None and receive the root's value back.
        if rank == 0:
            broadcast_data = {'model_params': {'max_depth': 6, 'learning_rate': 0.1}}
        else:
            broadcast_data = None
        shared_params = collective.broadcast(broadcast_data, root=0)
        print(f"Rank {rank} received: {shared_params}")
        # Sum all local data across processes
        global_sum = collective.allreduce(local_data, collective.Op.SUM)
        print(f"Rank {rank} global sum: {global_sum}")
        # Find maximum across all processes
        global_max = collective.allreduce(local_data, collective.Op.MAX)
        print(f"Rank {rank} global max: {global_max}")

# Note: This would typically be run in a multi-process environment
# distributed_example()
import xgboost as xgb
from xgboost.tracker import RabitTracker
from xgboost import collective  # fix: used below but was not imported in this section
from sklearn.datasets import make_classification  # fix: used below but not imported here
import multiprocessing as mp
import os

def worker_process(worker_id, tracker_args, data_partition):
    """Worker process for distributed training.

    Parameters:
    - worker_id: 0-based worker index, exported as DMLC_TASK_ID (int)
    - tracker_args: Environment variables from RabitTracker.worker_args() (dict)
    - data_partition: (X, y) arrays holding this worker's data shard (tuple)
    """
    # Point this process at the tracker before initializing the collective.
    os.environ.update(tracker_args)
    os.environ['DMLC_TASK_ID'] = str(worker_id)
    # Initialize collective communication
    collective_config = collective.Config()
    collective.init(collective_config)
    try:
        # Create local DMatrix from this worker's data partition.
        X_local, y_local = data_partition
        dtrain_local = xgb.DMatrix(X_local, label=y_local)
        # Training parameters
        params = {
            'objective': 'binary:logistic',
            'max_depth': 6,
            'learning_rate': 0.1,
            'tree_method': 'hist'
        }
        # Distributed training: with the collective initialized, xgb.train
        # synchronizes across all connected workers.
        model = xgb.train(params, dtrain_local, num_boost_round=50)
        print(f"Worker {worker_id} completed training")
        return model
    finally:
        # Always release collective resources, even if training fails.
        collective.finalize()

def distributed_training_example():
    """Example of distributed training setup with RabitTracker."""
    # Create sample data and split it into one contiguous shard per worker.
    X, y = make_classification(n_samples=10000, n_features=20,
                               n_classes=2, random_state=42)
    n_workers = 4
    partition_size = len(X) // n_workers
    data_partitions = []
    for i in range(n_workers):
        start_idx = i * partition_size
        # The last worker absorbs any remainder rows.
        end_idx = (i + 1) * partition_size if i < n_workers - 1 else len(X)
        data_partitions.append((X[start_idx:end_idx], y[start_idx:end_idx]))
    # Initialize and start the tracker that coordinates the workers.
    tracker = RabitTracker(n_workers=n_workers, timeout=300)
    tracker_info = tracker.start()
    print(f"Tracker started at {tracker_info['host_ip']}:{tracker_info['port']}")
    # Get worker arguments
    worker_args = tracker.worker_args()
    # Launch one process per worker.
    processes = []
    for worker_id in range(n_workers):
        p = mp.Process(target=worker_process,
                       args=(worker_id, worker_args, data_partitions[worker_id]))
        p.start()
        processes.append(p)
    # Block until every worker has connected and finished (or timed out).
    success = tracker.wait_for(timeout=600)
    # Clean up processes
    for p in processes:
        p.join()
    if success:
        print("Distributed training completed successfully!")
    else:
        print("Distributed training failed or timed out")

# Note: Run this in a script, not in interactive environment
# distributed_training_example()
import xgboost as xgb
# Get comprehensive build information
build_info = xgb.build_info()
print("XGBoost Build Information:")
print("=" * 50)

# Check for key capabilities
gpu_support = build_info.get('USE_CUDA', False)
nccl_support = build_info.get('USE_NCCL', False)
omp_support = build_info.get('USE_OPENMP', False)
print(f"GPU Support (CUDA): {gpu_support}")
print(f"Multi-GPU Support (NCCL): {nccl_support}")
print(f"OpenMP Support: {omp_support}")

# Compiler and build details
print(f"\nCompiler: {build_info.get('COMPILER', 'Unknown')}")
print(f"Build with shared pointers: {build_info.get('BUILD_WITH_SHARED_PTR', False)}")

# Print all build flags
print(f"\nAll build configuration:")
for key, value in sorted(build_info.items()):
    print(f"  {key}: {value}")

# Version information
print(f"\nXGBoost version: {xgb.__version__}")

# Device availability check
def check_device_availability():
    """Check what devices are available for XGBoost."""
    devices = []
    # CPU is always available
    devices.append('cpu')
    # Check GPU availability
    if build_info.get('USE_CUDA', False):
        try:
            # Try to set CUDA device to test availability
            with xgb.config_context(device='cuda'):
                devices.append('cuda')
        except Exception:
            # Compiled with CUDA but no usable device at runtime.
            pass
    return devices

available_devices = check_device_availability()
print(f"\nAvailable devices: {available_devices}")

# Memory and performance recommendations
def get_performance_recommendations():
    """Get performance recommendations based on build configuration."""
    recommendations = []
    if not build_info.get('USE_CUDA', False):
        recommendations.append("Consider GPU version for large datasets")
    if not build_info.get('USE_OPENMP', False):
        recommendations.append("OpenMP not available - limited CPU parallelization")
    if build_info.get('USE_NCCL', False):
        recommendations.append("NCCL available - good for multi-GPU training")
    return recommendations

recommendations = get_performance_recommendations()
if recommendations:
    print(f"\nPerformance recommendations:")
    for rec in recommendations:
        print(f"  - {rec}")
import xgboost as xgb
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Rectangle

# Set up styling
plt.style.use('seaborn-v0_8')
colors = sns.color_palette("husl", 10)

def create_comprehensive_model_report(model, feature_names=None):
    """Create a comprehensive visual report for XGBoost model."""
    fig = plt.figure(figsize=(20, 16))
    # Row 1: feature importance by each of four metrics.
    importance_types = ['weight', 'gain', 'cover', 'total_gain']
    for i, imp_type in enumerate(importance_types):
        ax = plt.subplot(3, 4, i + 1)
        xgb.plot_importance(model, ax=ax, importance_type=imp_type,
                            max_num_features=10, color=colors[i],
                            title=f'Importance by {imp_type.title()}')
    # Row 2: individual trees (first 4 trees, if the model has that many).
    for i in range(4):
        if i < model.num_boosted_rounds():
            ax = plt.subplot(3, 4, i + 5)
            xgb.plot_tree(model, ax=ax, tree_idx=i, precision=2)
            ax.set_title(f'Tree {i}')
    # Row 3 left: top-10 gain features, drawn manually for full control.
    ax = plt.subplot(3, 2, 5)
    # Get feature scores for analysis
    feature_scores = model.get_score(importance_type='gain')
    if feature_scores:
        top_features = sorted(feature_scores.items(),
                              key=lambda x: x[1], reverse=True)[:10]
        features, scores = zip(*top_features)
        bars = ax.barh(range(len(features)), scores, color=colors[:len(features)])
        ax.set_yticks(range(len(features)))
        ax.set_yticklabels(features)
        ax.set_xlabel('Feature Gain')
        ax.set_title('Top 10 Features by Gain')
        # Add value labels just past the end of each bar
        for i, (bar, score) in enumerate(zip(bars, scores)):
            ax.text(bar.get_width() + max(scores) * 0.01, bar.get_y() + bar.get_height()/2,
                    f'{score:.3f}', ha='left', va='center', fontsize=8)
    # Row 3 right: text-only model info panel.
    ax = plt.subplot(3, 2, 6)
    ax.axis('off')
    # Create info text
    info_text = f"""
    Model Information:
    Number of trees: {model.num_boosted_rounds()}
    Number of features: {model.num_features()}
    Best iteration: {getattr(model, 'best_iteration', 'N/A')}
    Best score: {getattr(model, 'best_score', 'N/A')}
    Top 3 Features:
    """
    # top_features is only bound when feature_scores is non-empty,
    # so reuse is guarded by the same condition.
    if feature_scores:
        for i, (feature, score) in enumerate(top_features[:3]):
            info_text += f"\n  {i+1}. {feature}: {score:.3f}"
    ax.text(0.1, 0.9, info_text, transform=ax.transAxes, fontsize=12,
            verticalalignment='top', fontfamily='monospace',
            bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))
    plt.suptitle('XGBoost Model Analysis Report', fontsize=16, fontweight='bold')
    plt.tight_layout()
    plt.show()

# Generate comprehensive report
create_comprehensive_model_report(model, feature_names)

Install with Tessl CLI
npx tessl i tessl/pypi-xgboost-cpu