Ray runtime implementation for Java — the core distributed runtime component of the Ray framework for scaling AI and Python applications.
—
Specialized actor patterns including parallel actors and concurrency group management for high-performance distributed computing scenarios.
Parallel actors enable scaling a single logical actor across multiple processes for increased throughput.
/**
* Base class for parallel actor implementations.
* Extend this class to create parallel actors.
*
* <p>No members are declared in this sketch. NOTE(review): a usage example in
* this document calls {@code getParallelActorContext()} from a subclass
* constructor and labels it "hypothetical" — confirm where that accessor
* actually lives before relying on it.
*/
public class ParallelActor {
// Base functionality for parallel actor implementations
}
/**
* Handle for managing parallel actor instances.
*
* <p>Exposes the instance count, lookup of a single instance by index, and
* termination of all instances.
*
* <p>NOTE(review): the usage examples in this document also call
* {@code task(...)} on this handle, but no such method is declared in this
* sketch — confirm against the real API.
*/
public interface ParallelActorHandle {
/**
* Get the number of parallel instances.
* @return Number of parallel actor instances
*/
int getNumInstances();
/**
* Get a specific parallel actor instance.
* @param index Instance index; the examples in this document iterate from
*     {@code 0} up to (but excluding) {@link #getNumInstances()}
* @return ParallelActorInstance handle
*/
ParallelActorInstance getInstance(int index);
/**
* Kill all parallel actor instances.
*/
void kill();
}
/**
* Individual instance within a parallel actor.
*/
public interface ParallelActorInstance {
/**
* Get the instance index.
* @return Index of this instance
*/
int getIndex();
// Calling a method on this specific instance: no member is declared for it in
// this sketch. The capability is described only as being similar to regular
// actors — TODO confirm the actual call API.
}
/**
* Context information for parallel actors.
*
* <p>Reports which instance the calling code is running in and how many
* instances exist in total.
*/
public interface ParallelActorContext {
/**
* Get current instance index.
* @return Index of current parallel actor instance (presumably zero-based,
*     matching the index loops in this document's examples — TODO confirm)
*/
int getCurrentInstanceIndex();
/**
* Get total number of instances.
* @return Total parallel actor instances
*/
int getTotalInstances();
}
/**
* Creator for parallel actors.
*/
public class ParallelActorCreator {
/**
* Set number of parallel instances.
* @param numInstances Number of parallel instances to create
* @return ParallelActorCreator for method chaining; the examples in this
*     document chain {@code remote()} directly onto the returned creator
*/
public ParallelActorCreator setNumInstances(int numInstances);
/**
* Create the parallel actor remotely.
*
* <p>NOTE(review): nothing in this sketch tells the creator which actor class
* or constructor to use — confirm how that is supplied in the real API.
* @return ParallelActorHandle for managing instances
*/
public ParallelActorHandle remote();
}

Usage Examples:
/**
 * Example parallel actor that tags every result with a per-instance worker id
 * and keeps a running count of the tasks it has handled.
 */
public class ParallelWorker extends ParallelActor {
    private final String workerId;
    private int processedTasks = 0;

    /**
     * Derives the worker id from {@code baseId} and the index of the current instance.
     *
     * @param baseId prefix placed in front of the instance index
     */
    public ParallelWorker(String baseId) {
        // getParallelActorContext() is a hypothetical accessor used for illustration.
        ParallelActorContext ctx = getParallelActorContext(); // Hypothetical method
        int instanceIndex = ctx.getCurrentInstanceIndex();
        this.workerId = baseId + "-" + instanceIndex;
    }

    /**
     * Counts the task, pauses for 100 ms to simulate work, and describes what was done.
     *
     * @param task description of the task to process
     * @return a message containing the worker id, the task, and the running total
     */
    public String processTask(String task) {
        processedTasks += 1;
        try {
            // Simulate processing time
            Thread.sleep(100);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        StringBuilder message = new StringBuilder();
        message.append(workerId).append(" processed: ").append(task);
        message.append(" (total: ").append(processedTasks).append(")");
        return message.toString();
    }

    /**
     * @return the number of tasks this instance has counted so far
     */
    public int getProcessedCount() {
        return processedTasks;
    }
}
public class ParallelActorExample {
/**
 * Creates a four-instance parallel actor, submits twenty tasks to it, prints
 * every result, walks the individual instances, and then tears everything down.
 */
public static void main(String[] args) {
    Ray.init();

    // Create parallel actor with 4 instances
    ParallelActorHandle parallelWorker = new ParallelActorCreator().setNumInstances(4).remote();

    // Submit every task first, keeping the references so they can be resolved together.
    List<ObjectRef<String>> results = new ArrayList<>();
    int taskIndex = 0;
    while (taskIndex < 20) {
        ParallelActorTaskCaller<String> taskCaller =
            parallelWorker.task(ParallelWorker::processTask, "task-" + taskIndex);
        results.add(taskCaller.remote());
        taskIndex++;
    }

    // Wait for all results, then print them one per line.
    for (String line : Ray.get(results)) {
        System.out.println(line);
    }

    // Access specific instances
    for (int idx = 0; idx < parallelWorker.getNumInstances(); idx++) {
        ParallelActorInstance instance = parallelWorker.getInstance(idx);
        // Call methods on specific instance...
    }

    // Clean up
    parallelWorker.kill();
    Ray.shutdown();
}
}

Type-safe method calling for parallel actors with automatic load balancing.
/**
* Task caller for parallel actor methods with return values.
*
* @param <R> type of the value carried by the {@code ObjectRef} that
*     {@link #remote()} returns
*/
public class ParallelActorTaskCaller<R> {
/**
* Execute the method call on an available parallel actor instance.
* @return ObjectRef to the method result
*/
public ObjectRef<R> remote();
}
/**
* Task caller for void parallel actor methods.
*/
public class VoidParallelActorTaskCaller {
/**
* Execute the void method call on an available parallel actor instance.
* @return {@code ObjectRef<Void>} for synchronization
*/
public ObjectRef<Void> remote();
}

Usage Examples:
public class ParallelActorTasking {
/**
 * Submits ten batches of one hundred tasks each to an eight-instance parallel
 * actor, waiting for each batch to complete before starting the next one.
 */
public static void main(String[] args) {
    Ray.init();

    // Create parallel actor
    ParallelActorHandle parallelWorker = createParallelWorker(8);

    // High-throughput task processing
    for (int batch = 0; batch < 10; batch++) {
        List<ObjectRef<String>> batchTasks = new ArrayList<>();

        // Process batch in parallel
        for (int i = 0; i < 100; i++) {
            String taskData = "batch-" + batch + "-task-" + i;
            ParallelActorTaskCaller<String> caller =
                parallelWorker.task(ParallelWorker::processTask, taskData);
            batchTasks.add(caller.remote());
        }

        // Wait for batch completion
        List<String> batchComplete = Ray.get(batchTasks);
        System.out.println("Batch " + batch + " completed: " + batchComplete.size() + " tasks");
    }

    Ray.shutdown();
}
/**
 * Creates a parallel actor with the requested number of instances.
 *
 * @param instances number of parallel instances to request
 * @return the handle returned by {@code ParallelActorCreator.remote()}
 */
private static ParallelActorHandle createParallelWorker(int instances) {
    return new ParallelActorCreator()
        .setNumInstances(instances)
        .remote();
}
}

Manage method-level concurrency within actors for fine-grained performance control.
/**
* Concurrency group interface for managing concurrent method execution.
*
* <p>A group is described by two values: its name and the maximum number of
* method executions allowed to run concurrently within it.
*/
public interface ConcurrencyGroup {
/**
* Get the concurrency group name.
* @return Name of the concurrency group
*/
String getName();
/**
* Get the maximum concurrency level.
* @return Maximum concurrent method executions
*/
int getMaxConcurrency();
}
/**
* Builder for creating concurrency groups.
*
* <p>Typical use, as shown in this document's example: chain
* {@code setName(...)} and {@code setMaxConcurrency(...)}, then call
* {@code build()}.
*
* <p>NOTE(review): this sketch does not declare a relationship to
* {@code BaseConcurrencyGroupBuilder} — confirm whether the real class
* extends it.
*/
public class ConcurrencyGroupBuilder {
/**
* Set the concurrency group name.
* @param name Group name
* @return Builder for method chaining
*/
public ConcurrencyGroupBuilder setName(String name);
/**
* Set maximum concurrency level.
* @param maxConcurrency Maximum concurrent executions
* @return Builder for method chaining
*/
public ConcurrencyGroupBuilder setMaxConcurrency(int maxConcurrency);
/**
* Build the concurrency group.
* @return ConcurrencyGroup instance
*/
public ConcurrencyGroup build();
}
/**
* Base builder class for concurrency groups.
*/
public class BaseConcurrencyGroupBuilder {
// Base functionality for concurrency group builders
}

Usage Examples:
public class ConcurrencyGroupExample {
/**
 * Builds two concurrency groups programmatically and passes them to the actor
 * creator before starting the actor.
 */
public static void main(String[] args) {
    Ray.init();

    // Group for I/O work: name "io-operations", limit 10.
    ConcurrencyGroup ioGroup =
        new ConcurrencyGroupBuilder().setName("io-operations").setMaxConcurrency(10).build();

    // Group for compute work: name "compute-operations", limit 2.
    ConcurrencyGroup computeGroup =
        new ConcurrencyGroupBuilder().setName("compute-operations").setMaxConcurrency(2).build();

    // Create actor with concurrency groups
    ActorHandle<ConcurrentActor> actor =
        Ray.actor(ConcurrentActor::new).setConcurrencyGroups(ioGroup, computeGroup).remote();

    // Methods will be executed according to their concurrency group limits
    Ray.shutdown();
}
}

Use annotations to specify concurrency groups at the method level.
/**
 * Annotation to define a concurrency group.
 *
 * <p>Retained at runtime so the declaration can be read reflectively from the
 * annotated class. Without an explicit retention policy Java defaults to
 * {@code CLASS} retention, under which {@code Class.getAnnotation} returns
 * {@code null} and the group could never be discovered when the actor starts.
 */
@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
@interface DefConcurrencyGroup {
    /**
     * Concurrency group name.
     * @return Group name
     */
    String name();

    /**
     * Maximum concurrency level.
     * @return Maximum concurrent executions
     */
    int maxConcurrency();
}
/**
 * Annotation to define multiple concurrency groups.
 *
 * <p>Retained at runtime so the contained group definitions can be read
 * reflectively from the annotated class; the default {@code CLASS} retention
 * would hide them from {@code Class.getAnnotation}.
 */
@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
@interface DefConcurrencyGroups {
    /**
     * Array of concurrency group definitions.
     * @return Array of DefConcurrencyGroup annotations
     */
    DefConcurrencyGroup[] value();
}
/**
* Annotation to specify which concurrency group a method uses.
*/
@interface UseConcurrencyGroup {
/**
* Concurrency group name to use.
*
* <p>NOTE(review): the enclosing annotation declares no {@code @Retention};
* reading it reflectively at runtime would require
* {@code RetentionPolicy.RUNTIME} — confirm against the real API.
* @return Group name
*/
String value();
}

Usage Examples:
/**
 * Example actor whose methods are assigned to three concurrency groups:
 * "io" (limit 10), "compute" (limit 2) and "db" (limit 5).
 */
@DefConcurrencyGroups({
    @DefConcurrencyGroup(name = "io", maxConcurrency = 10),
    @DefConcurrencyGroup(name = "compute", maxConcurrency = 2),
    @DefConcurrencyGroup(name = "db", maxConcurrency = 5)
})
public class ConcurrentActor {

    /**
     * Simulates a one-second file read in the "io" group.
     *
     * @param filename name echoed back in the result
     * @return {@code "Read: "} followed by {@code filename}
     */
    @UseConcurrencyGroup("io")
    public String readFile(String filename) {
        // I/O intensive operation - can run up to 10 concurrently
        pause(1000); // Simulate file read
        return "Read: " + filename;
    }

    /**
     * Applies {@code x -> sqrt(x + 1)} one million times, in the "compute" group.
     *
     * @param input starting value
     * @return the value after the final iteration
     */
    @UseConcurrencyGroup("compute")
    public double computeHeavy(double input) {
        // CPU intensive operation - limited to 2 concurrent executions
        double value = input;
        int remaining = 1000000;
        while (remaining > 0) {
            value = Math.sqrt(value + 1);
            remaining--;
        }
        return value;
    }

    /**
     * Simulates a half-second database query in the "db" group.
     *
     * @param query text echoed back in the result
     * @return {@code "Query result: "} followed by {@code query}
     */
    @UseConcurrencyGroup("db")
    public String queryDatabase(String query) {
        // Database operation - up to 5 concurrent queries
        pause(500); // Simulate DB query
        return "Query result: " + query;
    }

    // Methods without annotation use default concurrency (1)
    public String defaultMethod(String input) {
        return "Default: " + input;
    }

    /** Sleeps for {@code millis}, restoring the interrupt flag if the sleep is interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
public class ConcurrencyAnnotationExample {
/**
 * Submits I/O, compute, and database calls to one {@code ConcurrentActor} and
 * prints how many results came back for each kind.
 */
public static void main(String[] args) {
    Ray.init();

    // Create actor - concurrency groups are automatically configured from annotations
    ActorHandle<ConcurrentActor> actor = Ray.actor(ConcurrentActor::new).remote();

    // 20 reads; readFile belongs to the "io" group (up to 10 concurrent)
    List<ObjectRef<String>> ioResults = new ArrayList<>();
    for (int fileNo = 0; fileNo < 20; fileNo++) {
        ioResults.add(actor.task(ConcurrentActor::readFile, "file-" + fileNo).remote());
    }

    // 10 computations; computeHeavy belongs to the "compute" group (limited to 2 concurrent)
    List<ObjectRef<Double>> computeResults = new ArrayList<>();
    for (int step = 0; step < 10; step++) {
        computeResults.add(actor.task(ConcurrentActor::computeHeavy, step * 1.5).remote());
    }

    // 15 queries; queryDatabase belongs to the "db" group (up to 5 concurrent)
    List<ObjectRef<String>> dbResults = new ArrayList<>();
    for (int tableNo = 0; tableNo < 15; tableNo++) {
        String sql = "SELECT * FROM table" + tableNo;
        dbResults.add(actor.task(ConcurrentActor::queryDatabase, sql).remote());
    }

    // Wait for results
    int ioDone = Ray.get(ioResults).size();
    System.out.println("I/O operations completed: " + ioDone);
    int computeDone = Ray.get(computeResults).size();
    System.out.println("Compute operations completed: " + computeDone);
    int dbDone = Ray.get(dbResults).size();
    System.out.println("DB operations completed: " + dbDone);

    Ray.shutdown();
}
}/**
* Base call class for parallel actor operations.
*/
public class Call {
// Base functionality for actor method calls.
// No members are declared in this sketch.
}
/**
* Actor call interface for method invocation.
*/
public interface ActorCall {
/**
* Execute the actor method call.
* @return Result of the method call, typed as {@code Object}
*/
Object call();
}public class HighThroughputProcessor extends ParallelActor {
@DefConcurrencyGroup(name = "process", maxConcurrency = 4)
public class ConcurrentProcessor {
@UseConcurrencyGroup("process")
public String processItem(String item) {
// Process item with controlled concurrency
return "Processed: " + item;
}
}
/**
 * Submits one thousand items to a four-instance parallel actor and prints how
 * many results were returned.
 */
public static void main(String[] args) {
    Ray.init();

    // Combine parallel actors with concurrency groups
    ParallelActorCreator creator = new ParallelActorCreator();
    ParallelActorHandle processor = creator.setNumInstances(4).remote(); // 4 parallel instances

    // Each instance can process 4 items concurrently
    // Total concurrency: 4 instances × 4 concurrent methods = 16
    List<ObjectRef<String>> results = new ArrayList<>();
    int itemNo = 0;
    while (itemNo < 1000) {
        ParallelActorTaskCaller<String> caller =
            processor.task(ConcurrentProcessor::processItem, "item-" + itemNo);
        results.add(caller.remote());
        itemNo++;
    }

    int completedCount = Ray.get(results).size();
    System.out.println("Processed " + completedCount + " items");

    Ray.shutdown();
}
}public class ResilientParallelActor extends ParallelActor {
private int processedCount = 0;
/**
 * Processes {@code data}, failing at random roughly one time in ten.
 *
 * <p>Despite its name, this method performs no retry itself: a simulated
 * failure is logged to standard error and then propagated to the caller.
 *
 * @param data payload to process
 * @return a message containing {@code data} and this instance's processed count
 * @throws RuntimeException when the simulated failure is triggered
 */
public String processWithRetry(String data) {
    // Simulate occasional failures
    if (Math.random() < 0.1) {
        RuntimeException failure = new RuntimeException("Simulated processing error");
        // Log the error, then propagate it so the caller observes the failure.
        System.err.println("Processing error: " + failure.getMessage());
        throw failure;
    }
    processedCount++;
    return "Processed: " + data + " (instance processed: " + processedCount + ")";
}
/**
* Get the number of items this instance has processed successfully.
* @return Current value of the processed counter
*/
public int getProcessedCount() {
return processedCount;
}
}
public class FaultTolerantProcessing {
/**
 * Submits five hundred items to a six-instance parallel actor, then resolves
 * each result individually, counting successes and failures.
 */
public static void main(String[] args) {
    Ray.init();

    // Create parallel actor with multiple instances for fault tolerance
    ParallelActorHandle processor = new ParallelActorCreator()
        .setNumInstances(6)
        .remote();

    List<ObjectRef<String>> results = new ArrayList<>();

    // Process many items - failures on one instance don't affect others
    for (int i = 0; i < 500; i++) {
        ParallelActorTaskCaller<String> caller =
            processor.task(ResilientParallelActor::processWithRetry, "data-" + i);
        results.add(caller.remote());
    }

    // Resolve results one at a time so a single failed task is counted
    // without preventing the remaining results from being read.
    int successCount = 0;
    int errorCount = 0;
    for (ObjectRef<String> result : results) {
        try {
            // Only success or failure matters here; the returned value is not used.
            Ray.get(result);
            successCount++;
        } catch (Exception e) {
            errorCount++;
            System.err.println("Task failed: " + e.getMessage());
        }
    }
    System.out.println("Success: " + successCount + ", Errors: " + errorCount);

    // Check instance statistics
    for (int i = 0; i < processor.getNumInstances(); i++) {
        ParallelActorInstance instance = processor.getInstance(i);
        // Get stats from each instance...
    }

    Ray.shutdown();
}
}// Regular actors: For stateful services, single-threaded processing
ActorHandle<DatabaseService> dbService = Ray.actor(DatabaseService::new).remote();

// Parallel actors: For high-throughput, stateless processing
ParallelActorHandle processor = new ParallelActorCreator()
    .setNumInstances(8)
    .remote();

// Concurrent actors: For I/O bound operations with controlled concurrency.
// The concurrency group is declared on the actor class, not on the handle
// variable, in the same way ConcurrentActor is annotated in this document:
//
//   @DefConcurrencyGroup(name = "io", maxConcurrency = 10)
//   public class IOService { ... }
ActorHandle<IOService> ioService = Ray.actor(IOService::new).remote();

// Configure resources for parallel actors
// NOTE(review): setResources is not declared on ParallelActorCreator in this
// document's sketch — confirm it exists in the real API.
ParallelActorHandle resourceAwareProcessor = new ParallelActorCreator()
    .setNumInstances(4)
    .setResources(Map.of("CPU", 2.0, "memory", 1000.0)) // Per instance
    .remote();
// Total resources: 4 instances × (2 CPU + 1GB memory) = 8 CPU + 4GB memory

public class ActorMonitoring {
/**
 * Prints the instance count and index of every parallel actor instance, then
 * lists every actor known to the runtime context whose class name contains
 * "ParallelActor".
 */
public static void main(String[] args) {
    Ray.init();
    ParallelActorHandle processor = createParallelProcessor();

    // Monitor parallel actor instances
    System.out.println("Parallel actor instances: " + processor.getNumInstances());
    for (int slot = 0; slot < processor.getNumInstances(); slot++) {
        int instanceIndex = processor.getInstance(slot).getIndex();
        // The "active" status is a fixed label; nothing is queried from the instance.
        System.out.println("Instance " + instanceIndex + " status: active");
    }

    // Get runtime context for detailed information
    for (ActorInfo info : Ray.getRuntimeContext().getAllActorInfo()) {
        if (!info.getClassName().contains("ParallelActor")) {
            continue;
        }
        System.out.println("Parallel actor: " + info.getActorId() +
            " State: " + info.getState() +
            " Node: " + info.getNodeId());
    }

    Ray.shutdown();
}
}

Install with Tessl CLI:

npx tessl i tessl/maven-io-ray--ray-runtime