Ray runtime implementation for Java - the core distributed runtime component of the Ray framework for scaling AI and Python applications
—
Type-safe distributed function execution with support for 0-6 parameters, ObjectRef chaining, and comprehensive error handling.
Create remote tasks from static methods or lambda functions with full type safety.
// Task creation methods (0-6 parameters)
public static <R> TaskCaller<R> task(RayFunc0<R> f);
public static <T0, R> TaskCaller<R> task(RayFunc1<T0, R> f, T0 t0);
public static <T0, T1, R> TaskCaller<R> task(RayFunc2<T0, T1, R> f, T0 t0, T1 t1);
public static <T0, T1, T2, R> TaskCaller<R> task(RayFunc3<T0, T1, T2, R> f, T0 t0, T1 t1, T2 t2);
public static <T0, T1, T2, T3, R> TaskCaller<R> task(RayFunc4<T0, T1, T2, T3, R> f, T0 t0, T1 t1, T2 t2, T3 t3);
public static <T0, T1, T2, T3, T4, R> TaskCaller<R> task(RayFunc5<T0, T1, T2, T3, T4, R> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
public static <T0, T1, T2, T3, T4, T5, R> TaskCaller<R> task(RayFunc6<T0, T1, T2, T3, T4, T5, R> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
// Void task creation methods
public static VoidTaskCaller task(RayFuncVoid0 f);
public static <T0> VoidTaskCaller task(RayFuncVoid1<T0> f, T0 t0);
// ... up to RayFuncVoid6
Usage Examples:
import io.ray.api.Ray;
import io.ray.api.ObjectRef;
/**
 * Walks through submitting a lambda, one- and two-argument static methods, and a void
 * static method as Ray remote tasks, then reading the value-returning results back.
 */
public class TaskExamples {

  /** Returns {@code input} with the prefix {@code "Processed: "}. */
  public static String processData(String input) {
    return "Processed: " + input;
  }

  /** Returns {@code a * b}. */
  public static int multiply(int a, int b) {
    return a * b;
  }

  /** Prints {@code message} to standard output, prefixed with {@code "Remote log: "}. */
  public static void logMessage(String message) {
    System.out.println("Remote log: " + message);
  }

  public static void main(String[] args) {
    Ray.init();
    try {
      // Zero parameter task
      ObjectRef<String> result0 = Ray.task(() -> "Hello from task").remote();
      // Single parameter task
      ObjectRef<String> result1 = Ray.task(TaskExamples::processData, "input-data").remote();
      // Multiple parameter task
      ObjectRef<Integer> result2 = Ray.task(TaskExamples::multiply, 6, 7).remote();
      // Void task: there is no value to fetch, so the call result is not kept.
      // NOTE(review): upstream VoidTaskCaller#remote() may be declared void; confirm.
      // Not storing the result compiles either way.
      Ray.task(TaskExamples::logMessage, "Hello").remote();
      // Get results
      System.out.println(Ray.get(result0)); // "Hello from task"
      System.out.println(Ray.get(result1)); // "Processed: input-data"
      System.out.println(Ray.get(result2)); // 42
    } finally {
      // Shut the runtime down even if one of the Ray.get calls above throws.
      Ray.shutdown();
    }
  }
}
Pass ObjectRef instances as parameters to create task dependencies without transferring data.
// All task methods support ObjectRef parameters
public static <T0, R> TaskCaller<R> task(RayFunc1<T0, R> f, ObjectRef<T0> t0);
public static <T0, T1, R> TaskCaller<R> task(RayFunc2<T0, T1, R> f, T0 t0, ObjectRef<T1> t1);
public static <T0, T1, R> TaskCaller<R> task(RayFunc2<T0, T1, R> f, ObjectRef<T0> t0, ObjectRef<T1> t1);
// ... all combinations of direct values and ObjectRef parameters
Usage Examples:
/**
 * Builds a three-stage task graph (load, transform, combine) by passing the
 * {@code ObjectRef} returned by one task as an argument to the next.
 */
public class Pipeline {

  /** Returns {@code "Data from " + source}; stands in for a real load step. */
  public static String loadData(String source) {
    // Simulate data loading
    return "Data from " + source;
  }

  /** Returns {@code data} with the prefix {@code "Transformed: "}. */
  public static String transformData(String data) {
    return "Transformed: " + data;
  }

  /** Returns the two inputs joined by {@code " + "}. */
  public static String combineData(String data1, String data2) {
    return data1 + " + " + data2;
  }

  public static void main(String[] args) {
    Ray.init();
    try {
      // Create task pipeline with ObjectRef chaining
      ObjectRef<String> data1 = Ray.task(Pipeline::loadData, "source1").remote();
      ObjectRef<String> data2 = Ray.task(Pipeline::loadData, "source2").remote();
      // Transform data (depends on load tasks)
      ObjectRef<String> transformed1 = Ray.task(Pipeline::transformData, data1).remote();
      ObjectRef<String> transformed2 = Ray.task(Pipeline::transformData, data2).remote();
      // Combine results (depends on transform tasks)
      ObjectRef<String> finalResult =
          Ray.task(Pipeline::combineData, transformed1, transformed2).remote();
      // Only get final result - Ray handles all dependencies
      String result = Ray.get(finalResult);
      System.out.println(result);
    } finally {
      // Shut the runtime down even if the pipeline fails.
      Ray.shutdown();
    }
  }
}
Execute tasks remotely and control their execution.
/**
* Caller for a remote task that produces a result; exposes a single {@code remote()} method.
*
* @param <R> type parameter of the {@code ObjectRef} returned by {@code remote()}
*/
public interface TaskCaller<R> {
/**
* Execute the task remotely.
*
* @return ObjectRef to the task result
*/
ObjectRef<R> remote();
}
/**
* Caller for a remote void task; exposes a single {@code remote()} method.
* NOTE(review): upstream Ray's VoidTaskCaller#remote() may be declared as returning void
* rather than ObjectRef<Void>; confirm against the io.ray.api.call sources.
*/
public interface VoidTaskCaller {
/**
* Execute the void task remotely.
*
* @return ObjectRef<Void> for synchronization
*/
ObjectRef<Void> remote();
}
Usage Example:
// Create task caller
TaskCaller<String> taskCaller = Ray.task(MyClass::processData, "input");
// Execute task
ObjectRef<String> result = taskCaller.remote();
// Wait for completion
String value = Ray.get(result);
Type-safe function interfaces for different parameter counts and return types.
// Return value function interfaces
/**
 * Function that takes no arguments and returns a value.
 *
 * @param <R> return type
 */
@FunctionalInterface
public interface RayFunc0<R> {
  /**
   * Invokes the function.
   *
   * @return the function's result
   */
  R apply();
}
/**
 * Function that takes one argument and returns a value.
 *
 * @param <T0> type of the argument
 * @param <R> return type
 */
@FunctionalInterface
public interface RayFunc1<T0, R> {
  /**
   * Invokes the function.
   *
   * @param t0 the argument
   * @return the function's result
   */
  R apply(T0 t0);
}
public interface RayFunc2<T0, T1, R> {
R apply(T0 t0, T1 t1);
}
public interface RayFunc3<T0, T1, T2, R> {
R apply(T0 t0, T1 t1, T2 t2);
}
public interface RayFunc4<T0, T1, T2, T3, R> {
R apply(T0 t0, T1 t1, T2 t2, T3 t3);
}
public interface RayFunc5<T0, T1, T2, T3, T4, R> {
R apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
}
public interface RayFunc6<T0, T1, T2, T3, T4, T5, R> {
R apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
}
// Void function interfaces
/** Function that takes no arguments and returns nothing. */
@FunctionalInterface
public interface RayFuncVoid0 {
  /** Invokes the function. */
  void apply();
}
/**
 * Function that takes one argument and returns nothing.
 *
 * @param <T0> type of the argument
 */
@FunctionalInterface
public interface RayFuncVoid1<T0> {
  /**
   * Invokes the function.
   *
   * @param t0 the argument
   */
  void apply(T0 t0);
}
public interface RayFuncVoid2<T0, T1> {
void apply(T0 t0, T1 t1);
}
public interface RayFuncVoid3<T0, T1, T2> {
void apply(T0 t0, T1 t1, T2 t2);
}
public interface RayFuncVoid4<T0, T1, T2, T3> {
void apply(T0 t0, T1 t1, T2 t2, T3 t3);
}
public interface RayFuncVoid5<T0, T1, T2, T3, T4> {
void apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
}
/**
 * Function that takes six arguments and returns nothing.
 *
 * @param <T0> type of the first argument
 * @param <T1> type of the second argument
 * @param <T2> type of the third argument
 * @param <T3> type of the fourth argument
 * @param <T4> type of the fifth argument
 * @param <T5> type of the sixth argument
 */
@FunctionalInterface
public interface RayFuncVoid6<T0, T1, T2, T3, T4, T5> {
  /**
   * Invokes the function.
   *
   * @param t0 first argument
   * @param t1 second argument
   * @param t2 third argument
   * @param t3 fourth argument
   * @param t4 fifth argument
   * @param t5 sixth argument
   */
  void apply(T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
}
Usage Examples:
// Using method references
RayFunc1<String, String> processor = MyClass::processString;
ObjectRef<String> result = Ray.task(processor, "input").remote();
// Using lambda expressions
RayFunc2<Integer, Integer, Integer> adder = (a, b) -> a + b;
ObjectRef<Integer> sum = Ray.task(adder, 10, 20).remote();
// Void functions
RayFuncVoid1<String> logger = System.out::println;
// The void task's call result is not used afterwards, so it is not stored.
// NOTE(review): upstream VoidTaskCaller#remote() may be declared void; confirm.
Ray.task(logger, "Log message").remote();

/** Submits a batch of independent {@code compute} tasks and sums their results. */
public class ParallelTasks {

  /** Number of tasks submitted by {@link #main(String[])}. */
  private static final int TASK_COUNT = 100;

  /** Returns {@code Math.sqrt(data * workerId)}; stands in for real work. */
  public static double compute(int workerId, double data) {
    // Simulate computation
    return Math.sqrt(data * workerId);
  }

  public static void main(String[] args) {
    Ray.init();
    try {
      // Launch many parallel tasks
      List<ObjectRef<Double>> results = new ArrayList<>(TASK_COUNT);
      for (int i = 0; i < TASK_COUNT; i++) {
        ObjectRef<Double> result =
            Ray.task(ParallelTasks::compute, i, Math.random() * 1000).remote();
        results.add(result);
      }
      // Wait for all results
      List<Double> values = Ray.get(results);
      // Process results
      double sum = values.stream().mapToDouble(Double::doubleValue).sum();
      System.out.println("Sum: " + sum);
    } finally {
      // Shut the runtime down even if a task or Ray.get fails.
      Ray.shutdown();
    }
  }
}

/** Shows how a failure inside a remote task surfaces when its result is fetched. */
public class TaskErrorHandling {

  /**
   * Returns {@code "Success: " + input}, unless {@code input} is {@code "error"}.
   *
   * @param input value to process
   * @return the input prefixed with {@code "Success: "}
   * @throws RuntimeException with message {@code "Task failed!"} when {@code input} equals
   *     {@code "error"}
   */
  public static String riskyTask(String input) {
    // Constant-first comparison so a null input cannot raise NullPointerException here.
    if ("error".equals(input)) {
      throw new RuntimeException("Task failed!");
    }
    return "Success: " + input;
  }

  public static void main(String[] args) {
    Ray.init();
    try {
      // Task that will succeed
      ObjectRef<String> goodTask =
          Ray.task(TaskErrorHandling::riskyTask, "good-input").remote();
      // Task that will fail
      ObjectRef<String> badTask = Ray.task(TaskErrorHandling::riskyTask, "error").remote();
      try {
        String goodResult = Ray.get(goodTask);
        System.out.println(goodResult); // "Success: good-input"
      } catch (RayTaskException e) {
        System.out.println("Good task failed: " + e.getMessage());
      }
      try {
        String badResult = Ray.get(badTask);
        System.out.println(badResult);
      } catch (RayTaskException e) {
        System.out.println("Bad task failed as expected: " + e.getMessage());
      }
    } finally {
      // Shut the runtime down even if an exception other than RayTaskException escapes.
      Ray.shutdown();
    }
  }
}

/** Chains tasks: list the sources, fetch each one in its own task, then aggregate. */
public class TaskComposition {

  /** Returns the fixed source names {@code db1}, {@code db2}, {@code db3}. */
  public static List<String> fetchDataSources() {
    return Arrays.asList("db1", "db2", "db3");
  }

  /** Returns {@code "Data from " + source}. */
  public static String fetchData(String source) {
    return "Data from " + source;
  }

  /** Joins the entries of {@code dataList} with {@code ", "}. */
  public static String aggregateData(List<String> dataList) {
    return String.join(", ", dataList);
  }

  public static void main(String[] args) {
    Ray.init();
    try {
      // Get data sources
      ObjectRef<List<String>> sources = Ray.task(TaskComposition::fetchDataSources).remote();
      // The list is fetched locally first because one task is submitted per element.
      List<String> sourceList = Ray.get(sources);
      List<ObjectRef<String>> dataRefs = new ArrayList<>(sourceList.size());
      for (String source : sourceList) {
        dataRefs.add(Ray.task(TaskComposition::fetchData, source).remote());
      }
      // Wait for all data
      List<String> allData = Ray.get(dataRefs);
      // Aggregate results
      ObjectRef<String> finalResult =
          Ray.task(TaskComposition::aggregateData, allData).remote();
      String result = Ray.get(finalResult);
      System.out.println("Final result: " + result);
    } finally {
      // Shut the runtime down even if any stage fails.
      Ray.shutdown();
    }
  }
}
Install with Tessl CLI
npx tessl i tessl/maven-io-ray--ray-runtime