# Actor Programming

Stateful distributed workers with method-level task execution, lifecycle management, and resource control for building robust distributed applications.

## Capabilities

### Actor Creation Options

Configure actor creation with detailed options including naming, resources, and lifecycle settings.

```java { .api }
/**
12
* Options for configuring actor creation.
13
*/
14
public class ActorCreationOptions {
15
/** Constant for no actor restarts */
16
public static final int NO_RESTART = 0;
17
18
/** Constant for infinite actor restarts */
19
public static final int INFINITE_RESTART = -1;
20
21
/**
22
* Create builder for actor creation options.
23
* @return ActorCreationOptions.Builder
24
*/
25
public static Builder builder();
26
27
public static class Builder {
28
/**
29
* Set actor name for service discovery.
30
* @param name Actor name
31
* @return Builder for method chaining
32
*/
33
public Builder setName(String name);
34
35
/**
36
* Set actor lifetime policy.
37
* @param lifetime Actor lifetime setting
38
* @return Builder for method chaining
39
*/
40
public Builder setLifetime(ActorLifetime lifetime);
41
42
/**
43
* Set single resource requirement.
44
* @param resource Resource name
45
* @param quantity Resource quantity
46
* @return Builder for method chaining
47
*/
48
public Builder setResource(String resource, Double quantity);
49
50
/**
51
* Set multiple resource requirements.
52
* @param resources Map of resource names to quantities
53
* @return Builder for method chaining
54
*/
55
public Builder setResources(Map<String, Double> resources);
56
57
/**
58
* Set maximum number of actor restarts.
59
* @param maxRestarts Maximum restarts (use constants NO_RESTART or INFINITE_RESTART)
60
* @return Builder for method chaining
61
*/
62
public Builder setMaxRestarts(int maxRestarts);
63
64
/**
65
* Set maximum task retries per actor method call.
66
* @param maxTaskRetries Maximum task retries
67
* @return Builder for method chaining
68
*/
69
public Builder setMaxTaskRetries(int maxTaskRetries);
70
71
/**
72
* Set JVM options for the actor process.
73
* @param jvmOptions List of JVM options
74
* @return Builder for method chaining
75
*/
76
public Builder setJvmOptions(List<String> jvmOptions);
77
78
/**
79
* Set maximum concurrent method calls.
80
* @param maxConcurrency Maximum concurrent calls
81
* @return Builder for method chaining
82
*/
83
public Builder setMaxConcurrency(int maxConcurrency);
84
85
/**
86
* Set placement group and bundle index.
87
* @param placementGroup Placement group
88
* @param bundleIndex Bundle index within placement group
89
* @return Builder for method chaining
90
*/
91
public Builder setPlacementGroup(PlacementGroup placementGroup, int bundleIndex);
92
93
/**
94
* Set runtime environment.
95
* @param runtimeEnv Runtime environment configuration
96
* @return Builder for method chaining
97
*/
98
public Builder setRuntimeEnv(RuntimeEnv runtimeEnv);
99
100
/**
101
* Build the actor creation options.
102
* @return ActorCreationOptions instance
103
*/
104
public ActorCreationOptions build();
105
}
106
}
107
108
/**
 * Actor lifetime enumeration.
 */
public enum ActorLifetime {
    /** Actor dies when creator process dies */
    NON_DETACHED,

    /** Actor persists beyond creator process lifetime */
    DETACHED
}
```

### Actor Creation

Create stateful distributed workers that persist across multiple method calls.

```java { .api }
// Actor creation methods (0-6 parameters)
126
public static <A> ActorCreator<A> actor(RayFunc0<A> f);
127
public static <T0, A> ActorCreator<A> actor(RayFunc1<T0, A> f, T0 t0);
128
public static <T0, T1, A> ActorCreator<A> actor(RayFunc2<T0, T1, A> f, T0 t0, T1 t1);
129
public static <T0, T1, T2, A> ActorCreator<A> actor(RayFunc3<T0, T1, T2, A> f, T0 t0, T1 t1, T2 t2);
130
public static <T0, T1, T2, T3, A> ActorCreator<A> actor(RayFunc4<T0, T1, T2, T3, A> f, T0 t0, T1 t1, T2 t2, T3 t3);
131
public static <T0, T1, T2, T3, T4, A> ActorCreator<A> actor(RayFunc5<T0, T1, T2, T3, T4, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);
132
public static <T0, T1, T2, T3, T4, T5, A> ActorCreator<A> actor(RayFunc6<T0, T1, T2, T3, T4, T5, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
133
134
public interface ActorCreator<A> {
135
/**
136
* Create the actor remotely.
137
* @return ActorHandle for calling actor methods
138
*/
139
ActorHandle<A> remote();
140
141
/**
142
* Set JVM options for the actor.
143
* @param jvmOptions List of JVM options
144
* @return ActorCreator for method chaining
145
*/
146
ActorCreator<A> setJvmOptions(List<String> jvmOptions);
147
148
/**
149
* Set concurrency groups for the actor.
150
* @param concurrencyGroups Concurrency group configuration
151
* @return ActorCreator for method chaining
152
*/
153
ActorCreator<A> setConcurrencyGroups(ConcurrencyGroup... concurrencyGroups);
154
155
/**
156
* Set runtime environment for the actor.
157
* @param runtimeEnv Runtime environment configuration
158
* @return ActorCreator for method chaining
159
*/
160
ActorCreator<A> setRuntimeEnv(RuntimeEnv runtimeEnv);
161
}
```

**Usage Examples:**

```java
/**
 * Minimal stateful class used as the actor in the examples: it holds one
 * integer that the methods below read and modify.
 */
public class Counter {
    // Always assigned by the constructor, so no field initializer is needed.
    private int count;

    /**
     * @param initialValue Value the counter starts from
     */
    public Counter(int initialValue) {
        this.count = initialValue;
    }

    /**
     * Add one to the counter.
     * @return The counter value after the increment
     */
    public int increment() {
        return ++count;
    }

    /**
     * @return The current counter value
     */
    public int getValue() {
        return count;
    }

    /** Set the counter back to zero (not to the initial value). */
    public void reset() {
        count = 0;
    }
}

public class ActorExample {
188
public static void main(String[] args) {
189
Ray.init();
190
191
// Create actor with constructor parameter
192
ActorHandle<Counter> counter = Ray.actor(Counter::new, 10).remote();
193
194
// Create actor with configuration
195
ActorHandle<Counter> configuredCounter = Ray.actor(Counter::new, 0)
196
.setJvmOptions(Arrays.asList("-Xmx1g", "-Xms512m"))
197
.setMaxConcurrency(4)
198
.remote();
199
200
// Create named actor with full configuration using ActorCreationOptions
201
ActorCreationOptions options = ActorCreationOptions.builder()
202
.setName("global-counter")
203
.setLifetime(ActorLifetime.DETACHED)
204
.setResources(Map.of("CPU", 2.0, "memory", 1000.0))
205
.setMaxRestarts(5)
206
.setJvmOptions(Arrays.asList("-Xmx2g"))
207
.setMaxConcurrency(8)
208
.build();
209
210
// Note: Full ActorCreationOptions integration would require additional API
211
// This demonstrates the options builder pattern available
212
213
Ray.shutdown();
214
}
215
}
216
```
217
218
### Actor Method Calls
219
220
Call methods on remote actors with full type safety and ObjectRef support.
221
222
```java { .api }
223
public interface ActorHandle<A> extends BaseActorHandle {
224
/**
225
* Call actor method with return value.
226
* Available for all method signatures (0-6 parameters).
227
*/
228
<R> ActorTaskCaller<R> task(/* method reference and parameters */);
229
}
230
231
public interface ActorTaskCaller<R> {
232
/**
233
* Execute the actor method call remotely.
234
* @return ObjectRef to the method result
235
*/
236
ObjectRef<R> remote();
237
}
238
```
239
240
**Usage Examples:**
241
242
```java
243
public class ActorMethodCalls {
244
public static void main(String[] args) {
245
Ray.init();
246
247
// Create counter actor
248
ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();
249
250
// Call methods and get results
251
ObjectRef<Integer> result1 = counter.task(Counter::increment).remote();
252
ObjectRef<Integer> result2 = counter.task(Counter::increment).remote();
253
ObjectRef<Integer> current = counter.task(Counter::getValue).remote();
254
255
// Methods execute in order on the same actor instance
256
System.out.println(Ray.get(result1)); // 1
257
System.out.println(Ray.get(result2)); // 2
258
System.out.println(Ray.get(current)); // 2
259
260
// Void method calls
261
ObjectRef<Void> resetResult = counter.task(Counter::reset).remote();
262
Ray.get(resetResult); // Wait for completion
263
264
ObjectRef<Integer> afterReset = counter.task(Counter::getValue).remote();
265
System.out.println(Ray.get(afterReset)); // 0
266
267
Ray.shutdown();
268
}
269
}
270
```
271
272
### Actor Handles
273
274
Manage actor lifecycle and get actor information.
275
276
```java { .api }
277
public interface BaseActorHandle {
278
/**
279
* Get the actor ID.
280
* @return ActorId of this actor
281
*/
282
ActorId getId();
283
284
/**
285
* Kill the actor (allows restart).
286
*/
287
void kill();
288
289
/**
290
* Kill the actor with restart control.
291
* @param noRestart If true, prevent actor restart
292
*/
293
void kill(boolean noRestart);
294
}
295
296
public interface ActorHandle<A> extends BaseActorHandle {
297
// Inherits from BaseActorHandle
298
// Plus type-safe method calling capability
299
}
300
```
301
302
**Usage Examples:**
303
304
```java
305
public class ActorLifecycle {
306
public static void main(String[] args) {
307
Ray.init();
308
309
// Create actor
310
ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();
311
312
// Get actor information
313
ActorId actorId = counter.getId();
314
System.out.println("Actor ID: " + actorId);
315
316
// Use actor
317
ObjectRef<Integer> result = counter.task(Counter::increment).remote();
318
System.out.println("Result: " + Ray.get(result));
319
320
// Kill actor (allows restart)
321
counter.kill();
322
323
// Kill actor permanently
324
// counter.kill(true);
325
326
Ray.shutdown();
327
}
328
}
329
```
330
331
### Named Actors
332
333
Create and retrieve actors by name for service discovery.
334
335
```java { .api }
336
/**
337
* Get a handle to a named actor in current namespace.
338
* @param name The name of the named actor
339
* @return Optional ActorHandle if actor exists
340
* @throws RayException if timed out
341
*/
342
public static <T extends BaseActorHandle> Optional<T> getActor(String name);
343
344
/**
345
* Get a handle to a named actor in specified namespace.
346
* @param name The name of the named actor
347
* @param namespace The namespace of the actor
348
* @return Optional ActorHandle if actor exists
349
* @throws RayException if timed out
350
*/
351
public static <T extends BaseActorHandle> Optional<T> getActor(String name, String namespace);
352
```
353
354
**Usage Examples:**
355
356
```java
357
public class NamedActors {
358
public static void main(String[] args) {
359
Ray.init();
360
361
// Create named actor (requires ActorCreationOptions)
362
ActorCreationOptions options = ActorCreationOptions.builder()
363
.setName("global-counter")
364
.setLifetime(ActorLifetime.DETACHED)
365
.build();
366
367
// Note: Named actor creation requires using options
368
// ActorHandle<Counter> counter = Ray.actor(Counter::new, 0)
369
// .setOptions(options)
370
// .remote();
371
372
// Get named actor from anywhere in the cluster
373
Optional<ActorHandle<Counter>> maybeCounter = Ray.getActor("global-counter");
374
375
if (maybeCounter.isPresent()) {
376
ActorHandle<Counter> counter = maybeCounter.get();
377
ObjectRef<Integer> result = counter.task(Counter::increment).remote();
378
System.out.println("Global counter: " + Ray.get(result));
379
} else {
380
System.out.println("Named actor not found");
381
}
382
383
// Get from specific namespace
384
Optional<ActorHandle<Counter>> nsCounter = Ray.getActor("counter", "production");
385
386
Ray.shutdown();
387
}
388
}
389
```
390
391
### Actor Exit
392
393
Gracefully exit from within an actor.
394
395
```java { .api }
396
/**
397
* Intentionally exit the current actor.
398
* Must be called from within an actor method.
399
* @throws RuntimeException if not called from within an actor
400
*/
401
public static void exitActor();
402
```
403
404
**Usage Example:**
405
406
```java
407
public class GracefulActor {
408
private boolean shouldStop = false;
409
410
public void processWork(String work) {
411
// Process work...
412
System.out.println("Processing: " + work);
413
414
if (shouldStop) {
415
// Clean shutdown
416
System.out.println("Actor shutting down gracefully");
417
Ray.exitActor();
418
}
419
}
420
421
public void shutdown() {
422
shouldStop = true;
423
}
424
}
425
426
public class ActorExit {
427
public static void main(String[] args) {
428
Ray.init();
429
430
ActorHandle<GracefulActor> actor = Ray.actor(GracefulActor::new).remote();
431
432
// Process some work
433
Ray.get(actor.task(GracefulActor::processWork, "task1").remote());
434
Ray.get(actor.task(GracefulActor::processWork, "task2").remote());
435
436
// Signal shutdown
437
Ray.get(actor.task(GracefulActor::shutdown).remote());
438
439
// This will cause the actor to exit
440
Ray.get(actor.task(GracefulActor::processWork, "final-task").remote());
441
442
Ray.shutdown();
443
}
444
}
445
```
446
447
## Advanced Actor Patterns
448
449
### Stateful Services
450
451
```java
452
/**
 * In-memory key/value store used as an example of actor-held state.
 */
public class DatabaseService {
    // The map reference never changes; only its contents do.
    private final Map<String, String> data = new HashMap<>();

    /**
     * Store a value, replacing any previous value for the key.
     * @param key Entry key
     * @param value Entry value
     */
    public void put(String key, String value) {
        data.put(key, value);
    }

    /**
     * @param key Entry key
     * @return The stored value, or null if the key is absent
     */
    public String get(String key) {
        return data.get(key);
    }

    /**
     * @return A copy of the current key set; changes to it do not affect the store
     */
    public Set<String> keys() {
        return new HashSet<>(data.keySet());
    }

    /**
     * @return Number of stored entries
     */
    public int size() {
        return data.size();
    }
}

public class StatefulService {
473
public static void main(String[] args) {
474
Ray.init();
475
476
// Create stateful database service
477
ActorHandle<DatabaseService> db = Ray.actor(DatabaseService::new).remote();
478
479
// Store data
480
Ray.get(db.task(DatabaseService::put, "user1", "Alice").remote());
481
Ray.get(db.task(DatabaseService::put, "user2", "Bob").remote());
482
483
// Query data
484
ObjectRef<String> user1 = db.task(DatabaseService::get, "user1").remote();
485
ObjectRef<Integer> count = db.task(DatabaseService::size).remote();
486
487
System.out.println("User1: " + Ray.get(user1)); // "Alice"
488
System.out.println("Count: " + Ray.get(count)); // 2
489
490
Ray.shutdown();
491
}
492
}
493
```
494
495
### Actor Pools
496
497
```java
498
/**
 * Example pool member: each worker carries an id and "processes" a task by
 * sleeping for one second.
 */
public class Worker {
    private final int workerId;

    /**
     * @param id Identifier included in this worker's results
     */
    public Worker(int id) {
        this.workerId = id;
    }

    /**
     * Simulate processing of a task.
     * @param task Task description
     * @return Message naming this worker and the processed task
     */
    public String process(String task) {
        // Simulate work
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it;
            // the result is returned regardless
            Thread.currentThread().interrupt();
        }
        return "Worker " + workerId + " processed: " + task;
    }
}

public class ActorPool {
517
public static void main(String[] args) {
518
Ray.init();
519
520
// Create pool of worker actors
521
List<ActorHandle<Worker>> workers = new ArrayList<>();
522
for (int i = 0; i < 5; i++) {
523
workers.add(Ray.actor(Worker::new, i).remote());
524
}
525
526
// Distribute work across pool
527
List<ObjectRef<String>> results = new ArrayList<>();
528
for (int i = 0; i < 20; i++) {
529
ActorHandle<Worker> worker = workers.get(i % workers.size());
530
ObjectRef<String> result = worker.task(Worker::process, "task-" + i).remote();
531
results.add(result);
532
}
533
534
// Wait for all results
535
List<String> allResults = Ray.get(results);
536
allResults.forEach(System.out::println);
537
538
Ray.shutdown();
539
}
540
}
541
```
542
543
### Error Handling and Recovery
544
545
```java
546
public class ResilientActor {
547
private int processedCount = 0;
548
549
public String processTask(String task) {
550
processedCount++;
551
552
// Simulate occasional failures
553
if (task.contains("error")) {
554
throw new RuntimeException("Task processing failed: " + task);
555
}
556
557
return "Processed " + task + " (total: " + processedCount + ")";
558
}
559
560
public int getProcessedCount() {
561
return processedCount;
562
}
563
}
564
565
public class ActorErrorHandling {
566
public static void main(String[] args) {
567
Ray.init();
568
569
ActorHandle<ResilientActor> actor = Ray.actor(ResilientActor::new).remote();
570
571
// Process successful tasks
572
ObjectRef<String> result1 = actor.task(ResilientActor::processTask, "task1").remote();
573
System.out.println(Ray.get(result1));
574
575
// Process failing task
576
try {
577
ObjectRef<String> result2 = actor.task(ResilientActor::processTask, "error-task").remote();
578
Ray.get(result2);
579
} catch (RayTaskException e) {
580
System.out.println("Task failed as expected: " + e.getMessage());
581
}
582
583
// Actor state is preserved despite the error
584
ObjectRef<Integer> count = actor.task(ResilientActor::getProcessedCount).remote();
585
System.out.println("Tasks processed: " + Ray.get(count)); // 2 (including the failed one)
586
587
Ray.shutdown();
588
}
589
}
590
```