Ray runtime implementation for Java - the core distributed runtime component of Ray framework for scaling AI and Python applications
—
Stateful distributed workers with method-level task execution, lifecycle management, and resource control for building robust distributed applications.
Configure actor creation with detailed options including naming, resources, and lifecycle settings.
/**
* Options for configuring actor creation.
*/
public class ActorCreationOptions {
/** Constant for no actor restarts */
public static final int NO_RESTART = 0;
/** Constant for infinite actor restarts */
public static final int INFINITE_RESTART = -1;
/**
* Create builder for actor creation options.
* @return ActorCreationOptions.Builder
*/
public static Builder builder();
public static class Builder {
/**
* Set actor name for service discovery.
* @param name Actor name
* @return Builder for method chaining
*/
public Builder setName(String name);
/**
* Set actor lifetime policy.
* @param lifetime Actor lifetime setting
* @return Builder for method chaining
*/
public Builder setLifetime(ActorLifetime lifetime);
/**
* Set single resource requirement.
* @param resource Resource name
* @param quantity Resource quantity
* @return Builder for method chaining
*/
public Builder setResource(String resource, Double quantity);
/**
* Set multiple resource requirements.
* @param resources Map of resource names to quantities
* @return Builder for method chaining
*/
public Builder setResources(Map<String, Double> resources);
/**
* Set maximum number of actor restarts.
* @param maxRestarts Maximum restarts (use constants NO_RESTART or INFINITE_RESTART)
* @return Builder for method chaining
*/
public Builder setMaxRestarts(int maxRestarts);
/**
* Set maximum task retries per actor method call.
* @param maxTaskRetries Maximum task retries
* @return Builder for method chaining
*/
public Builder setMaxTaskRetries(int maxTaskRetries);
/**
* Set JVM options for the actor process.
* @param jvmOptions List of JVM options
* @return Builder for method chaining
*/
public Builder setJvmOptions(List<String> jvmOptions);
/**
* Set maximum concurrent method calls.
* @param maxConcurrency Maximum concurrent calls
* @return Builder for method chaining
*/
public Builder setMaxConcurrency(int maxConcurrency);
/**
* Set placement group and bundle index.
* @param placementGroup Placement group
* @param bundleIndex Bundle index within placement group
* @return Builder for method chaining
*/
public Builder setPlacementGroup(PlacementGroup placementGroup, int bundleIndex);
/**
* Set runtime environment.
* @param runtimeEnv Runtime environment configuration
* @return Builder for method chaining
*/
public Builder setRuntimeEnv(RuntimeEnv runtimeEnv);
/**
* Build the actor creation options.
* @return ActorCreationOptions instance
*/
public ActorCreationOptions build();
}
}
/**
* Actor lifetime enumeration.
*/
public enum ActorLifetime {
/** Actor dies when creator process dies */
NON_DETACHED,
/** Actor persists beyond creator process lifetime */
DETACHED
}Create stateful distributed workers that persist across multiple method calls.
// Actor creation methods (0-6 parameters)
public static <A> ActorCreator<A> actor(RayFunc0<A> f);
public static <T0, A> ActorCreator<A> actor(RayFunc1<T0, A> f, T0 t0);
public static <T0, T1, A> ActorCreator<A> actor(RayFunc2<T0, T1, A> f, T0 t0, T1 t1);
public static <T0, T1, T2, A> ActorCreator<A> actor(RayFunc3<T0, T1, T2, A> f, T0 t0, T1 t1, T2 t2);
public static <T0, T1, T2, T3, A> ActorCreator<A> actor(RayFunc4<T0, T1, T2, T3, A> f, T0 t0, T1 t1, T2 t2, T3 t3);
public static <T0, T1, T2, T3, T4, A> ActorCreator<A> actor(RayFunc5<T0, T1, T2, T3, T4, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
public static <T0, T1, T2, T3, T4, T5, A> ActorCreator<A> actor(RayFunc6<T0, T1, T2, T3, T4, T5, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
public interface ActorCreator<A> {
/**
* Create the actor remotely.
* @return ActorHandle for calling actor methods
*/
ActorHandle<A> remote();
/**
* Set JVM options for the actor.
* @param jvmOptions List of JVM options
* @return ActorCreator for method chaining
*/
ActorCreator<A> setJvmOptions(List<String> jvmOptions);
/**
* Set concurrency groups for the actor.
* @param concurrencyGroups Concurrency group configuration
* @return ActorCreator for method chaining
*/
ActorCreator<A> setConcurrencyGroups(ConcurrencyGroup... concurrencyGroups);
/**
* Set runtime environment for the actor.
* @param runtimeEnv Runtime environment configuration
* @return ActorCreator for method chaining
*/
ActorCreator<A> setRuntimeEnv(RuntimeEnv runtimeEnv);
}Usage Examples:
public class Counter {
private int count = 0;
public Counter(int initialValue) {
this.count = initialValue;
}
public int increment() {
return ++count;
}
public int getValue() {
return count;
}
public void reset() {
count = 0;
}
}
public class ActorExample {
public static void main(String[] args) {
Ray.init();
// Create actor with constructor parameter
ActorHandle<Counter> counter = Ray.actor(Counter::new, 10).remote();
// Create actor with configuration
ActorHandle<Counter> configuredCounter = Ray.actor(Counter::new, 0)
.setJvmOptions(Arrays.asList("-Xmx1g", "-Xms512m"))
.setMaxConcurrency(4)
.remote();
// Create named actor with full configuration using ActorCreationOptions
ActorCreationOptions options = ActorCreationOptions.builder()
.setName("global-counter")
.setLifetime(ActorLifetime.DETACHED)
.setResources(Map.of("CPU", 2.0, "memory", 1000.0))
.setMaxRestarts(5)
.setJvmOptions(Arrays.asList("-Xmx2g"))
.setMaxConcurrency(8)
.build();
// Note: Full ActorCreationOptions integration would require additional API
// This demonstrates the options builder pattern available
Ray.shutdown();
}
}Call methods on remote actors with full type safety and ObjectRef support.
public interface ActorHandle<A> extends BaseActorHandle {
/**
* Call actor method with return value.
* Available for all method signatures (0-6 parameters).
*/
<R> ActorTaskCaller<R> task(/* method reference and parameters */);
}
public interface ActorTaskCaller<R> {
/**
* Execute the actor method call remotely.
* @return ObjectRef to the method result
*/
ObjectRef<R> remote();
}Usage Examples:
public class ActorMethodCalls {
public static void main(String[] args) {
Ray.init();
// Create counter actor
ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();
// Call methods and get results
ObjectRef<Integer> result1 = counter.task(Counter::increment).remote();
ObjectRef<Integer> result2 = counter.task(Counter::increment).remote();
ObjectRef<Integer> current = counter.task(Counter::getValue).remote();
// Methods execute in order on the same actor instance
System.out.println(Ray.get(result1)); // 1
System.out.println(Ray.get(result2)); // 2
System.out.println(Ray.get(current)); // 2
// Void method calls
ObjectRef<Void> resetResult = counter.task(Counter::reset).remote();
Ray.get(resetResult); // Wait for completion
ObjectRef<Integer> afterReset = counter.task(Counter::getValue).remote();
System.out.println(Ray.get(afterReset)); // 0
Ray.shutdown();
}
}Manage actor lifecycle and get actor information.
public interface BaseActorHandle {
/**
* Get the actor ID.
* @return ActorId of this actor
*/
ActorId getId();
/**
* Kill the actor (allows restart).
*/
void kill();
/**
* Kill the actor with restart control.
* @param noRestart If true, prevent actor restart
*/
void kill(boolean noRestart);
}
public interface ActorHandle<A> extends BaseActorHandle {
// Inherits from BaseActorHandle
// Plus type-safe method calling capability
}Usage Examples:
public class ActorLifecycle {
public static void main(String[] args) {
Ray.init();
// Create actor
ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();
// Get actor information
ActorId actorId = counter.getId();
System.out.println("Actor ID: " + actorId);
// Use actor
ObjectRef<Integer> result = counter.task(Counter::increment).remote();
System.out.println("Result: " + Ray.get(result));
// Kill actor (allows restart)
counter.kill();
// Kill actor permanently
// counter.kill(true);
Ray.shutdown();
}
}Create and retrieve actors by name for service discovery.
/**
* Get a handle to a named actor in current namespace.
* @param name The name of the named actor
* @return Optional ActorHandle if actor exists
* @throws RayException if timed out
*/
public static <T extends BaseActorHandle> Optional<T> getActor(String name);
/**
* Get a handle to a named actor in specified namespace.
* @param name The name of the named actor
* @param namespace The namespace of the actor
* @return Optional ActorHandle if actor exists
* @throws RayException if timed out
*/
public static <T extends BaseActorHandle> Optional<T> getActor(String name, String namespace);Usage Examples:
public class NamedActors {
public static void main(String[] args) {
Ray.init();
// Create named actor (requires ActorCreationOptions)
ActorCreationOptions options = ActorCreationOptions.builder()
.setName("global-counter")
.setLifetime(ActorLifetime.DETACHED)
.build();
// Note: Named actor creation requires using options
// ActorHandle<Counter> counter = Ray.actor(Counter::new, 0)
// .setOptions(options)
// .remote();
// Get named actor from anywhere in the cluster
Optional<ActorHandle<Counter>> maybeCounter = Ray.getActor("global-counter");
if (maybeCounter.isPresent()) {
ActorHandle<Counter> counter = maybeCounter.get();
ObjectRef<Integer> result = counter.task(Counter::increment).remote();
System.out.println("Global counter: " + Ray.get(result));
} else {
System.out.println("Named actor not found");
}
// Get from specific namespace
Optional<ActorHandle<Counter>> nsCounter = Ray.getActor("counter", "production");
Ray.shutdown();
}
}Gracefully exit from within an actor.
/**
* Intentionally exit the current actor.
* Must be called from within an actor method.
* @throws RuntimeException if not called from within an actor
*/
public static void exitActor();Usage Example:
public class GracefulActor {
private boolean shouldStop = false;
public void processWork(String work) {
// Process work...
System.out.println("Processing: " + work);
if (shouldStop) {
// Clean shutdown
System.out.println("Actor shutting down gracefully");
Ray.exitActor();
}
}
public void shutdown() {
shouldStop = true;
}
}
public class ActorExit {
public static void main(String[] args) {
Ray.init();
ActorHandle<GracefulActor> actor = Ray.actor(GracefulActor::new).remote();
// Process some work
Ray.get(actor.task(GracefulActor::processWork, "task1").remote());
Ray.get(actor.task(GracefulActor::processWork, "task2").remote());
// Signal shutdown
Ray.get(actor.task(GracefulActor::shutdown).remote());
// This will cause the actor to exit
Ray.get(actor.task(GracefulActor::processWork, "final-task").remote());
Ray.shutdown();
}
}public class DatabaseService {
private Map<String, String> data = new HashMap<>();
public void put(String key, String value) {
data.put(key, value);
}
public String get(String key) {
return data.get(key);
}
public Set<String> keys() {
return new HashSet<>(data.keySet());
}
public int size() {
return data.size();
}
}
public class StatefulService {
public static void main(String[] args) {
Ray.init();
// Create stateful database service
ActorHandle<DatabaseService> db = Ray.actor(DatabaseService::new).remote();
// Store data
Ray.get(db.task(DatabaseService::put, "user1", "Alice").remote());
Ray.get(db.task(DatabaseService::put, "user2", "Bob").remote());
// Query data
ObjectRef<String> user1 = db.task(DatabaseService::get, "user1").remote();
ObjectRef<Integer> count = db.task(DatabaseService::size).remote();
System.out.println("User1: " + Ray.get(user1)); // "Alice"
System.out.println("Count: " + Ray.get(count)); // 2
Ray.shutdown();
}
}public class Worker {
private final int workerId;
public Worker(int id) {
this.workerId = id;
}
public String process(String task) {
// Simulate work
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Worker " + workerId + " processed: " + task;
}
}
public class ActorPool {
public static void main(String[] args) {
Ray.init();
// Create pool of worker actors
List<ActorHandle<Worker>> workers = new ArrayList<>();
for (int i = 0; i < 5; i++) {
workers.add(Ray.actor(Worker::new, i).remote());
}
// Distribute work across pool
List<ObjectRef<String>> results = new ArrayList<>();
for (int i = 0; i < 20; i++) {
ActorHandle<Worker> worker = workers.get(i % workers.size());
ObjectRef<String> result = worker.task(Worker::process, "task-" + i).remote();
results.add(result);
}
// Wait for all results
List<String> allResults = Ray.get(results);
allResults.forEach(System.out::println);
Ray.shutdown();
}
}public class ResilientActor {
private int processedCount = 0;
public String processTask(String task) {
processedCount++;
// Simulate occasional failures
if (task.contains("error")) {
throw new RuntimeException("Task processing failed: " + task);
}
return "Processed " + task + " (total: " + processedCount + ")";
}
public int getProcessedCount() {
return processedCount;
}
}
public class ActorErrorHandling {
public static void main(String[] args) {
Ray.init();
ActorHandle<ResilientActor> actor = Ray.actor(ResilientActor::new).remote();
// Process successful tasks
ObjectRef<String> result1 = actor.task(ResilientActor::processTask, "task1").remote();
System.out.println(Ray.get(result1));
// Process failing task
try {
ObjectRef<String> result2 = actor.task(ResilientActor::processTask, "error-task").remote();
Ray.get(result2);
} catch (RayTaskException e) {
System.out.println("Task failed as expected: " + e.getMessage());
}
// Actor state is preserved despite the error
ObjectRef<Integer> count = actor.task(ResilientActor::getProcessedCount).remote();
System.out.println("Tasks processed: " + Ray.get(count)); // 2 (including the failed one)
Ray.shutdown();
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-ray--ray-runtime