CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-ray--ray-runtime

Ray runtime implementation for Java - the core distributed runtime component of Ray framework for scaling AI and Python applications

Pending
Overview
Eval results
Files

actors.mddocs/

Actor Programming

Stateful distributed workers with method-level task execution, lifecycle management, and resource control for building robust distributed applications.

Capabilities

Actor Creation Options

Configure actor creation with detailed options including naming, resources, and lifecycle settings.

/**
 * Options for configuring actor creation.
 */
public class ActorCreationOptions {
    /** Constant for no actor restarts */
    public static final int NO_RESTART = 0;
    
    /** Constant for infinite actor restarts */
    public static final int INFINITE_RESTART = -1;
    
    /**
     * Create builder for actor creation options.
     * @return ActorCreationOptions.Builder
     */
    public static Builder builder();
    
    public static class Builder {
        /**
         * Set actor name for service discovery.
         * @param name Actor name
         * @return Builder for method chaining
         */
        public Builder setName(String name);
        
        /**
         * Set actor lifetime policy.
         * @param lifetime Actor lifetime setting
         * @return Builder for method chaining
         */
        public Builder setLifetime(ActorLifetime lifetime);
        
        /**
         * Set single resource requirement.
         * @param resource Resource name
         * @param quantity Resource quantity
         * @return Builder for method chaining
         */
        public Builder setResource(String resource, Double quantity);
        
        /**
         * Set multiple resource requirements.
         * @param resources Map of resource names to quantities
         * @return Builder for method chaining
         */
        public Builder setResources(Map<String, Double> resources);
        
        /**
         * Set maximum number of actor restarts.
         * @param maxRestarts Maximum restarts (use constants NO_RESTART or INFINITE_RESTART)
         * @return Builder for method chaining
         */
        public Builder setMaxRestarts(int maxRestarts);
        
        /**
         * Set maximum task retries per actor method call.
         * @param maxTaskRetries Maximum task retries
         * @return Builder for method chaining
         */
        public Builder setMaxTaskRetries(int maxTaskRetries);
        
        /**
         * Set JVM options for the actor process.
         * @param jvmOptions List of JVM options
         * @return Builder for method chaining
         */
        public Builder setJvmOptions(List<String> jvmOptions);
        
        /**
         * Set maximum concurrent method calls.
         * @param maxConcurrency Maximum concurrent calls
         * @return Builder for method chaining
         */
        public Builder setMaxConcurrency(int maxConcurrency);
        
        /**
         * Set placement group and bundle index.
         * @param placementGroup Placement group
         * @param bundleIndex Bundle index within placement group
         * @return Builder for method chaining
         */
        public Builder setPlacementGroup(PlacementGroup placementGroup, int bundleIndex);
        
        /**
         * Set runtime environment.
         * @param runtimeEnv Runtime environment configuration
         * @return Builder for method chaining
         */
        public Builder setRuntimeEnv(RuntimeEnv runtimeEnv);
        
        /**
         * Build the actor creation options.
         * @return ActorCreationOptions instance
         */
        public ActorCreationOptions build();
    }
}

/**
 * Actor lifetime enumeration.
 */
public enum ActorLifetime {
    /** Actor dies when creator process dies */
    NON_DETACHED,
    
    /** Actor persists beyond creator process lifetime */
    DETACHED
}

Actor Creation

Create stateful distributed workers that persist across multiple method calls.

// Actor creation methods (0-6 parameters)
public static <A> ActorCreator<A> actor(RayFunc0<A> f);
public static <T0, A> ActorCreator<A> actor(RayFunc1<T0, A> f, T0 t0);
public static <T0, T1, A> ActorCreator<A> actor(RayFunc2<T0, T1, A> f, T0 t0, T1 t1);
public static <T0, T1, T2, A> ActorCreator<A> actor(RayFunc3<T0, T1, T2, A> f, T0 t0, T1 t1, T2 t2);
public static <T0, T1, T2, T3, A> ActorCreator<A> actor(RayFunc4<T0, T1, T2, T3, A> f, T0 t0, T1 t1, T2 t2, T3 t3);
public static <T0, T1, T2, T3, T4, A> ActorCreator<A> actor(RayFunc5<T0, T1, T2, T3, T4, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
public static <T0, T1, T2, T3, T4, T5, A> ActorCreator<A> actor(RayFunc6<T0, T1, T2, T3, T4, T5, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);

public interface ActorCreator<A> {
    /**
     * Create the actor remotely.
     * @return ActorHandle for calling actor methods
     */
    ActorHandle<A> remote();
    
    /**
     * Set JVM options for the actor.
     * @param jvmOptions List of JVM options
     * @return ActorCreator for method chaining
     */
    ActorCreator<A> setJvmOptions(List<String> jvmOptions);
    
    /**
     * Set concurrency groups for the actor.
     * @param concurrencyGroups Concurrency group configuration
     * @return ActorCreator for method chaining
     */
    ActorCreator<A> setConcurrencyGroups(ConcurrencyGroup... concurrencyGroups);
    
    /**
     * Set runtime environment for the actor.
     * @param runtimeEnv Runtime environment configuration
     * @return ActorCreator for method chaining
     */
    ActorCreator<A> setRuntimeEnv(RuntimeEnv runtimeEnv);
}

Usage Examples:

public class Counter {
    private int count = 0;
    
    public Counter(int initialValue) {
        this.count = initialValue;
    }
    
    public int increment() {
        return ++count;
    }
    
    public int getValue() {
        return count;
    }
    
    public void reset() {
        count = 0;
    }
}

public class ActorExample {
    public static void main(String[] args) {
        Ray.init();
        
        // Create actor with constructor parameter
        ActorHandle<Counter> counter = Ray.actor(Counter::new, 10).remote();
        
        // Create actor with configuration
        ActorHandle<Counter> configuredCounter = Ray.actor(Counter::new, 0)
            .setJvmOptions(Arrays.asList("-Xmx1g", "-Xms512m"))
            .setMaxConcurrency(4)
            .remote();
        
        // Create named actor with full configuration using ActorCreationOptions
        ActorCreationOptions options = ActorCreationOptions.builder()
            .setName("global-counter")
            .setLifetime(ActorLifetime.DETACHED)
            .setResources(Map.of("CPU", 2.0, "memory", 1000.0))
            .setMaxRestarts(5)
            .setJvmOptions(Arrays.asList("-Xmx2g"))
            .setMaxConcurrency(8)
            .build();
            
        // Note: Full ActorCreationOptions integration would require additional API
        // This demonstrates the options builder pattern available
        
        Ray.shutdown();
    }
}

Actor Method Calls

Call methods on remote actors with full type safety and ObjectRef support.

public interface ActorHandle<A> extends BaseActorHandle {
    /**
     * Call actor method with return value.
     * Available for all method signatures (0-6 parameters).
     */
    <R> ActorTaskCaller<R> task(/* method reference and parameters */);
}

public interface ActorTaskCaller<R> {
    /**
     * Execute the actor method call remotely.
     * @return ObjectRef to the method result
     */
    ObjectRef<R> remote();
}

Usage Examples:

public class ActorMethodCalls {
    public static void main(String[] args) {
        Ray.init();
        
        // Create counter actor
        ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();
        
        // Call methods and get results
        ObjectRef<Integer> result1 = counter.task(Counter::increment).remote();
        ObjectRef<Integer> result2 = counter.task(Counter::increment).remote();
        ObjectRef<Integer> current = counter.task(Counter::getValue).remote();
        
        // Methods execute in order on the same actor instance
        System.out.println(Ray.get(result1)); // 1
        System.out.println(Ray.get(result2)); // 2
        System.out.println(Ray.get(current)); // 2
        
        // Void method calls
        ObjectRef<Void> resetResult = counter.task(Counter::reset).remote();
        Ray.get(resetResult); // Wait for completion
        
        ObjectRef<Integer> afterReset = counter.task(Counter::getValue).remote();
        System.out.println(Ray.get(afterReset)); // 0
        
        Ray.shutdown();
    }
}

Actor Handles

Manage actor lifecycle and get actor information.

public interface BaseActorHandle {
    /**
     * Get the actor ID.
     * @return ActorId of this actor
     */
    ActorId getId();
    
    /**
     * Kill the actor (allows restart).
     */
    void kill();
    
    /**
     * Kill the actor with restart control.
     * @param noRestart If true, prevent actor restart
     */
    void kill(boolean noRestart);
}

public interface ActorHandle<A> extends BaseActorHandle {
    // Inherits from BaseActorHandle
    // Plus type-safe method calling capability
}

Usage Examples:

public class ActorLifecycle {
    public static void main(String[] args) {
        Ray.init();
        
        // Create actor
        ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();
        
        // Get actor information
        ActorId actorId = counter.getId();
        System.out.println("Actor ID: " + actorId);
        
        // Use actor
        ObjectRef<Integer> result = counter.task(Counter::increment).remote();
        System.out.println("Result: " + Ray.get(result));
        
        // Kill actor (allows restart)
        counter.kill();
        
        // Kill actor permanently
        // counter.kill(true);
        
        Ray.shutdown();
    }
}

Named Actors

Create and retrieve actors by name for service discovery.

/**
 * Get a handle to a named actor in current namespace.
 * @param name The name of the named actor
 * @return Optional ActorHandle if actor exists
 * @throws RayException if timed out
 */
public static <T extends BaseActorHandle> Optional<T> getActor(String name);

/**
 * Get a handle to a named actor in specified namespace.
 * @param name The name of the named actor
 * @param namespace The namespace of the actor
 * @return Optional ActorHandle if actor exists
 * @throws RayException if timed out
 */
public static <T extends BaseActorHandle> Optional<T> getActor(String name, String namespace);

Usage Examples:

public class NamedActors {
    public static void main(String[] args) {
        Ray.init();
        
        // Create named actor (requires ActorCreationOptions)
        ActorCreationOptions options = ActorCreationOptions.builder()
            .setName("global-counter")
            .setLifetime(ActorLifetime.DETACHED)
            .build();
            
        // Note: Named actor creation requires using options
        // ActorHandle<Counter> counter = Ray.actor(Counter::new, 0)
        //     .setOptions(options)
        //     .remote();
        
        // Get named actor from anywhere in the cluster
        Optional<ActorHandle<Counter>> maybeCounter = Ray.getActor("global-counter");
        
        if (maybeCounter.isPresent()) {
            ActorHandle<Counter> counter = maybeCounter.get();
            ObjectRef<Integer> result = counter.task(Counter::increment).remote();
            System.out.println("Global counter: " + Ray.get(result));
        } else {
            System.out.println("Named actor not found");
        }
        
        // Get from specific namespace
        Optional<ActorHandle<Counter>> nsCounter = Ray.getActor("counter", "production");
        
        Ray.shutdown();
    }
}

Actor Exit

Gracefully exit from within an actor.

/**
 * Intentionally exit the current actor.
 * Must be called from within an actor method.
 * @throws RuntimeException if not called from within an actor
 */
public static void exitActor();

Usage Example:

public class GracefulActor {
    private boolean shouldStop = false;
    
    public void processWork(String work) {
        // Process work...
        System.out.println("Processing: " + work);
        
        if (shouldStop) {
            // Clean shutdown
            System.out.println("Actor shutting down gracefully");
            Ray.exitActor();
        }
    }
    
    public void shutdown() {
        shouldStop = true;
    }
}

public class ActorExit {
    public static void main(String[] args) {
        Ray.init();
        
        ActorHandle<GracefulActor> actor = Ray.actor(GracefulActor::new).remote();
        
        // Process some work
        Ray.get(actor.task(GracefulActor::processWork, "task1").remote());
        Ray.get(actor.task(GracefulActor::processWork, "task2").remote());
        
        // Signal shutdown
        Ray.get(actor.task(GracefulActor::shutdown).remote());
        
        // This will cause the actor to exit
        Ray.get(actor.task(GracefulActor::processWork, "final-task").remote());
        
        Ray.shutdown();
    }
}

Advanced Actor Patterns

Stateful Services

public class DatabaseService {
    private Map<String, String> data = new HashMap<>();
    
    public void put(String key, String value) {
        data.put(key, value);
    }
    
    public String get(String key) {
        return data.get(key);
    }
    
    public Set<String> keys() {
        return new HashSet<>(data.keySet());
    }
    
    public int size() {
        return data.size();
    }
}

public class StatefulService {
    public static void main(String[] args) {
        Ray.init();
        
        // Create stateful database service
        ActorHandle<DatabaseService> db = Ray.actor(DatabaseService::new).remote();
        
        // Store data
        Ray.get(db.task(DatabaseService::put, "user1", "Alice").remote());
        Ray.get(db.task(DatabaseService::put, "user2", "Bob").remote());
        
        // Query data
        ObjectRef<String> user1 = db.task(DatabaseService::get, "user1").remote();
        ObjectRef<Integer> count = db.task(DatabaseService::size).remote();
        
        System.out.println("User1: " + Ray.get(user1)); // "Alice"
        System.out.println("Count: " + Ray.get(count)); // 2
        
        Ray.shutdown();
    }
}

Actor Pools

public class Worker {
    private final int workerId;
    
    public Worker(int id) {
        this.workerId = id;
    }
    
    public String process(String task) {
        // Simulate work
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Worker " + workerId + " processed: " + task;
    }
}

public class ActorPool {
    public static void main(String[] args) {
        Ray.init();
        
        // Create pool of worker actors
        List<ActorHandle<Worker>> workers = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            workers.add(Ray.actor(Worker::new, i).remote());
        }
        
        // Distribute work across pool
        List<ObjectRef<String>> results = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            ActorHandle<Worker> worker = workers.get(i % workers.size());
            ObjectRef<String> result = worker.task(Worker::process, "task-" + i).remote();
            results.add(result);
        }
        
        // Wait for all results
        List<String> allResults = Ray.get(results);
        allResults.forEach(System.out::println);
        
        Ray.shutdown();
    }
}

Error Handling and Recovery

public class ResilientActor {
    private int processedCount = 0;
    
    public String processTask(String task) {
        processedCount++;
        
        // Simulate occasional failures
        if (task.contains("error")) {
            throw new RuntimeException("Task processing failed: " + task);
        }
        
        return "Processed " + task + " (total: " + processedCount + ")";
    }
    
    public int getProcessedCount() {
        return processedCount;
    }
}

public class ActorErrorHandling {
    public static void main(String[] args) {
        Ray.init();
        
        ActorHandle<ResilientActor> actor = Ray.actor(ResilientActor::new).remote();
        
        // Process successful tasks
        ObjectRef<String> result1 = actor.task(ResilientActor::processTask, "task1").remote();
        System.out.println(Ray.get(result1));
        
        // Process failing task
        try {
            ObjectRef<String> result2 = actor.task(ResilientActor::processTask, "error-task").remote();
            Ray.get(result2);
        } catch (RayTaskException e) {
            System.out.println("Task failed as expected: " + e.getMessage());
        }
        
        // Actor state is preserved despite the error
        ObjectRef<Integer> count = actor.task(ResilientActor::getProcessedCount).remote();
        System.out.println("Tasks processed: " + Ray.get(count)); // 2 (including the failed one)
        
        Ray.shutdown();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-ray--ray-runtime

docs

actors.md

advanced-actors.md

cross-language.md

index.md

object-store.md

placement-groups.md

runtime.md

tasks.md

tile.json