# Remote Computing

Remote function execution capabilities for distributed computing workloads. The Xorbits remote module enables spawning and executing functions across distributed workers for flexible parallel computing patterns.

## Capabilities

### Remote Function Execution

Spawn and execute functions remotely across distributed Xorbits workers.
```python { .api }
def spawn(
    func,
    args=(),
    kwargs=None,
    retry_when_fail=False,
    n_output=None,
    output_types=None,
    **kw
):
    """
    Spawn remote function execution across distributed workers.

    Executes functions on remote workers in the Xorbits cluster,
    enabling flexible distributed computing patterns beyond
    standard array and DataFrame operations.

    Parameters:
    - func: callable, function to execute remotely
    - args: tuple, positional arguments to pass to function
    - kwargs: dict, keyword arguments to pass to function
    - retry_when_fail: bool, whether to retry when the task fails
    - n_output: int, number of outputs expected from the function
    - output_types: list, types of the outputs
    - **kw: Additional keyword arguments

    Returns:
    - Remote execution result that can be retrieved with xorbits.run()
    """
```

**Usage Examples:**

### Basic Remote Function Execution

```python
import xorbits
from xorbits.remote import spawn
import time

xorbits.init()

# Define functions to execute remotely
def compute_heavy_task(n):
    """Simulate computationally heavy task."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def process_data_chunk(data_chunk, multiplier=2):
    """Process a chunk of data."""
    return [x * multiplier for x in data_chunk]

# Spawn remote function execution
task1 = spawn(compute_heavy_task, args=(1000000,))
task2 = spawn(compute_heavy_task, args=(2000000,))

# Execute multiple tasks in parallel
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
processing_tasks = [
    spawn(process_data_chunk, args=(chunk,), kwargs={'multiplier': 3})
    for chunk in data_chunks
]

# Retrieve results
heavy_results = xorbits.run(task1, task2)
processing_results = xorbits.run(*processing_tasks)

print(f"Heavy computation results: {heavy_results}")
print(f"Data processing results: {processing_results}")

xorbits.shutdown()
```

### Advanced Remote Computing Patterns

```python
import xorbits
from xorbits.remote import spawn
import xorbits.pandas as pd
import xorbits.numpy as np

xorbits.init()

# Define custom distributed algorithms
def monte_carlo_pi(num_samples):
    """Monte Carlo estimation of Pi."""
    import random
    inside_circle = 0
    for _ in range(num_samples):
        x, y = random.random(), random.random()
        if x*x + y*y <= 1:
            inside_circle += 1
    return 4 * inside_circle / num_samples

def custom_aggregation(data_partition):
    """Custom aggregation function for distributed data."""
    import numpy as np
    return {
        'sum': np.sum(data_partition),
        'mean': np.mean(data_partition),
        'std': np.std(data_partition),
        'count': len(data_partition)
    }

# Distributed Monte Carlo computation
num_workers = 4
samples_per_worker = 1000000

pi_tasks = [
    spawn(monte_carlo_pi, args=(samples_per_worker,))
    for _ in range(num_workers)
]

# Custom distributed data processing
large_array = np.random.random(10000000)
chunk_size = len(large_array) // num_workers
data_chunks = [
    large_array[i*chunk_size:(i+1)*chunk_size]
    for i in range(num_workers)
]

aggregation_tasks = [
    spawn(custom_aggregation, args=(chunk,))
    for chunk in data_chunks
]

# Execute distributed computations
pi_estimates = xorbits.run(*pi_tasks)
aggregation_results = xorbits.run(*aggregation_tasks)

# Combine results
final_pi_estimate = sum(pi_estimates) / len(pi_estimates)
print(f"Distributed Pi estimate: {final_pi_estimate}")

# Combine aggregation results
total_sum = sum(result['sum'] for result in aggregation_results)
total_count = sum(result['count'] for result in aggregation_results)
global_mean = total_sum / total_count
print(f"Global mean: {global_mean}")

xorbits.shutdown()
```

### Remote Function with Resources

```python
import xorbits
from xorbits.remote import spawn

xorbits.init()

def gpu_computation(matrix_size):
    """Computation that requires GPU resources."""
    try:
        import cupy as cp  # GPU library
        matrix = cp.random.random((matrix_size, matrix_size))
        result = cp.linalg.inv(matrix)
        return cp.asnumpy(result.diagonal().sum())
    except ImportError:
        # Fallback to CPU computation
        import numpy as np
        matrix = np.random.random((matrix_size, matrix_size))
        result = np.linalg.inv(matrix)
        return result.diagonal().sum()

def memory_intensive_task(data_size):
    """Task requiring specific memory resources."""
    import numpy as np
    large_array = np.random.random(data_size)
    return np.std(large_array)

# Spawn tasks with resource requirements
gpu_task = spawn(
    gpu_computation,
    args=(1000,),
    resources={'gpu': 1}  # Request GPU resource
)

memory_task = spawn(
    memory_intensive_task,
    args=(10000000,),
    resources={'memory': 2 * 1024 * 1024 * 1024}  # Request 2GB memory
)

# Execute with resource constraints
results = xorbits.run(gpu_task, memory_task)
print(f"GPU computation result: {results[0]}")
print(f"Memory intensive result: {results[1]}")

xorbits.shutdown()
```

### Distributed Data Pipeline with Remote Functions

```python
import xorbits
from xorbits.remote import spawn
import xorbits.pandas as pd

xorbits.init()

def extract_features(data_chunk):
    """Extract features from data chunk."""
    features = {}
    features['mean'] = data_chunk.mean()
    features['std'] = data_chunk.std()
    features['min'] = data_chunk.min()
    features['max'] = data_chunk.max()
    features['count'] = len(data_chunk)
    return features

def validate_data(data_chunk):
    """Validate data quality."""
    issues = []
    if data_chunk.isnull().any():
        issues.append('contains_nulls')
    if (data_chunk < 0).any():
        issues.append('contains_negatives')
    if data_chunk.std() == 0:
        issues.append('no_variance')
    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'chunk_size': len(data_chunk)
    }

# Load distributed data
large_dataset = pd.read_csv('large_dataset.csv')

# Split into chunks for parallel processing
num_chunks = 8
chunk_size = len(large_dataset) // num_chunks
chunks = [
    large_dataset[i*chunk_size:(i+1)*chunk_size]['value']
    for i in range(num_chunks)
]

# Process chunks in parallel using remote functions
feature_tasks = [spawn(extract_features, args=(chunk,)) for chunk in chunks]
validation_tasks = [spawn(validate_data, args=(chunk,)) for chunk in chunks]

# Execute parallel processing
feature_results = xorbits.run(*feature_tasks)
validation_results = xorbits.run(*validation_tasks)

# Aggregate results
total_count = sum(f['count'] for f in feature_results)
global_mean = sum(f['mean'] * f['count'] for f in feature_results) / total_count

validation_summary = {
    'total_chunks': len(validation_results),
    'valid_chunks': sum(1 for v in validation_results if v['valid']),
    'common_issues': {}
}

# Count issue frequency
for result in validation_results:
    for issue in result['issues']:
        validation_summary['common_issues'][issue] = \
            validation_summary['common_issues'].get(issue, 0) + 1

print(f"Global mean: {global_mean}")
print(f"Validation summary: {validation_summary}")

xorbits.shutdown()
```

### Error Handling and Retry Patterns

```python
import xorbits
from xorbits.remote import spawn
import random

xorbits.init()

def unreliable_function(task_id, failure_rate=0.3):
    """Function that may fail randomly."""
    if random.random() < failure_rate:
        raise Exception(f"Task {task_id} failed randomly")

    # Simulate work
    import time
    time.sleep(1)
    return f"Task {task_id} completed successfully"

def robust_function(data, max_retries=3):
    """Function with built-in retry logic."""
    for attempt in range(max_retries):
        try:
            # Simulate operation that might fail
            if random.random() < 0.2:  # 20% failure rate
                raise Exception("Operation failed")
            return f"Processed {len(data)} items successfully"
        except Exception as e:
            if attempt == max_retries - 1:
                return f"Failed after {max_retries} attempts: {str(e)}"
            continue

# Spawn tasks with retry capabilities
reliable_tasks = [
    spawn(
        unreliable_function,
        args=(i,),
        kwargs={'failure_rate': 0.2},
        retry_when_fail=True
    )
    for i in range(5)
]

robust_tasks = [
    spawn(robust_function, args=([1, 2, 3, 4, 5],))
    for _ in range(3)
]

# Execute with error handling
try:
    reliable_results = xorbits.run(*reliable_tasks)
    robust_results = xorbits.run(*robust_tasks)

    print("Reliable task results:", reliable_results)
    print("Robust task results:", robust_results)

except Exception as e:
    print(f"Some tasks failed: {e}")

xorbits.shutdown()
```