CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-io-ray--ray-runtime

Ray runtime implementation for Java - the core distributed runtime component of Ray framework for scaling AI and Python applications

Pending
Overview
Eval results
Files

docs/advanced-actors.md

Advanced Actor Features

Specialized actor patterns including parallel actors and concurrency group management for high-performance distributed computing scenarios.

Capabilities

Parallel Actors

Parallel actors enable scaling a single logical actor across multiple processes for increased throughput.

/**
 * Base class for parallel actor implementations.
 *
 * <p>Extend this class to create parallel actors; the usage examples on this
 * page do so with {@code ParallelWorker}, {@code HighThroughputProcessor} and
 * {@code ResilientParallelActor}. No members are listed in this reference.
 */
public class ParallelActor {
    // Base functionality for parallel actor implementations (members not listed in this reference)
}

/**
 * Handle for managing parallel actor instances.
 *
 * <p>Returned by {@code ParallelActorCreator.remote()}.
 *
 * <p>NOTE(review): the usage examples on this page also call {@code task(...)}
 * directly on this handle, but no such method is declared here — confirm
 * against the actual API.
 */
public interface ParallelActorHandle {
    /**
     * Get the number of parallel instances.
     *
     * @return Number of parallel actor instances
     */
    int getNumInstances();
    
    /**
     * Get a specific parallel actor instance.
     *
     * @param index Instance index; the examples iterate from 0 up to
     *              {@code getNumInstances() - 1}
     * @return ParallelActorInstance handle
     */
    ParallelActorInstance getInstance(int index);
    
    /**
     * Kill all parallel actor instances.
     */
    void kill();
}

/**
 * Individual instance within a parallel actor.
 *
 * <p>Obtained from {@code ParallelActorHandle.getInstance(int)}.
 */
public interface ParallelActorInstance {
    /**
     * Get the instance index.
     *
     * @return Index of this instance
     */
    int getIndex();
    
    // Calling a method on this specific instance: no method signature is
    // listed in this reference for it.
    // Method calling capability similar to regular actors
}

/**
 * Context information for parallel actors.
 *
 * <p>NOTE(review): how an actor obtains this context is not specified here;
 * the {@code ParallelWorker} example uses an accessor it marks as
 * hypothetical.
 */
public interface ParallelActorContext {
    /**
     * Get current instance index.
     *
     * @return Index of current parallel actor instance
     */
    int getCurrentInstanceIndex();
    
    /**
     * Get total number of instances.
     *
     * @return Total parallel actor instances
     */
    int getTotalInstances();
}

/**
 * Creator for parallel actors.
 *
 * <p>The examples on this page build one with {@code new ParallelActorCreator()},
 * chain {@link #setNumInstances(int)} and finish with {@link #remote()}.
 * Methods are listed as signatures only.
 *
 * <p>NOTE(review): the Resource Management example also chains
 * {@code setResources(...)}, which is not declared here; and nothing listed
 * here selects the actor class to instantiate — confirm against the actual API.
 */
public class ParallelActorCreator {
    /**
     * Set number of parallel instances.
     *
     * @param numInstances Number of parallel instances to create
     * @return ParallelActorCreator for method chaining
     */
    public ParallelActorCreator setNumInstances(int numInstances);
    
    /**
     * Create the parallel actor remotely.
     *
     * @return ParallelActorHandle for managing instances
     */
    public ParallelActorHandle remote();
}

Usage Examples:

/**
 * Example parallel actor that tags every processed task with a per-instance
 * worker id and keeps a running count of the tasks it has handled.
 */
public class ParallelWorker extends ParallelActor {
    private final String workerId;
    private int processedTasks = 0;

    /**
     * Builds the worker id from {@code baseId} and this instance's index.
     *
     * @param baseId prefix shared by all instances
     */
    public ParallelWorker(String baseId) {
        // Access parallel actor context
        int instanceIndex = getParallelActorContext().getCurrentInstanceIndex(); // Hypothetical method
        this.workerId = baseId + "-" + instanceIndex;
    }

    /**
     * Counts the task, pauses briefly to simulate work, and reports the outcome.
     *
     * @param task task description
     * @return text naming this worker, the task, and the running total
     */
    public String processTask(String task) {
        processedTasks += 1;
        simulateWork();

        String summary = workerId + " processed: " + task;
        return summary + " (total: " + processedTasks + ")";
    }

    /**
     * @return number of tasks this instance has processed so far
     */
    public int getProcessedCount() {
        return processedTasks;
    }

    /** Simulate processing time; restores the interrupt flag if interrupted. */
    private void simulateWork() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * End-to-end example: create a parallel actor with four instances, submit
 * twenty tasks, print the results, then clean up.
 *
 * <p>Cleanup runs in {@code finally} blocks so the actor is killed and Ray is
 * shut down even when a task fails and {@code Ray.get} throws.
 */
public class ParallelActorExample {
    public static void main(String[] args) {
        Ray.init();
        try {
            // Create parallel actor with 4 instances
            ParallelActorCreator creator = new ParallelActorCreator();
            ParallelActorHandle parallelWorker = creator
                .setNumInstances(4)
                .remote();

            try {
                // Distribute work across parallel instances
                List<ObjectRef<String>> results = new ArrayList<>();

                for (int i = 0; i < 20; i++) {
                    // Ray automatically load-balances across instances
                    ParallelActorTaskCaller<String> taskCaller =
                        parallelWorker.task(ParallelWorker::processTask, "task-" + i);
                    results.add(taskCaller.remote());
                }

                // Wait for all results
                List<String> allResults = Ray.get(results);
                allResults.forEach(System.out::println);

                // Access specific instances
                for (int i = 0; i < parallelWorker.getNumInstances(); i++) {
                    ParallelActorInstance instance = parallelWorker.getInstance(i);
                    // Call methods on specific instance...
                }
            } finally {
                // Clean up the actor even if a task failed above
                parallelWorker.kill();
            }
        } finally {
            // Always release the Ray runtime
            Ray.shutdown();
        }
    }
}

Parallel Actor Task Calling

Type-safe method calling for parallel actors with automatic load balancing.

/**
 * Task caller for parallel actor methods with return values.
 *
 * <p>The method is listed as a signature only. In the examples on this page a
 * caller is obtained from {@code task(...)} on a parallel actor handle.
 *
 * @param <R> return type of the actor method being called
 */
public class ParallelActorTaskCaller<R> {
    /**
     * Execute the method call on an available parallel actor instance.
     *
     * @return ObjectRef to the method result
     */
    public ObjectRef<R> remote();
}

/**
 * Task caller for void parallel actor methods.
 *
 * <p>The method is listed as a signature only.
 */
public class VoidParallelActorTaskCaller {
    /**
     * Execute the void method call on an available parallel actor instance.
     *
     * @return {@code ObjectRef<Void>} for synchronization
     */
    public ObjectRef<Void> remote();
}

Usage Examples:

/**
 * Example of batched, high-throughput task submission to a parallel actor:
 * ten batches of one hundred tasks, each batch awaited before the next starts.
 */
public class ParallelActorTasking {
    public static void main(String[] args) {
        Ray.init();
        
        // Create parallel actor
        ParallelActorHandle parallelWorker = createParallelWorker(8);
        
        // High-throughput task processing
        for (int batch = 0; batch < 10; batch++) {
            List<ObjectRef<String>> batchTasks = new ArrayList<>();
            
            // Process batch in parallel
            for (int i = 0; i < 100; i++) {
                String taskData = "batch-" + batch + "-task-" + i;
                ParallelActorTaskCaller<String> caller = 
                    parallelWorker.task(ParallelWorker::processTask, taskData);
                batchTasks.add(caller.remote());
            }
            
            // Wait for batch completion
            List<String> batchComplete = Ray.get(batchTasks);
            System.out.println("Batch " + batch + " completed: " + batchComplete.size() + " tasks");
        }
        
        Ray.shutdown();
    }
    
    /**
     * Creates a parallel actor with the requested number of instances.
     *
     * @param instances number of parallel instances to create
     * @return handle for the created parallel actor
     */
    private static ParallelActorHandle createParallelWorker(int instances) {
        ParallelActorCreator creator = new ParallelActorCreator();
        return creator.setNumInstances(instances).remote();
    }
}

Concurrency Groups

Manage method-level concurrency within actors for fine-grained performance control.

/**
 * Concurrency group interface for managing concurrent method execution.
 *
 * <p>In the example on this page, instances are produced by
 * {@code ConcurrencyGroupBuilder.build()} and passed to
 * {@code setConcurrencyGroups(...)} when creating an actor.
 */
public interface ConcurrencyGroup {
    /**
     * Get the concurrency group name.
     *
     * @return Name of the concurrency group
     */
    String getName();
    
    /**
     * Get the maximum concurrency level.
     *
     * @return Maximum concurrent method executions
     */
    int getMaxConcurrency();
}

/**
 * Builder for creating concurrency groups.
 *
 * <p>Methods are listed as signatures only.
 *
 * <p>NOTE(review): nothing listed here assigns actor methods to the group
 * being built — confirm how methods are bound when a group is created this way.
 */
public class ConcurrencyGroupBuilder {
    /**
     * Set the concurrency group name.
     *
     * @param name Group name
     * @return Builder for method chaining
     */
    public ConcurrencyGroupBuilder setName(String name);
    
    /**
     * Set maximum concurrency level.
     *
     * @param maxConcurrency Maximum concurrent executions
     * @return Builder for method chaining
     */
    public ConcurrencyGroupBuilder setMaxConcurrency(int maxConcurrency);
    
    /**
     * Build the concurrency group.
     *
     * @return ConcurrencyGroup instance
     */
    public ConcurrencyGroup build();
}

/**
 * Base builder class for concurrency groups.
 *
 * <p>No members are listed in this reference.
 *
 * <p>NOTE(review): {@code ConcurrencyGroupBuilder} is not shown extending this
 * class on this page — confirm the relationship.
 */
public class BaseConcurrencyGroupBuilder {
    // Base functionality for concurrency group builders (members not listed in this reference)
}

Usage Examples:

/**
 * Example of building concurrency groups programmatically and attaching them
 * to an actor at creation time.
 *
 * <p>NOTE(review): the group names used here ("io-operations",
 * "compute-operations") differ from the names referenced by
 * {@code @UseConcurrencyGroup} on {@code ConcurrentActor} ("io", "compute",
 * "db") — confirm how methods end up bound to these groups.
 */
public class ConcurrencyGroupExample {
    public static void main(String[] args) {
        Ray.init();

        // Create concurrency groups
        ConcurrencyGroup ioGroup = newGroup("io-operations", 10);
        ConcurrencyGroup computeGroup = newGroup("compute-operations", 2);

        // Create actor with concurrency groups
        ActorHandle<ConcurrentActor> actor =
            Ray.actor(ConcurrentActor::new)
                .setConcurrencyGroups(ioGroup, computeGroup)
                .remote();

        // Methods will be executed according to their concurrency group limits

        Ray.shutdown();
    }

    /** Builds a concurrency group with the given name and concurrency limit. */
    private static ConcurrencyGroup newGroup(String groupName, int limit) {
        ConcurrencyGroupBuilder builder = new ConcurrencyGroupBuilder();
        builder = builder.setName(groupName);
        builder = builder.setMaxConcurrency(limit);
        return builder.build();
    }
}

Concurrency Group Annotations

Use annotations to specify concurrency groups at the method level.

/**
 * Annotation to define a concurrency group.
 *
 * <p>On this page it is used both nested inside {@code @DefConcurrencyGroups}
 * and on its own on a class declaration. No {@code @Target} or
 * {@code @Retention} is shown in this reference.
 */
@interface DefConcurrencyGroup {
    /**
     * Concurrency group name.
     *
     * @return Group name
     */
    String name();
    
    /**
     * Maximum concurrency level.
     *
     * @return Maximum concurrent executions
     */
    int maxConcurrency();
}

/**
 * Annotation to define multiple concurrency groups.
 *
 * <p>Container for several {@code @DefConcurrencyGroup} entries; the
 * {@code ConcurrentActor} example places it on the actor class.
 */
@interface DefConcurrencyGroups {
    /**
     * Array of concurrency group definitions.
     *
     * @return Array of DefConcurrencyGroup annotations
     */
    DefConcurrencyGroup[] value();
}

/**
 * Annotation to specify which concurrency group a method uses.
 *
 * <p>In the examples on this page it is placed on actor methods, and its value
 * matches a name declared with {@code @DefConcurrencyGroup} on the class.
 */
@interface UseConcurrencyGroup {
    /**
     * Concurrency group name to use.
     *
     * @return Group name
     */
    String value();
}

Usage Examples:

/**
 * Example actor whose methods are spread over three annotated concurrency
 * groups ("io", "compute", "db"), plus one method left without a group.
 */
@DefConcurrencyGroups({
    @DefConcurrencyGroup(name = "io", maxConcurrency = 10),
    @DefConcurrencyGroup(name = "compute", maxConcurrency = 2),
    @DefConcurrencyGroup(name = "db", maxConcurrency = 5)
})
public class ConcurrentActor {

    /**
     * Simulated file read in the "io" group (declared with maxConcurrency 10).
     *
     * @param filename name of the file to "read"
     * @return confirmation text containing the file name
     */
    @UseConcurrencyGroup("io")
    public String readFile(String filename) {
        pause(1000); // Simulate file read
        return "Read: " + filename;
    }

    /**
     * CPU-bound loop in the "compute" group (declared with maxConcurrency 2).
     *
     * @param input starting value
     * @return value after one million rounds of {@code sqrt(x + 1)}
     */
    @UseConcurrencyGroup("compute")
    public double computeHeavy(double input) {
        double value = input;
        int remaining = 1000000;
        while (remaining > 0) {
            value = Math.sqrt(value + 1);
            remaining--;
        }
        return value;
    }

    /**
     * Simulated database query in the "db" group (declared with maxConcurrency 5).
     *
     * @param query query text
     * @return confirmation text containing the query
     */
    @UseConcurrencyGroup("db")
    public String queryDatabase(String query) {
        pause(500); // Simulate DB query
        return "Query result: " + query;
    }

    // Methods without annotation use default concurrency (1)
    public String defaultMethod(String input) {
        return "Default: " + input;
    }

    /** Sleeps for the given time; restores the interrupt flag if interrupted. */
    private void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Example driver for {@code ConcurrentActor}: submits I/O, compute and
 * database calls, then waits for each set and prints how many completed.
 */
public class ConcurrencyAnnotationExample {
    public static void main(String[] args) {
        Ray.init();

        // Create actor - concurrency groups are automatically configured from annotations
        ActorHandle<ConcurrentActor> actor = Ray.actor(ConcurrentActor::new).remote();

        // Launch many I/O operations (up to 10 concurrent)
        List<ObjectRef<String>> ioResults = new ArrayList<>();
        for (int fileNo = 0; fileNo < 20; fileNo++) {
            ioResults.add(actor.task(ConcurrentActor::readFile, "file-" + fileNo).remote());
        }

        // Launch compute operations (limited to 2 concurrent)
        List<ObjectRef<Double>> computeResults = new ArrayList<>();
        for (int step = 0; step < 10; step++) {
            computeResults.add(actor.task(ConcurrentActor::computeHeavy, step * 1.5).remote());
        }

        // Launch database operations (up to 5 concurrent)
        List<ObjectRef<String>> dbResults = new ArrayList<>();
        for (int tableNo = 0; tableNo < 15; tableNo++) {
            String sql = "SELECT * FROM table" + tableNo;
            dbResults.add(actor.task(ConcurrentActor::queryDatabase, sql).remote());
        }

        // Wait for results
        int ioDone = Ray.get(ioResults).size();
        System.out.println("I/O operations completed: " + ioDone);
        int computeDone = Ray.get(computeResults).size();
        System.out.println("Compute operations completed: " + computeDone);
        int dbDone = Ray.get(dbResults).size();
        System.out.println("DB operations completed: " + dbDone);

        Ray.shutdown();
    }
}

Advanced Call Framework

Parallel Actor Call Interface

/**
 * Base call class for parallel actor operations.
 *
 * <p>No members are listed in this reference.
 */
public class Call {
    // Base functionality for actor method calls (members not listed in this reference)
}

/**
 * Actor call interface for method invocation.
 *
 * <p>NOTE(review): the result is typed as plain {@code Object}; whether the
 * call blocks or can throw is not stated on this page — confirm.
 */
public interface ActorCall {
    /**
     * Execute the actor method call.
     *
     * @return Result of the method call
     */
    Object call();
}

Performance Optimization Patterns

High-Throughput Processing

/**
 * Example combining parallel actors with a concurrency group: four parallel
 * instances, each declaring a "process" group with maxConcurrency 4.
 */
public class HighThroughputProcessor extends ParallelActor {
    /**
     * Worker type whose {@code processItem} calls run in the "process" group.
     *
     * <p>Declared {@code static} so it carries no hidden reference to an
     * enclosing {@code HighThroughputProcessor} and can be created on its own;
     * a non-static inner class can only exist inside an outer instance.
     */
    @DefConcurrencyGroup(name = "process", maxConcurrency = 4)
    public static class ConcurrentProcessor {
        
        /**
         * Process a single item.
         *
         * @param item item to process
         * @return confirmation text containing the item
         */
        @UseConcurrencyGroup("process")
        public String processItem(String item) {
            // Process item with controlled concurrency
            return "Processed: " + item;
        }
    }
    
    public static void main(String[] args) {
        Ray.init();
        
        // Combine parallel actors with concurrency groups
        ParallelActorHandle processor = new ParallelActorCreator()
            .setNumInstances(4) // 4 parallel instances
            .remote();
        
        // Each instance can process 4 items concurrently
        // Total concurrency: 4 instances × 4 concurrent methods = 16
        
        List<ObjectRef<String>> results = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ParallelActorTaskCaller<String> caller = 
                processor.task(ConcurrentProcessor::processItem, "item-" + i);
            results.add(caller.remote());
        }
        
        List<String> completed = Ray.get(results);
        System.out.println("Processed " + completed.size() + " items");
        
        Ray.shutdown();
    }
}

Load Balancing and Fault Tolerance

/**
 * Example parallel actor that fails on roughly one call in ten, to
 * demonstrate error handling on the caller side.
 */
public class ResilientParallelActor extends ParallelActor {
    private int processedCount = 0;
    
    /**
     * Processes one item, failing at random about 10% of the time.
     *
     * <p>Despite its name, this method performs no retry itself: a failure is
     * logged to {@code System.err} and then rethrown to the caller.
     *
     * @param data item to process
     * @return confirmation text including this instance's running count
     * @throws RuntimeException when the simulated failure is triggered
     */
    public String processWithRetry(String data) {
        try {
            // Simulate occasional failures
            if (Math.random() < 0.1) {
                throw new RuntimeException("Simulated processing error");
            }
            
            processedCount++;
            return "Processed: " + data + " (instance processed: " + processedCount + ")";
            
        } catch (RuntimeException e) {
            // Log the error, then rethrow so the failure reaches the caller
            // (only unchecked exceptions can originate in the try block)
            System.err.println("Processing error: " + e.getMessage());
            throw e;
        }
    }
    
    /**
     * @return number of items this instance has processed successfully
     */
    public int getProcessedCount() {
        return processedCount;
    }
}

/**
 * Example driver that submits 500 items to a six-instance parallel actor and
 * counts how many tasks succeeded and how many failed.
 */
public class FaultTolerantProcessing {
    public static void main(String[] args) {
        Ray.init();
        
        // Create parallel actor with multiple instances for fault tolerance
        ParallelActorHandle processor = new ParallelActorCreator()
            .setNumInstances(6)
            .remote();
        
        List<ObjectRef<String>> results = new ArrayList<>();
        
        // Process many items - failures on one instance don't affect others
        for (int i = 0; i < 500; i++) {
            ParallelActorTaskCaller<String> caller = 
                processor.task(ResilientParallelActor::processWithRetry, "data-" + i);
            results.add(caller.remote());
        }
        
        // Handle results with error handling
        int successCount = 0;
        int errorCount = 0;
        
        for (ObjectRef<String> result : results) {
            try {
                // Only success or failure matters here; the returned value is not used
                Ray.get(result);
                successCount++;
            } catch (Exception e) {
                errorCount++;
                System.err.println("Task failed: " + e.getMessage());
            }
        }
        
        System.out.println("Success: " + successCount + ", Errors: " + errorCount);
        
        // Check instance statistics
        for (int i = 0; i < processor.getNumInstances(); i++) {
            ParallelActorInstance instance = processor.getInstance(i);
            // Get stats from each instance...
        }
        
        Ray.shutdown();
    }
}

Best Practices

Choosing Between Actor Types

// Regular actors: For stateful services, single-threaded processing
ActorHandle<DatabaseService> dbService = Ray.actor(DatabaseService::new).remote();

// Parallel actors: For high-throughput, stateless processing
ParallelActorHandle processor = new ParallelActorCreator()
    .setNumInstances(8)
    .remote();

// Concurrent actors: For I/O bound operations with controlled concurrency.
// The concurrency group is declared on the actor class, not on the handle variable:
//
//     @DefConcurrencyGroup(name = "io", maxConcurrency = 10)
//     public class IOService { ... }
//
ActorHandle<IOService> ioService = Ray.actor(IOService::new).remote();

Resource Management

// Configure resources for parallel actors
// NOTE(review): setResources(...) is not among the ParallelActorCreator methods
// listed on this page — confirm it exists.
ParallelActorHandle resourceAwareProcessor = new ParallelActorCreator()
    .setNumInstances(4)
    .setResources(Map.of("CPU", 2.0, "memory", 1000.0)) // Per instance
    .remote();

// Total resources: 4 instances × (2 CPU + 1GB memory) = 8 CPU + 4GB memory
// NOTE(review): the total above reads "memory" = 1000.0 as 1GB; the unit of the
// "memory" value is not established on this page — confirm.

Monitoring and Debugging

/**
 * Example of inspecting a parallel actor: prints the instance count, lists
 * each instance index, then prints actor information from the runtime context.
 */
public class ActorMonitoring {
    public static void main(String[] args) {
        Ray.init();
        
        ParallelActorHandle processor = createParallelProcessor();
        
        // Monitor parallel actor instances
        System.out.println("Parallel actor instances: " + processor.getNumInstances());
        
        for (int i = 0; i < processor.getNumInstances(); i++) {
            ParallelActorInstance instance = processor.getInstance(i);
            // "active" is a fixed label here; no status is queried from the instance
            System.out.println("Instance " + instance.getIndex() + " status: active");
        }
        
        // Get runtime context for detailed information
        RuntimeContext context = Ray.getRuntimeContext();
        List<ActorInfo> actorInfos = context.getAllActorInfo();
        
        for (ActorInfo info : actorInfos) {
            if (info.getClassName().contains("ParallelActor")) {
                System.out.println("Parallel actor: " + info.getActorId() + 
                                 " State: " + info.getState() + 
                                 " Node: " + info.getNodeId());
            }
        }
        
        Ray.shutdown();
    }
    
    /**
     * Creates the parallel actor that this example monitors.
     *
     * <p>The instance count of 4 is an arbitrary value chosen for the example.
     *
     * @return handle for the created parallel actor
     */
    private static ParallelActorHandle createParallelProcessor() {
        return new ParallelActorCreator()
            .setNumInstances(4)
            .remote();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-ray--ray-runtime

docs

actors.md

advanced-actors.md

cross-language.md

index.md

object-store.md

placement-groups.md

runtime.md

tasks.md

tile.json