0
# Task Execution
1
2
Type-safe distributed function execution with support for 0-6 parameters, ObjectRef chaining, and comprehensive error handling.
3
4
## Capabilities
5
6
### Task Creation
7
8
Create remote tasks from static methods or lambda functions with full type safety.
9
10
```java { .api }
11
// Task creation methods (0-6 parameters)
12
public static <R> TaskCaller<R> task(RayFunc0<R> f);
13
public static <T0, R> TaskCaller<R> task(RayFunc1<T0, R> f, T0 t0);
14
public static <T0, T1, R> TaskCaller<R> task(RayFunc2<T0, T1, R> f, T0 t0, T1 t1);
15
public static <T0, T1, T2, R> TaskCaller<R> task(RayFunc3<T0, T1, T2, R> f, T0 t0, T1 t1, T2 t2);
16
public static <T0, T1, T2, T3, R> TaskCaller<R> task(RayFunc4<T0, T1, T2, T3, R> f, T0 t0, T1 t1, T2 t2, T3 t3);
17
public static <T0, T1, T2, T3, T4, R> TaskCaller<R> task(RayFunc5<T0, T1, T2, T3, T4, R> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
18
public static <T0, T1, T2, T3, T4, T5, R> TaskCaller<R> task(RayFunc6<T0, T1, T2, T3, T4, T5, R> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
19
20
// Void task creation methods
21
public static VoidTaskCaller task(RayFuncVoid0 f);
22
public static <T0> VoidTaskCaller task(RayFuncVoid1<T0> f, T0 t0);
23
// ... up to RayFuncVoid6
24
```
25
26
**Usage Examples:**
27
28
```java
29
import io.ray.api.Ray;
30
import io.ray.api.ObjectRef;
31
32
public class TaskExamples {
33
34
// Simple static methods for remote execution
35
public static String processData(String input) {
36
return "Processed: " + input;
37
}
38
39
public static int multiply(int a, int b) {
40
return a * b;
41
}
42
43
public static void logMessage(String message) {
44
System.out.println("Remote log: " + message);
45
}
46
47
public static void main(String[] args) {
48
Ray.init();
49
50
// Zero parameter task
51
ObjectRef<String> result0 = Ray.task(() -> "Hello from task").remote();
52
53
// Single parameter task
54
ObjectRef<String> result1 = Ray.task(TaskExamples::processData, "input-data").remote();
55
56
// Multiple parameter task
57
ObjectRef<Integer> result2 = Ray.task(TaskExamples::multiply, 6, 7).remote();
58
59
// Void task
60
ObjectRef<Void> voidResult = Ray.task(TaskExamples::logMessage, "Hello").remote();
61
62
// Get results
63
System.out.println(Ray.get(result0)); // "Hello from task"
64
System.out.println(Ray.get(result1)); // "Processed: input-data"
65
System.out.println(Ray.get(result2)); // 42
66
67
Ray.shutdown();
68
}
69
}
70
```
71
72
### ObjectRef Chaining
73
74
Pass ObjectRef instances as parameters to create task dependencies without transferring data.
75
76
```java { .api }
77
// All task methods support ObjectRef parameters
78
public static <T0, R> TaskCaller<R> task(RayFunc1<T0, R> f, ObjectRef<T0> t0);
79
public static <T0, T1, R> TaskCaller<R> task(RayFunc2<T0, T1, R> f, T0 t0, ObjectRef<T1> t1);
80
public static <T0, T1, R> TaskCaller<R> task(RayFunc2<T0, T1, R> f, ObjectRef<T0> t0, ObjectRef<T1> t1);
81
// ... all combinations of direct values and ObjectRef parameters
82
```
83
84
**Usage Examples:**
85
86
```java
87
public class Pipeline {
88
89
public static String loadData(String source) {
90
// Simulate data loading
91
return "Data from " + source;
92
}
93
94
public static String transformData(String data) {
95
return "Transformed: " + data;
96
}
97
98
public static String combineData(String data1, String data2) {
99
return data1 + " + " + data2;
100
}
101
102
public static void main(String[] args) {
103
Ray.init();
104
105
// Create task pipeline with ObjectRef chaining
106
ObjectRef<String> data1 = Ray.task(Pipeline::loadData, "source1").remote();
107
ObjectRef<String> data2 = Ray.task(Pipeline::loadData, "source2").remote();
108
109
// Transform data (depends on load tasks)
110
ObjectRef<String> transformed1 = Ray.task(Pipeline::transformData, data1).remote();
111
ObjectRef<String> transformed2 = Ray.task(Pipeline::transformData, data2).remote();
112
113
// Combine results (depends on transform tasks)
114
ObjectRef<String> final_result = Ray.task(Pipeline::combineData, transformed1, transformed2).remote();
115
116
// Only get final result - Ray handles all dependencies
117
String result = Ray.get(final_result);
118
System.out.println(result);
119
120
Ray.shutdown();
121
}
122
}
123
```
124
125
### Task Execution
126
127
Execute tasks remotely and control their execution.
128
129
```java { .api }
130
public interface TaskCaller<R> {
131
/**
132
* Execute the task remotely.
133
* @return ObjectRef to the task result
134
*/
135
ObjectRef<R> remote();
136
}
137
138
public interface VoidTaskCaller {
139
/**
140
* Execute the void task remotely.
141
* @return ObjectRef<Void> for synchronization
142
*/
143
ObjectRef<Void> remote();
144
}
145
```
146
147
**Usage Example:**
148
149
```java
150
// Create task caller
151
TaskCaller<String> taskCaller = Ray.task(MyClass::processData, "input");
152
153
// Execute task
154
ObjectRef<String> result = taskCaller.remote();
155
156
// Wait for completion
157
String value = Ray.get(result);
158
```
159
160
### Function Interfaces
161
162
Type-safe function interfaces for different parameter counts and return types.
163
164
```java { .api }
165
// Return value function interfaces
166
public interface RayFunc0<R> {
167
R apply();
168
}
169
170
public interface RayFunc1<T0, R> {
171
R apply(T0 t0);
172
}
173
174
public interface RayFunc2<T0, T1, R> {
175
R apply(T0 t0, T1 t1);
176
}
177
178
public interface RayFunc3<T0, T1, T2, R> {
179
R apply(T0 t0, T1 t1, T2 t2);
180
}
181
182
public interface RayFunc4<T0, T1, T2, T3, R> {
183
R apply(T0 t0, T1 t1, T2 t2, T3 t3);
184
}
185
186
public interface RayFunc5<T0, T1, T2, T3, T4, R> {
187
R apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
188
}
189
190
public interface RayFunc6<T0, T1, T2, T3, T4, T5, R> {
191
R apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
192
}
193
194
// Void function interfaces
195
public interface RayFuncVoid0 {
196
void apply();
197
}
198
199
public interface RayFuncVoid1<T0> {
200
void apply(T0 t0);
201
}
202
203
public interface RayFuncVoid2<T0, T1> {
204
void apply(T0 t0, T1 t1);
205
}
206
207
public interface RayFuncVoid3<T0, T1, T2> {
208
void apply(T0 t0, T1 t1, T2 t2);
209
}
210
211
public interface RayFuncVoid4<T0, T1, T2, T3> {
212
void apply(T0 t0, T1 t1, T2 t2, T3 t3);
213
}
214
215
public interface RayFuncVoid5<T0, T1, T2, T3, T4> {
216
void apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
217
}
218
219
public interface RayFuncVoid6<T0, T1, T2, T3, T4, T5> {
220
void apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
221
}
222
```
223
224
**Usage Examples:**
225
226
```java
227
// Using method references
228
RayFunc1<String, String> processor = MyClass::processString;
229
ObjectRef<String> result = Ray.task(processor, "input").remote();
230
231
// Using lambda expressions
232
RayFunc2<Integer, Integer, Integer> adder = (a, b) -> a + b;
233
ObjectRef<Integer> sum = Ray.task(adder, 10, 20).remote();
234
235
// Void functions
236
RayFuncVoid1<String> logger = System.out::println;
237
ObjectRef<Void> logResult = Ray.task(logger, "Log message").remote();
238
```
239
240
## Advanced Task Patterns
241
242
### Parallel Execution
243
244
```java
245
public class ParallelTasks {
246
247
public static double compute(int workerId, double data) {
248
// Simulate computation
249
return Math.sqrt(data * workerId);
250
}
251
252
public static void main(String[] args) {
253
Ray.init();
254
255
// Launch many parallel tasks
256
List<ObjectRef<Double>> results = new ArrayList<>();
257
for (int i = 0; i < 100; i++) {
258
ObjectRef<Double> result = Ray.task(ParallelTasks::compute, i, Math.random() * 1000).remote();
259
results.add(result);
260
}
261
262
// Wait for all results
263
List<Double> values = Ray.get(results);
264
265
// Process results
266
double sum = values.stream().mapToDouble(Double::doubleValue).sum();
267
System.out.println("Sum: " + sum);
268
269
Ray.shutdown();
270
}
271
}
272
```
273
274
### Error Handling
275
276
```java
277
public class TaskErrorHandling {
278
279
public static String riskyTask(String input) {
280
if (input.equals("error")) {
281
throw new RuntimeException("Task failed!");
282
}
283
return "Success: " + input;
284
}
285
286
public static void main(String[] args) {
287
Ray.init();
288
289
// Task that will succeed
290
ObjectRef<String> goodTask = Ray.task(TaskErrorHandling::riskyTask, "good-input").remote();
291
292
// Task that will fail
293
ObjectRef<String> badTask = Ray.task(TaskErrorHandling::riskyTask, "error").remote();
294
295
try {
296
String goodResult = Ray.get(goodTask);
297
System.out.println(goodResult); // "Success: good-input"
298
} catch (RayTaskException e) {
299
System.out.println("Good task failed: " + e.getMessage());
300
}
301
302
try {
303
String badResult = Ray.get(badTask);
304
System.out.println(badResult);
305
} catch (RayTaskException e) {
306
System.out.println("Bad task failed as expected: " + e.getMessage());
307
}
308
309
Ray.shutdown();
310
}
311
}
312
```
313
314
### Task Composition
315
316
```java
317
public class TaskComposition {
318
319
public static List<String> fetchDataSources() {
320
return Arrays.asList("db1", "db2", "db3");
321
}
322
323
public static String fetchData(String source) {
324
return "Data from " + source;
325
}
326
327
public static String aggregateData(List<String> dataList) {
328
return String.join(", ", dataList);
329
}
330
331
public static void main(String[] args) {
332
Ray.init();
333
334
// Get data sources
335
ObjectRef<List<String>> sources = Ray.task(TaskComposition::fetchDataSources).remote();
336
337
// Fetch from each source (this requires more complex handling)
338
List<String> sourceList = Ray.get(sources);
339
List<ObjectRef<String>> dataRefs = new ArrayList<>();
340
341
for (String source : sourceList) {
342
dataRefs.add(Ray.task(TaskComposition::fetchData, source).remote());
343
}
344
345
// Wait for all data
346
List<String> allData = Ray.get(dataRefs);
347
348
// Aggregate results
349
ObjectRef<String> final_result = Ray.task(TaskComposition::aggregateData, allData).remote();
350
351
String result = Ray.get(final_result);
352
System.out.println("Final result: " + result);
353
354
Ray.shutdown();
355
}
356
}
357
```