CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-xorbits

Scalable Python data science, in an API compatible & lightning fast way.

Overview
Eval results
Files

remote-computing.mddocs/

Remote Computing

Remote function execution capabilities for distributed computing workloads. Xorbits remote module enables spawning and executing functions across distributed workers for flexible parallel computing patterns.

Capabilities

Remote Function Execution

Spawn and execute functions remotely across distributed Xorbits workers.

def spawn(
    func,
    args=(),
    kwargs=None,
    retry_when_fail=False,
    n_output=None,
    output_types=None,
    **kw
):
    """
    Spawn remote function execution across distributed workers.
    
    Executes functions on remote workers in the Xorbits cluster,
    enabling flexible distributed computing patterns beyond
    standard array and DataFrame operations.
    
    Parameters:
    - func: callable, function to execute remotely
    - args: tuple, positional arguments to pass to function
    - kwargs: dict, keyword arguments to pass to function  
    - retry_when_fail: bool, whether to retry when the task fails
    - n_output: int, number of outputs expected from the function
    - output_types: list, types of the outputs
    - **kw: Additional keyword arguments
    
    Returns:
    - Remote execution result that can be retrieved with xorbits.run()
    """

Usage Examples:

Basic Remote Function Execution

import xorbits
from xorbits.remote import spawn
import time

xorbits.init()

# Define functions to execute remotely
def compute_heavy_task(n):
    """Simulate computationally heavy task."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def process_data_chunk(data_chunk, multiplier=2):
    """Process a chunk of data."""
    return [x * multiplier for x in data_chunk]

# Spawn remote function execution
task1 = spawn(compute_heavy_task, args=(1000000,))
task2 = spawn(compute_heavy_task, args=(2000000,))

# Execute multiple tasks in parallel
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
processing_tasks = [
    spawn(process_data_chunk, args=(chunk,), kwargs={'multiplier': 3})
    for chunk in data_chunks
]

# Retrieve results
heavy_results = xorbits.run(task1, task2)
processing_results = xorbits.run(*processing_tasks)

print(f"Heavy computation results: {heavy_results}")
print(f"Data processing results: {processing_results}")

xorbits.shutdown()

Advanced Remote Computing Patterns

import xorbits
from xorbits.remote import spawn
import xorbits.pandas as pd
import xorbits.numpy as np

xorbits.init()

# Define custom distributed algorithms
def monte_carlo_pi(num_samples):
    """Monte Carlo estimation of Pi."""
    import random
    inside_circle = 0
    for _ in range(num_samples):
        x, y = random.random(), random.random()
        if x*x + y*y <= 1:
            inside_circle += 1
    return 4 * inside_circle / num_samples

def custom_aggregation(data_partition):
    """Custom aggregation function for distributed data."""
    import numpy as np
    return {
        'sum': np.sum(data_partition),
        'mean': np.mean(data_partition),
        'std': np.std(data_partition),
        'count': len(data_partition)
    }

# Distributed Monte Carlo computation
num_workers = 4
samples_per_worker = 1000000

pi_tasks = [
    spawn(monte_carlo_pi, args=(samples_per_worker,))
    for _ in range(num_workers)
]

# Custom distributed data processing
large_array = np.random.random(10000000)
chunk_size = len(large_array) // num_workers
data_chunks = [
    large_array[i*chunk_size:(i+1)*chunk_size] 
    for i in range(num_workers)
]

aggregation_tasks = [
    spawn(custom_aggregation, args=(chunk,))
    for chunk in data_chunks
]

# Execute distributed computations
pi_estimates = xorbits.run(*pi_tasks)
aggregation_results = xorbits.run(*aggregation_tasks)

# Combine results
final_pi_estimate = sum(pi_estimates) / len(pi_estimates)
print(f"Distributed Pi estimate: {final_pi_estimate}")

# Combine aggregation results
total_sum = sum(result['sum'] for result in aggregation_results)
total_count = sum(result['count'] for result in aggregation_results)
global_mean = total_sum / total_count
print(f"Global mean: {global_mean}")

xorbits.shutdown()

Remote Function with Resources

import xorbits
from xorbits.remote import spawn

xorbits.init()

def gpu_computation(matrix_size):
    """Computation that requires GPU resources."""
    try:
        import cupy as cp  # GPU library
        matrix = cp.random.random((matrix_size, matrix_size))
        result = cp.linalg.inv(matrix)
        return cp.asnumpy(result.diagonal().sum())
    except ImportError:
        # Fallback to CPU computation
        import numpy as np
        matrix = np.random.random((matrix_size, matrix_size))
        result = np.linalg.inv(matrix)
        return result.diagonal().sum()

def memory_intensive_task(data_size):
    """Task requiring specific memory resources."""
    import numpy as np
    large_array = np.random.random(data_size)
    return np.std(large_array)

# Spawn tasks with resource requirements
gpu_task = spawn(
    gpu_computation,
    args=(1000,),
    resources={'gpu': 1}  # Request GPU resource
)

memory_task = spawn(
    memory_intensive_task,
    args=(10000000,),
    resources={'memory': 2 * 1024 * 1024 * 1024}  # Request 2GB memory
)

# Execute with resource constraints
results = xorbits.run(gpu_task, memory_task)
print(f"GPU computation result: {results[0]}")
print(f"Memory intensive result: {results[1]}")

xorbits.shutdown()

Distributed Data Pipeline with Remote Functions

import xorbits
from xorbits.remote import spawn
import xorbits.pandas as pd

xorbits.init()

def extract_features(data_chunk):
    """Extract features from data chunk."""
    features = {}
    features['mean'] = data_chunk.mean()
    features['std'] = data_chunk.std()
    features['min'] = data_chunk.min()
    features['max'] = data_chunk.max()
    features['count'] = len(data_chunk)
    return features

def validate_data(data_chunk):
    """Validate data quality."""
    issues = []
    if data_chunk.isnull().any():
        issues.append('contains_nulls')
    if (data_chunk < 0).any():
        issues.append('contains_negatives')
    if data_chunk.std() == 0:
        issues.append('no_variance')
    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'chunk_size': len(data_chunk)
    }

# Load distributed data
large_dataset = pd.read_csv('large_dataset.csv')

# Split into chunks for parallel processing
num_chunks = 8
chunk_size = len(large_dataset) // num_chunks
chunks = [
    large_dataset[i*chunk_size:(i+1)*chunk_size]['value'] 
    for i in range(num_chunks)
]

# Process chunks in parallel using remote functions
feature_tasks = [spawn(extract_features, args=(chunk,)) for chunk in chunks]
validation_tasks = [spawn(validate_data, args=(chunk,)) for chunk in chunks]

# Execute parallel processing
feature_results = xorbits.run(*feature_tasks)
validation_results = xorbits.run(*validation_tasks)

# Aggregate results
total_count = sum(f['count'] for f in feature_results)
global_mean = sum(f['mean'] * f['count'] for f in feature_results) / total_count

validation_summary = {
    'total_chunks': len(validation_results),
    'valid_chunks': sum(1 for v in validation_results if v['valid']),
    'common_issues': {}
}

# Count issue frequency
for result in validation_results:
    for issue in result['issues']:
        validation_summary['common_issues'][issue] = \
            validation_summary['common_issues'].get(issue, 0) + 1

print(f"Global mean: {global_mean}")
print(f"Validation summary: {validation_summary}")

xorbits.shutdown()

Error Handling and Retry Patterns

import xorbits
from xorbits.remote import spawn
import random

xorbits.init()

def unreliable_function(task_id, failure_rate=0.3):
    """Function that may fail randomly."""
    if random.random() < failure_rate:
        raise Exception(f"Task {task_id} failed randomly")
    
    # Simulate work
    import time
    time.sleep(1)
    return f"Task {task_id} completed successfully"

def robust_function(data, max_retries=3):
    """Function with built-in retry logic."""
    for attempt in range(max_retries):
        try:
            # Simulate operation that might fail
            if random.random() < 0.2:  # 20% failure rate
                raise Exception("Operation failed")
            return f"Processed {len(data)} items successfully"
        except Exception as e:
            if attempt == max_retries - 1:
                return f"Failed after {max_retries} attempts: {str(e)}"
            continue

# Spawn tasks with retry capabilities
reliable_tasks = [
    spawn(
        unreliable_function, 
        args=(i,), 
        kwargs={'failure_rate': 0.2},
        retry_when_task_canceled=True
    )
    for i in range(5)
]

robust_tasks = [
    spawn(robust_function, args=([1, 2, 3, 4, 5],))
    for _ in range(3)
]

# Execute with error handling
try:
    reliable_results = xorbits.run(*reliable_tasks)
    robust_results = xorbits.run(*robust_tasks)
    
    print("Reliable task results:", reliable_results)
    print("Robust task results:", robust_results)
    
except Exception as e:
    print(f"Some tasks failed: {e}")

xorbits.shutdown()

Install with Tessl CLI

npx tessl i tessl/pypi-xorbits

docs

configuration.md

datasets.md

index.md

machine-learning.md

numpy-integration.md

pandas-integration.md

remote-computing.md

runtime-management.md

tile.json