0
# Advanced Actor Features
1
2
Specialized actor patterns including parallel actors and concurrency group management for high-performance distributed computing scenarios.
3
4
## Capabilities
5
6
### Parallel Actors
7
8
Parallel actors enable scaling a single logical actor across multiple processes for increased throughput.
9
10
```java { .api }
11
/**
12
* Base class for parallel actor implementations.
13
* Extend this class to create parallel actors.
14
*/
15
public class ParallelActor {
16
// Base functionality for parallel actor implementations
17
}
18
19
/**
20
* Handle for managing parallel actor instances.
21
*/
22
public interface ParallelActorHandle {
23
/**
24
* Get the number of parallel instances.
25
* @return Number of parallel actor instances
26
*/
27
int getNumInstances();
28
29
/**
30
* Get a specific parallel actor instance.
31
* @param index Instance index
32
* @return ParallelActorInstance handle
33
*/
34
ParallelActorInstance getInstance(int index);
35
36
/**
37
* Kill all parallel actor instances.
38
*/
39
void kill();
40
}
41
42
/**
 * Individual instance within a parallel actor.
 */
public interface ParallelActorInstance {
    /**
     * Get the instance index.
     * @return Index of this instance
     */
    int getIndex();

    // Methods can also be called on this specific instance, similar to regular actors.
    // This listing does not show a signature for that call, so it is kept as a plain
    // comment rather than a Javadoc block attached to no member.
}
57
58
/**
59
* Context information for parallel actors.
60
*/
61
public interface ParallelActorContext {
62
/**
63
* Get current instance index.
64
* @return Index of current parallel actor instance
65
*/
66
int getCurrentInstanceIndex();
67
68
/**
69
* Get total number of instances.
70
* @return Total parallel actor instances
71
*/
72
int getTotalInstances();
73
}
74
75
/**
76
* Creator for parallel actors.
77
*/
78
public class ParallelActorCreator {
79
/**
80
* Set number of parallel instances.
81
* @param numInstances Number of parallel instances to create
82
* @return ParallelActorCreator for method chaining
83
*/
84
public ParallelActorCreator setNumInstances(int numInstances);
85
86
/**
87
* Create the parallel actor remotely.
88
* @return ParallelActorHandle for managing instances
89
*/
90
public ParallelActorHandle remote();
91
}
92
```
93
94
**Usage Examples:**
95
96
```java
97
public class ParallelWorker extends ParallelActor {
98
private final String workerId;
99
private int processedTasks = 0;
100
101
public ParallelWorker(String baseId) {
102
// Access parallel actor context
103
ParallelActorContext context = getParallelActorContext(); // Hypothetical method
104
this.workerId = baseId + "-" + context.getCurrentInstanceIndex();
105
}
106
107
public String processTask(String task) {
108
processedTasks++;
109
110
// Simulate processing time
111
try {
112
Thread.sleep(100);
113
} catch (InterruptedException e) {
114
Thread.currentThread().interrupt();
115
}
116
117
return workerId + " processed: " + task + " (total: " + processedTasks + ")";
118
}
119
120
public int getProcessedCount() {
121
return processedTasks;
122
}
123
}
124
125
public class ParallelActorExample {
126
public static void main(String[] args) {
127
Ray.init();
128
129
// Create parallel actor with 4 instances
130
ParallelActorCreator creator = new ParallelActorCreator();
131
ParallelActorHandle parallelWorker = creator
132
.setNumInstances(4)
133
.remote();
134
135
// Distribute work across parallel instances
136
List<ObjectRef<String>> results = new ArrayList<>();
137
138
for (int i = 0; i < 20; i++) {
139
// Ray automatically load-balances across instances
140
ParallelActorTaskCaller<String> taskCaller =
141
parallelWorker.task(ParallelWorker::processTask, "task-" + i);
142
results.add(taskCaller.remote());
143
}
144
145
// Wait for all results
146
List<String> allResults = Ray.get(results);
147
allResults.forEach(System.out::println);
148
149
// Access specific instances
150
for (int i = 0; i < parallelWorker.getNumInstances(); i++) {
151
ParallelActorInstance instance = parallelWorker.getInstance(i);
152
// Call methods on specific instance...
153
}
154
155
// Clean up
156
parallelWorker.kill();
157
158
Ray.shutdown();
159
}
160
}
161
```
162
163
### Parallel Actor Task Calling
164
165
Type-safe method calling for parallel actors with automatic load balancing.
166
167
```java { .api }
168
/**
169
* Task caller for parallel actor methods with return values.
170
*/
171
public class ParallelActorTaskCaller<R> {
172
/**
173
* Execute the method call on an available parallel actor instance.
174
* @return ObjectRef to the method result
175
*/
176
public ObjectRef<R> remote();
177
}
178
179
/**
180
* Task caller for void parallel actor methods.
181
*/
182
public class VoidParallelActorTaskCaller {
183
/**
184
* Execute the void method call on an available parallel actor instance.
185
* @return ObjectRef<Void> for synchronization
186
*/
187
public ObjectRef<Void> remote();
188
}
189
```
190
191
**Usage Examples:**
192
193
```java
194
public class ParallelActorTasking {
195
public static void main(String[] args) {
196
Ray.init();
197
198
// Create parallel actor
199
ParallelActorHandle parallelWorker = createParallelWorker(8);
200
201
// High-throughput task processing
202
List<ObjectRef<String>> batchResults = new ArrayList<>();
203
204
for (int batch = 0; batch < 10; batch++) {
205
List<ObjectRef<String>> batchTasks = new ArrayList<>();
206
207
// Process batch in parallel
208
for (int i = 0; i < 100; i++) {
209
String taskData = "batch-" + batch + "-task-" + i;
210
ParallelActorTaskCaller<String> caller =
211
parallelWorker.task(ParallelWorker::processTask, taskData);
212
batchTasks.add(caller.remote());
213
}
214
215
// Wait for batch completion
216
List<String> batchComplete = Ray.get(batchTasks);
217
System.out.println("Batch " + batch + " completed: " + batchComplete.size() + " tasks");
218
}
219
220
Ray.shutdown();
221
}
222
223
private static ParallelActorHandle createParallelWorker(int instances) {
224
ParallelActorCreator creator = new ParallelActorCreator();
225
return creator.setNumInstances(instances).remote();
226
}
227
}
228
```
229
230
### Concurrency Groups
231
232
Manage method-level concurrency within actors for fine-grained performance control.
233
234
```java { .api }
235
/**
236
* Concurrency group interface for managing concurrent method execution.
237
*/
238
public interface ConcurrencyGroup {
239
/**
240
* Get the concurrency group name.
241
* @return Name of the concurrency group
242
*/
243
String getName();
244
245
/**
246
* Get the maximum concurrency level.
247
* @return Maximum concurrent method executions
248
*/
249
int getMaxConcurrency();
250
}
251
252
/**
253
* Builder for creating concurrency groups.
254
*/
255
public class ConcurrencyGroupBuilder {
256
/**
257
* Set the concurrency group name.
258
* @param name Group name
259
* @return Builder for method chaining
260
*/
261
public ConcurrencyGroupBuilder setName(String name);
262
263
/**
264
* Set maximum concurrency level.
265
* @param maxConcurrency Maximum concurrent executions
266
* @return Builder for method chaining
267
*/
268
public ConcurrencyGroupBuilder setMaxConcurrency(int maxConcurrency);
269
270
/**
271
* Build the concurrency group.
272
* @return ConcurrencyGroup instance
273
*/
274
public ConcurrencyGroup build();
275
}
276
277
/**
278
* Base builder class for concurrency groups.
279
*/
280
public class BaseConcurrencyGroupBuilder {
281
// Base functionality for concurrency group builders
282
}
283
```
284
285
**Usage Examples:**
286
287
```java
288
public class ConcurrencyGroupExample {
289
public static void main(String[] args) {
290
Ray.init();
291
292
// Create concurrency groups
293
ConcurrencyGroup ioGroup = new ConcurrencyGroupBuilder()
294
.setName("io-operations")
295
.setMaxConcurrency(10)
296
.build();
297
298
ConcurrencyGroup computeGroup = new ConcurrencyGroupBuilder()
299
.setName("compute-operations")
300
.setMaxConcurrency(2)
301
.build();
302
303
// Create actor with concurrency groups
304
ActorHandle<ConcurrentActor> actor = Ray.actor(ConcurrentActor::new)
305
.setConcurrencyGroups(ioGroup, computeGroup)
306
.remote();
307
308
// Methods will be executed according to their concurrency group limits
309
310
Ray.shutdown();
311
}
312
}
313
```
314
315
### Concurrency Group Annotations
316
317
Use annotations to specify concurrency groups at the method level.
318
319
```java { .api }
320
/**
321
* Annotation to define a concurrency group.
322
*/
323
@interface DefConcurrencyGroup {
324
/**
325
* Concurrency group name.
326
* @return Group name
327
*/
328
String name();
329
330
/**
331
* Maximum concurrency level.
332
* @return Maximum concurrent executions
333
*/
334
int maxConcurrency();
335
}
336
337
/**
338
* Annotation to define multiple concurrency groups.
339
*/
340
@interface DefConcurrencyGroups {
341
/**
342
* Array of concurrency group definitions.
343
* @return Array of DefConcurrencyGroup annotations
344
*/
345
DefConcurrencyGroup[] value();
346
}
347
348
/**
349
* Annotation to specify which concurrency group a method uses.
350
*/
351
@interface UseConcurrencyGroup {
352
/**
353
* Concurrency group name to use.
354
* @return Group name
355
*/
356
String value();
357
}
358
```
359
360
**Usage Examples:**
361
362
```java
363
@DefConcurrencyGroups({
364
@DefConcurrencyGroup(name = "io", maxConcurrency = 10),
365
@DefConcurrencyGroup(name = "compute", maxConcurrency = 2),
366
@DefConcurrencyGroup(name = "db", maxConcurrency = 5)
367
})
368
public class ConcurrentActor {
369
370
@UseConcurrencyGroup("io")
371
public String readFile(String filename) {
372
// I/O intensive operation - can run up to 10 concurrently
373
try {
374
Thread.sleep(1000); // Simulate file read
375
} catch (InterruptedException e) {
376
Thread.currentThread().interrupt();
377
}
378
return "Read: " + filename;
379
}
380
381
@UseConcurrencyGroup("compute")
382
public double computeHeavy(double input) {
383
// CPU intensive operation - limited to 2 concurrent executions
384
double result = input;
385
for (int i = 0; i < 1000000; i++) {
386
result = Math.sqrt(result + 1);
387
}
388
return result;
389
}
390
391
@UseConcurrencyGroup("db")
392
public String queryDatabase(String query) {
393
// Database operation - up to 5 concurrent queries
394
try {
395
Thread.sleep(500); // Simulate DB query
396
} catch (InterruptedException e) {
397
Thread.currentThread().interrupt();
398
}
399
return "Query result: " + query;
400
}
401
402
// Methods without annotation use default concurrency (1)
403
public String defaultMethod(String input) {
404
return "Default: " + input;
405
}
406
}
407
408
public class ConcurrencyAnnotationExample {
409
public static void main(String[] args) {
410
Ray.init();
411
412
// Create actor - concurrency groups are automatically configured from annotations
413
ActorHandle<ConcurrentActor> actor = Ray.actor(ConcurrentActor::new).remote();
414
415
// Launch many I/O operations (up to 10 concurrent)
416
List<ObjectRef<String>> ioResults = new ArrayList<>();
417
for (int i = 0; i < 20; i++) {
418
ObjectRef<String> result = actor.task(ConcurrentActor::readFile, "file-" + i).remote();
419
ioResults.add(result);
420
}
421
422
// Launch compute operations (limited to 2 concurrent)
423
List<ObjectRef<Double>> computeResults = new ArrayList<>();
424
for (int i = 0; i < 10; i++) {
425
ObjectRef<Double> result = actor.task(ConcurrentActor::computeHeavy, i * 1.5).remote();
426
computeResults.add(result);
427
}
428
429
// Launch database operations (up to 5 concurrent)
430
List<ObjectRef<String>> dbResults = new ArrayList<>();
431
for (int i = 0; i < 15; i++) {
432
ObjectRef<String> result = actor.task(ConcurrentActor::queryDatabase, "SELECT * FROM table" + i).remote();
433
dbResults.add(result);
434
}
435
436
// Wait for results
437
System.out.println("I/O operations completed: " + Ray.get(ioResults).size());
438
System.out.println("Compute operations completed: " + Ray.get(computeResults).size());
439
System.out.println("DB operations completed: " + Ray.get(dbResults).size());
440
441
Ray.shutdown();
442
}
443
}
444
```
445
446
## Advanced Call Framework
447
448
### Parallel Actor Call Interface
449
450
```java { .api }
451
/**
452
* Base call class for parallel actor operations.
453
*/
454
public class Call {
455
// Base functionality for actor method calls
456
}
457
458
/**
459
* Actor call interface for method invocation.
460
*/
461
public interface ActorCall {
462
/**
463
* Execute the actor method call.
464
* @return Result of the method call
465
*/
466
Object call();
467
}
468
```
469
470
## Performance Optimization Patterns
471
472
### High-Throughput Processing
473
474
```java
475
public class HighThroughputProcessor extends ParallelActor {
476
@DefConcurrencyGroup(name = "process", maxConcurrency = 4)
477
public class ConcurrentProcessor {
478
479
@UseConcurrencyGroup("process")
480
public String processItem(String item) {
481
// Process item with controlled concurrency
482
return "Processed: " + item;
483
}
484
}
485
486
public static void main(String[] args) {
487
Ray.init();
488
489
// Combine parallel actors with concurrency groups
490
ParallelActorHandle processor = new ParallelActorCreator()
491
.setNumInstances(4) // 4 parallel instances
492
.remote();
493
494
// Each instance can process 4 items concurrently
495
// Total concurrency: 4 instances × 4 concurrent methods = 16
496
497
List<ObjectRef<String>> results = new ArrayList<>();
498
for (int i = 0; i < 1000; i++) {
499
ParallelActorTaskCaller<String> caller =
500
processor.task(ConcurrentProcessor::processItem, "item-" + i);
501
results.add(caller.remote());
502
}
503
504
List<String> completed = Ray.get(results);
505
System.out.println("Processed " + completed.size() + " items");
506
507
Ray.shutdown();
508
}
509
}
510
```
511
512
### Load Balancing and Fault Tolerance
513
514
```java
515
public class ResilientParallelActor extends ParallelActor {
516
private int processedCount = 0;
517
518
public String processWithRetry(String data) {
519
try {
520
// Simulate occasional failures
521
if (Math.random() < 0.1) {
522
throw new RuntimeException("Simulated processing error");
523
}
524
525
processedCount++;
526
return "Processed: " + data + " (instance processed: " + processedCount + ")";
527
528
} catch (Exception e) {
529
// Log error but don't propagate - let Ray handle retry logic
530
System.err.println("Processing error: " + e.getMessage());
531
throw e;
532
}
533
}
534
535
public int getProcessedCount() {
536
return processedCount;
537
}
538
}
539
540
public class FaultTolerantProcessing {
541
public static void main(String[] args) {
542
Ray.init();
543
544
// Create parallel actor with multiple instances for fault tolerance
545
ParallelActorHandle processor = new ParallelActorCreator()
546
.setNumInstances(6)
547
.remote();
548
549
List<ObjectRef<String>> results = new ArrayList<>();
550
551
// Process many items - failures on one instance don't affect others
552
for (int i = 0; i < 500; i++) {
553
ParallelActorTaskCaller<String> caller =
554
processor.task(ResilientParallelActor::processWithRetry, "data-" + i);
555
results.add(caller.remote());
556
}
557
558
// Handle results with error handling
559
int successCount = 0;
560
int errorCount = 0;
561
562
for (ObjectRef<String> result : results) {
563
try {
564
String value = Ray.get(result);
565
successCount++;
566
} catch (Exception e) {
567
errorCount++;
568
System.err.println("Task failed: " + e.getMessage());
569
}
570
}
571
572
System.out.println("Success: " + successCount + ", Errors: " + errorCount);
573
574
// Check instance statistics
575
for (int i = 0; i < processor.getNumInstances(); i++) {
576
ParallelActorInstance instance = processor.getInstance(i);
577
// Get stats from each instance...
578
}
579
580
Ray.shutdown();
581
}
582
}
583
```
584
585
## Best Practices
586
587
### Choosing Between Actor Types
588
589
```java
590
// Regular actors: For stateful services, single-threaded processing
591
ActorHandle<DatabaseService> dbService = Ray.actor(DatabaseService::new).remote();
592
593
// Parallel actors: For high-throughput, stateless processing
594
ParallelActorHandle processor = new ParallelActorCreator()
595
.setNumInstances(8)
596
.remote();
597
598
// Concurrent actors: For I/O bound operations with controlled concurrency
599
@DefConcurrencyGroup(name = "io", maxConcurrency = 10)
600
ActorHandle<IOService> ioService = Ray.actor(IOService::new).remote();
601
```
602
603
### Resource Management
604
605
```java
606
// Configure resources for parallel actors
607
ParallelActorHandle resourceAwareProcessor = new ParallelActorCreator()
608
.setNumInstances(4)
609
.setResources(Map.of("CPU", 2.0, "memory", 1000.0)) // Per instance
610
.remote();
611
612
// Total resources: 4 instances × (2 CPU + 1GB memory) = 8 CPU + 4GB memory
613
```
614
615
### Monitoring and Debugging
616
617
```java
618
public class ActorMonitoring {
619
public static void main(String[] args) {
620
Ray.init();
621
622
ParallelActorHandle processor = createParallelProcessor();
623
624
// Monitor parallel actor instances
625
System.out.println("Parallel actor instances: " + processor.getNumInstances());
626
627
for (int i = 0; i < processor.getNumInstances(); i++) {
628
ParallelActorInstance instance = processor.getInstance(i);
629
System.out.println("Instance " + instance.getIndex() + " status: active");
630
}
631
632
// Get runtime context for detailed information
633
RuntimeContext context = Ray.getRuntimeContext();
634
List<ActorInfo> actorInfos = context.getAllActorInfo();
635
636
for (ActorInfo info : actorInfos) {
637
if (info.getClassName().contains("ParallelActor")) {
638
System.out.println("Parallel actor: " + info.getActorId() +
639
" State: " + info.getState() +
640
" Node: " + info.getNodeId());
641
}
642
}
643
644
Ray.shutdown();
645
}
646
}
647
```