Scalable Python data science, in an API compatible & lightning fast way.
Remote function execution for distributed computing workloads. The Xorbits remote module lets you spawn and execute arbitrary functions across distributed workers, enabling flexible parallel computing patterns beyond standard array and DataFrame operations.
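A minimal sketch of the core pattern, assuming xorbits.init() starts a default local cluster when no address is given:

import xorbits
from xorbits.remote import spawn

xorbits.init()

def add(a, b):
    return a + b

task = spawn(add, args=(1, 2))  # returns immediately; execution is deferred
print(xorbits.run(task))        # triggers remote execution and fetches the result

xorbits.shutdown()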
Spawn and execute functions remotely across distributed Xorbits workers.
def spawn(
func,
args=(),
kwargs=None,
retry_when_fail=False,
n_output=None,
output_types=None,
**kw
):
"""
Spawn remote function execution across distributed workers.
Executes functions on remote workers in the Xorbits cluster,
enabling flexible distributed computing patterns beyond
standard array and DataFrame operations.
Parameters:
- func: callable, function to execute remotely
- args: tuple, positional arguments to pass to function
- kwargs: dict, keyword arguments to pass to function
- retry_when_fail: bool, whether to retry when the task fails
- n_output: int, number of outputs expected from the function
- output_types: list, types of the outputs
- **kw: Additional keyword arguments
Returns:
- Remote execution result that can be retrieved with xorbits.run()
"""Usage Examples:
import xorbits
from xorbits.remote import spawn
import time
xorbits.init()
# Define functions to execute remotely
def compute_heavy_task(n):
"""Simulate computationally heavy task."""
result = 0
for i in range(n):
result += i ** 2
return result
def process_data_chunk(data_chunk, multiplier=2):
"""Process a chunk of data."""
return [x * multiplier for x in data_chunk]
# Spawn remote function execution
task1 = spawn(compute_heavy_task, args=(1000000,))
task2 = spawn(compute_heavy_task, args=(2000000,))
# Execute multiple tasks in parallel
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
processing_tasks = [
spawn(process_data_chunk, args=(chunk,), kwargs={'multiplier': 3})
for chunk in data_chunks
]
# Retrieve results
heavy_results = xorbits.run(task1, task2)
processing_results = xorbits.run(*processing_tasks)
print(f"Heavy computation results: {heavy_results}")
print(f"Data processing results: {processing_results}")
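# Hedged sketch: for a function that returns multiple values, n_output tells
# spawn how many outputs to expect. Assumption: the outputs can be unpacked
# into separately runnable objects, as in the Mars remote API Xorbits builds on.
def min_and_max(values):
    return min(values), max(values)

lo, hi = spawn(min_and_max, args=([3, 1, 4, 1, 5],), n_output=2)
print(xorbits.run(lo), xorbits.run(hi))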
xorbits.shutdown()

import xorbits
from xorbits.remote import spawn
import xorbits.numpy as np
xorbits.init()
# Define custom distributed algorithms
def monte_carlo_pi(num_samples):
"""Monte Carlo estimation of Pi."""
import random
inside_circle = 0
for _ in range(num_samples):
x, y = random.random(), random.random()
if x*x + y*y <= 1:
inside_circle += 1
return 4 * inside_circle / num_samples
def custom_aggregation(data_partition):
"""Custom aggregation function for distributed data."""
import numpy as np
return {
'sum': np.sum(data_partition),
'mean': np.mean(data_partition),
'std': np.std(data_partition),
'count': len(data_partition)
}
# Distributed Monte Carlo computation
num_workers = 4
samples_per_worker = 1000000
pi_tasks = [
spawn(monte_carlo_pi, args=(samples_per_worker,))
for _ in range(num_workers)
]
# Custom distributed data processing
large_array = np.random.random(10000000)
chunk_size = len(large_array) // num_workers
data_chunks = [
large_array[i*chunk_size:(i+1)*chunk_size]
for i in range(num_workers)
]
aggregation_tasks = [
spawn(custom_aggregation, args=(chunk,))
for chunk in data_chunks
]
# Execute distributed computations
pi_estimates = xorbits.run(*pi_tasks)
aggregation_results = xorbits.run(*aggregation_tasks)
# Combine results
final_pi_estimate = sum(pi_estimates) / len(pi_estimates)
print(f"Distributed Pi estimate: {final_pi_estimate}")
# Combine aggregation results
total_sum = sum(result['sum'] for result in aggregation_results)
total_count = sum(result['count'] for result in aggregation_results)
global_mean = total_sum / total_count
print(f"Global mean: {global_mean}")
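# Hedged sketch: spawned results can themselves be passed as arguments to a
# follow-up spawn call, so the reduction can also run remotely. Assumption:
# Xorbits resolves spawned inputs before invoking the function, as Mars does.
def average(*estimates):
    return sum(estimates) / len(estimates)

remote_pi = spawn(average, args=tuple(pi_tasks))
print(f"Remotely combined Pi estimate: {xorbits.run(remote_pi)}")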
xorbits.shutdown()

import xorbits
from xorbits.remote import spawn
xorbits.init()
def gpu_computation(matrix_size):
"""Computation that requires GPU resources."""
try:
import cupy as cp # GPU library
matrix = cp.random.random((matrix_size, matrix_size))
result = cp.linalg.inv(matrix)
return cp.asnumpy(result.diagonal().sum())
except ImportError:
# Fallback to CPU computation
import numpy as np
matrix = np.random.random((matrix_size, matrix_size))
result = np.linalg.inv(matrix)
return result.diagonal().sum()
def memory_intensive_task(data_size):
"""Task requiring specific memory resources."""
import numpy as np
large_array = np.random.random(data_size)
return np.std(large_array)
# Spawn tasks with resource requirements
gpu_task = spawn(
gpu_computation,
args=(1000,),
resources={'gpu': 1} # Request GPU resource
)
memory_task = spawn(
memory_intensive_task,
args=(10000000,),
resources={'memory': 2 * 1024 * 1024 * 1024} # Request 2GB memory
)
# Execute with resource constraints
results = xorbits.run(gpu_task, memory_task)
print(f"GPU computation result: {results[0]}")
print(f"Memory intensive result: {results[1]}")
xorbits.shutdown()

import xorbits
from xorbits.remote import spawn
import xorbits.pandas as pd
xorbits.init()
def extract_features(data_chunk):
"""Extract features from data chunk."""
features = {}
features['mean'] = data_chunk.mean()
features['std'] = data_chunk.std()
features['min'] = data_chunk.min()
features['max'] = data_chunk.max()
features['count'] = len(data_chunk)
return features
def validate_data(data_chunk):
"""Validate data quality."""
issues = []
if data_chunk.isnull().any():
issues.append('contains_nulls')
if (data_chunk < 0).any():
issues.append('contains_negatives')
if data_chunk.std() == 0:
issues.append('no_variance')
return {
'valid': len(issues) == 0,
'issues': issues,
'chunk_size': len(data_chunk)
}
# Load distributed data
large_dataset = pd.read_csv('large_dataset.csv')
# Split into chunks for parallel processing
num_chunks = 8
chunk_size = len(large_dataset) // num_chunks
chunks = [
large_dataset['value'].iloc[i*chunk_size:(i+1)*chunk_size]
for i in range(num_chunks)
]
# Process chunks in parallel using remote functions
feature_tasks = [spawn(extract_features, args=(chunk,)) for chunk in chunks]
validation_tasks = [spawn(validate_data, args=(chunk,)) for chunk in chunks]
# Execute parallel processing
feature_results = xorbits.run(*feature_tasks)
validation_results = xorbits.run(*validation_tasks)
# Aggregate results
total_count = sum(f['count'] for f in feature_results)
global_mean = sum(f['mean'] * f['count'] for f in feature_results) / total_count
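# Worked addition: pool the chunk-level stds into a global estimate using the
# variance decomposition var = sum_i n_i * (var_i + (mean_i - mean)^2) / N.
# This is approximate for pandas .std() (ddof=1), tightening as chunks grow.
global_var = sum(
    f['count'] * (f['std'] ** 2 + (f['mean'] - global_mean) ** 2)
    for f in feature_results
) / total_count
print(f"Approximate global std: {global_var ** 0.5}")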
validation_summary = {
'total_chunks': len(validation_results),
'valid_chunks': sum(1 for v in validation_results if v['valid']),
'common_issues': {}
}
# Count issue frequency
for result in validation_results:
for issue in result['issues']:
validation_summary['common_issues'][issue] = \
validation_summary['common_issues'].get(issue, 0) + 1
print(f"Global mean: {global_mean}")
print(f"Validation summary: {validation_summary}")
xorbits.shutdown()

import xorbits
from xorbits.remote import spawn
import random
xorbits.init()
def unreliable_function(task_id, failure_rate=0.3):
"""Function that may fail randomly."""
if random.random() < failure_rate:
raise Exception(f"Task {task_id} failed randomly")
# Simulate work
import time
time.sleep(1)
return f"Task {task_id} completed successfully"
def robust_function(data, max_retries=3):
"""Function with built-in retry logic."""
for attempt in range(max_retries):
try:
# Simulate operation that might fail
if random.random() < 0.2: # 20% failure rate
raise Exception("Operation failed")
return f"Processed {len(data)} items successfully"
except Exception as e:
if attempt == max_retries - 1:
return f"Failed after {max_retries} attempts: {str(e)}"
continue
# Spawn tasks with retry capabilities
reliable_tasks = [
spawn(
unreliable_function,
args=(i,),
kwargs={'failure_rate': 0.2},
retry_when_fail=True  # documented retry flag; retries the task if it fails
)
for i in range(5)
]
robust_tasks = [
spawn(robust_function, args=([1, 2, 3, 4, 5],))
for _ in range(3)
]
# Execute with error handling
try:
reliable_results = xorbits.run(*reliable_tasks)
robust_results = xorbits.run(*robust_tasks)
print("Reliable task results:", reliable_results)
print("Robust task results:", robust_results)
except Exception as e:
print(f"Some tasks failed: {e}")
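# Plain-Python recovery pattern (not a spawn feature): run tasks one at a time
# so a single failure does not mask the results of tasks that succeeded.
for i, task in enumerate(reliable_tasks):
    try:
        print(f"Task {i} result:", xorbits.run(task))
    except Exception as e:
        print(f"Task {i} failed: {e}")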
xorbits.shutdown()

Install with Tessl CLI
npx tessl i tessl/pypi-xorbits