or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · datasets.md · index.md · machine-learning.md · numpy-integration.md · pandas-integration.md · remote-computing.md · runtime-management.md

docs/remote-computing.md

# Remote Computing

Remote function execution capabilities for distributed computing workloads. Xorbits remote module enables spawning and executing functions across distributed workers for flexible parallel computing patterns.

## Capabilities

### Remote Function Execution

Spawn and execute functions remotely across distributed Xorbits workers.

```python { .api }
def spawn(
    func,
    args=(),
    kwargs=None,
    retry_when_fail=False,
    n_output=None,
    output_types=None,
    **kw
):
    """
    Spawn remote function execution across distributed workers.

    Executes functions on remote workers in the Xorbits cluster,
    enabling flexible distributed computing patterns beyond
    standard array and DataFrame operations.

    Parameters:
    - func: callable, function to execute remotely
    - args: tuple, positional arguments to pass to function
    - kwargs: dict, keyword arguments to pass to function
    - retry_when_fail: bool, whether to retry when the task fails
    - n_output: int, number of outputs expected from the function
    - output_types: list, types of the outputs
    - **kw: Additional keyword arguments

    Returns:
    - Remote execution result that can be retrieved with xorbits.run()
    """
```

**Usage Examples:**

### Basic Remote Function Execution

```python
import xorbits
from xorbits.remote import spawn
import time

xorbits.init()

# Define functions to execute remotely
def compute_heavy_task(n):
    """Simulate computationally heavy task."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

def process_data_chunk(data_chunk, multiplier=2):
    """Process a chunk of data."""
    return [x * multiplier for x in data_chunk]

# Spawn remote function execution
task1 = spawn(compute_heavy_task, args=(1000000,))
task2 = spawn(compute_heavy_task, args=(2000000,))

# Execute multiple tasks in parallel
data_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
processing_tasks = [
    spawn(process_data_chunk, args=(chunk,), kwargs={'multiplier': 3})
    for chunk in data_chunks
]

# Retrieve results
heavy_results = xorbits.run(task1, task2)
processing_results = xorbits.run(*processing_tasks)

print(f"Heavy computation results: {heavy_results}")
print(f"Data processing results: {processing_results}")

xorbits.shutdown()
```

### Advanced Remote Computing Patterns

```python
import xorbits
from xorbits.remote import spawn
import xorbits.pandas as pd
import xorbits.numpy as np

xorbits.init()

# Define custom distributed algorithms
def monte_carlo_pi(num_samples):
    """Monte Carlo estimation of Pi."""
    import random
    inside_circle = 0
    for _ in range(num_samples):
        x, y = random.random(), random.random()
        if x*x + y*y <= 1:
            inside_circle += 1
    return 4 * inside_circle / num_samples

def custom_aggregation(data_partition):
    """Custom aggregation function for distributed data."""
    import numpy as np
    return {
        'sum': np.sum(data_partition),
        'mean': np.mean(data_partition),
        'std': np.std(data_partition),
        'count': len(data_partition)
    }

# Distributed Monte Carlo computation
num_workers = 4
samples_per_worker = 1000000

pi_tasks = [
    spawn(monte_carlo_pi, args=(samples_per_worker,))
    for _ in range(num_workers)
]

# Custom distributed data processing
large_array = np.random.random(10000000)
chunk_size = len(large_array) // num_workers
data_chunks = [
    large_array[i*chunk_size:(i+1)*chunk_size]
    for i in range(num_workers)
]

aggregation_tasks = [
    spawn(custom_aggregation, args=(chunk,))
    for chunk in data_chunks
]

# Execute distributed computations
pi_estimates = xorbits.run(*pi_tasks)
aggregation_results = xorbits.run(*aggregation_tasks)

# Combine results
final_pi_estimate = sum(pi_estimates) / len(pi_estimates)
print(f"Distributed Pi estimate: {final_pi_estimate}")

# Combine aggregation results
total_sum = sum(result['sum'] for result in aggregation_results)
total_count = sum(result['count'] for result in aggregation_results)
global_mean = total_sum / total_count
print(f"Global mean: {global_mean}")

xorbits.shutdown()
```

154

155

### Remote Function with Resources

156

157

```python

158

import xorbits

159

from xorbits.remote import spawn

160

161

xorbits.init()

162

163

def gpu_computation(matrix_size):

164

"""Computation that requires GPU resources."""

165

try:

166

import cupy as cp # GPU library

167

matrix = cp.random.random((matrix_size, matrix_size))

168

result = cp.linalg.inv(matrix)

169

return cp.asnumpy(result.diagonal().sum())

170

except ImportError:

171

# Fallback to CPU computation

172

import numpy as np

173

matrix = np.random.random((matrix_size, matrix_size))

174

result = np.linalg.inv(matrix)

175

return result.diagonal().sum()

176

177

def memory_intensive_task(data_size):

178

"""Task requiring specific memory resources."""

179

import numpy as np

180

large_array = np.random.random(data_size)

181

return np.std(large_array)

182

183

# Spawn tasks with resource requirements

184

gpu_task = spawn(

185

gpu_computation,

186

args=(1000,),

187

resources={'gpu': 1} # Request GPU resource

188

)

189

190

memory_task = spawn(

191

memory_intensive_task,

192

args=(10000000,),

193

resources={'memory': 2 * 1024 * 1024 * 1024} # Request 2GB memory

194

)

195

196

# Execute with resource constraints

197

results = xorbits.run(gpu_task, memory_task)

198

print(f"GPU computation result: {results[0]}")

199

print(f"Memory intensive result: {results[1]}")

200

201

xorbits.shutdown()

202

```

### Distributed Data Pipeline with Remote Functions

```python
import xorbits
from xorbits.remote import spawn
import xorbits.pandas as pd

xorbits.init()

def extract_features(data_chunk):
    """Extract features from data chunk."""
    features = {}
    features['mean'] = data_chunk.mean()
    features['std'] = data_chunk.std()
    features['min'] = data_chunk.min()
    features['max'] = data_chunk.max()
    features['count'] = len(data_chunk)
    return features

def validate_data(data_chunk):
    """Validate data quality."""
    issues = []
    if data_chunk.isnull().any():
        issues.append('contains_nulls')
    if (data_chunk < 0).any():
        issues.append('contains_negatives')
    if data_chunk.std() == 0:
        issues.append('no_variance')
    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'chunk_size': len(data_chunk)
    }

# Load distributed data
large_dataset = pd.read_csv('large_dataset.csv')

# Split into chunks for parallel processing
num_chunks = 8
chunk_size = len(large_dataset) // num_chunks
chunks = [
    large_dataset[i*chunk_size:(i+1)*chunk_size]['value']
    for i in range(num_chunks)
]

# Process chunks in parallel using remote functions
feature_tasks = [spawn(extract_features, args=(chunk,)) for chunk in chunks]
validation_tasks = [spawn(validate_data, args=(chunk,)) for chunk in chunks]

# Execute parallel processing
feature_results = xorbits.run(*feature_tasks)
validation_results = xorbits.run(*validation_tasks)

# Aggregate results
total_count = sum(f['count'] for f in feature_results)
global_mean = sum(f['mean'] * f['count'] for f in feature_results) / total_count

validation_summary = {
    'total_chunks': len(validation_results),
    'valid_chunks': sum(1 for v in validation_results if v['valid']),
    'common_issues': {}
}

# Count issue frequency
for result in validation_results:
    for issue in result['issues']:
        validation_summary['common_issues'][issue] = \
            validation_summary['common_issues'].get(issue, 0) + 1

print(f"Global mean: {global_mean}")
print(f"Validation summary: {validation_summary}")

xorbits.shutdown()
```

### Error Handling and Retry Patterns

```python
import xorbits
from xorbits.remote import spawn
import random

xorbits.init()

def unreliable_function(task_id, failure_rate=0.3):
    """Function that may fail randomly."""
    if random.random() < failure_rate:
        raise Exception(f"Task {task_id} failed randomly")

    # Simulate work
    import time
    time.sleep(1)
    return f"Task {task_id} completed successfully"

def robust_function(data, max_retries=3):
    """Function with built-in retry logic."""
    for attempt in range(max_retries):
        try:
            # Simulate operation that might fail
            if random.random() < 0.2:  # 20% failure rate
                raise Exception("Operation failed")
            return f"Processed {len(data)} items successfully"
        except Exception as e:
            if attempt == max_retries - 1:
                return f"Failed after {max_retries} attempts: {str(e)}"
            continue

# Spawn tasks with retry capabilities
reliable_tasks = [
    spawn(
        unreliable_function,
        args=(i,),
        kwargs={'failure_rate': 0.2},
        retry_when_fail=True
    )
    for i in range(5)
]

robust_tasks = [
    spawn(robust_function, args=([1, 2, 3, 4, 5],))
    for _ in range(3)
]

# Execute with error handling
try:
    reliable_results = xorbits.run(*reliable_tasks)
    robust_results = xorbits.run(*robust_tasks)

    print("Reliable task results:", reliable_results)
    print("Robust task results:", robust_results)

except Exception as e:
    print(f"Some tasks failed: {e}")

xorbits.shutdown()
```