or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdadvanced-actors.mdcross-language.mdindex.mdobject-store.mdplacement-groups.mdruntime.mdtasks.md

actors.mddocs/

0

# Actor Programming

1

2

Stateful distributed workers with method-level task execution, lifecycle management, and resource control for building robust distributed applications.

3

4

## Capabilities

5

6

### Actor Creation Options

7

8

Configure actor creation with detailed options including naming, resources, and lifecycle settings.

9

10

```java { .api }

11

/**

12

* Options for configuring actor creation.

13

*/

14

public class ActorCreationOptions {

15

/** Constant for no actor restarts */

16

public static final int NO_RESTART = 0;

17

18

/** Constant for infinite actor restarts */

19

public static final int INFINITE_RESTART = -1;

20

21

/**

22

* Create builder for actor creation options.

23

* @return ActorCreationOptions.Builder

24

*/

25

public static Builder builder();

26

27

public static class Builder {

28

/**

29

* Set actor name for service discovery.

30

* @param name Actor name

31

* @return Builder for method chaining

32

*/

33

public Builder setName(String name);

34

35

/**

36

* Set actor lifetime policy.

37

* @param lifetime Actor lifetime setting

38

* @return Builder for method chaining

39

*/

40

public Builder setLifetime(ActorLifetime lifetime);

41

42

/**

43

* Set single resource requirement.

44

* @param resource Resource name

45

* @param quantity Resource quantity

46

* @return Builder for method chaining

47

*/

48

public Builder setResource(String resource, Double quantity);

49

50

/**

51

* Set multiple resource requirements.

52

* @param resources Map of resource names to quantities

53

* @return Builder for method chaining

54

*/

55

public Builder setResources(Map<String, Double> resources);

56

57

/**

58

* Set maximum number of actor restarts.

59

* @param maxRestarts Maximum restarts (use constants NO_RESTART or INFINITE_RESTART)

60

* @return Builder for method chaining

61

*/

62

public Builder setMaxRestarts(int maxRestarts);

63

64

/**

65

* Set maximum task retries per actor method call.

66

* @param maxTaskRetries Maximum task retries

67

* @return Builder for method chaining

68

*/

69

public Builder setMaxTaskRetries(int maxTaskRetries);

70

71

/**

72

* Set JVM options for the actor process.

73

* @param jvmOptions List of JVM options

74

* @return Builder for method chaining

75

*/

76

public Builder setJvmOptions(List<String> jvmOptions);

77

78

/**

79

* Set maximum concurrent method calls.

80

* @param maxConcurrency Maximum concurrent calls

81

* @return Builder for method chaining

82

*/

83

public Builder setMaxConcurrency(int maxConcurrency);

84

85

/**

86

* Set placement group and bundle index.

87

* @param placementGroup Placement group

88

* @param bundleIndex Bundle index within placement group

89

* @return Builder for method chaining

90

*/

91

public Builder setPlacementGroup(PlacementGroup placementGroup, int bundleIndex);

92

93

/**

94

* Set runtime environment.

95

* @param runtimeEnv Runtime environment configuration

96

* @return Builder for method chaining

97

*/

98

public Builder setRuntimeEnv(RuntimeEnv runtimeEnv);

99

100

/**

101

* Build the actor creation options.

102

* @return ActorCreationOptions instance

103

*/

104

public ActorCreationOptions build();

105

}

106

}

107

108

/**

109

* Actor lifetime enumeration.

110

*/

111

public enum ActorLifetime {

112

/** Actor dies when creator process dies */

113

NON_DETACHED,

114

115

/** Actor persists beyond creator process lifetime */

116

DETACHED

117

}

118

```

119

120

### Actor Creation

121

122

Create stateful distributed workers that persist across multiple method calls.

123

124

```java { .api }

125

// Actor creation methods (0-6 parameters)

126

public static <A> ActorCreator<A> actor(RayFunc0<A> f);

127

public static <T0, A> ActorCreator<A> actor(RayFunc1<T0, A> f, T0 t0);

128

public static <T0, T1, A> ActorCreator<A> actor(RayFunc2<T0, T1, A> f, T0 t0, T1 t1);

129

public static <T0, T1, T2, A> ActorCreator<A> actor(RayFunc3<T0, T1, T2, A> f, T0 t0, T1 t1, T2 t2);

130

public static <T0, T1, T2, T3, A> ActorCreator<A> actor(RayFunc4<T0, T1, T2, T3, A> f, T0 t0, T1 t1, T2 t2, T3 t3);

131

public static <T0, T1, T2, T3, T4, A> ActorCreator<A> actor(RayFunc5<T0, T1, T2, T3, T4, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4);

132

public static <T0, T1, T2, T3, T4, T5, A> ActorCreator<A> actor(RayFunc6<T0, T1, T2, T3, T4, T5, A> f, T0 t0, T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);

133

134

public interface ActorCreator<A> {

135

/**

136

* Create the actor remotely.

137

* @return ActorHandle for calling actor methods

138

*/

139

ActorHandle<A> remote();

140

141

/**

142

* Set JVM options for the actor.

143

* @param jvmOptions List of JVM options

144

* @return ActorCreator for method chaining

145

*/

146

ActorCreator<A> setJvmOptions(List<String> jvmOptions);

147

148

/**

149

* Set concurrency groups for the actor.

150

* @param concurrencyGroups Concurrency group configuration

151

* @return ActorCreator for method chaining

152

*/

153

ActorCreator<A> setConcurrencyGroups(ConcurrencyGroup... concurrencyGroups);

154

155

/**

156

* Set runtime environment for the actor.

157

* @param runtimeEnv Runtime environment configuration

158

* @return ActorCreator for method chaining

159

*/

160

ActorCreator<A> setRuntimeEnv(RuntimeEnv runtimeEnv);

161

}

162

```

163

164

**Usage Examples:**

165

166

```java

167

public class Counter {

168

private int count = 0;

169

170

public Counter(int initialValue) {

171

this.count = initialValue;

172

}

173

174

public int increment() {

175

return ++count;

176

}

177

178

public int getValue() {

179

return count;

180

}

181

182

public void reset() {

183

count = 0;

184

}

185

}

186

187

public class ActorExample {

188

public static void main(String[] args) {

189

Ray.init();

190

191

// Create actor with constructor parameter

192

ActorHandle<Counter> counter = Ray.actor(Counter::new, 10).remote();

193

194

// Create actor with configuration

195

ActorHandle<Counter> configuredCounter = Ray.actor(Counter::new, 0)

196

.setJvmOptions(Arrays.asList("-Xmx1g", "-Xms512m"))

197

.setMaxConcurrency(4)

198

.remote();

199

200

// Create named actor with full configuration using ActorCreationOptions

201

ActorCreationOptions options = ActorCreationOptions.builder()

202

.setName("global-counter")

203

.setLifetime(ActorLifetime.DETACHED)

204

.setResources(Map.of("CPU", 2.0, "memory", 1000.0))

205

.setMaxRestarts(5)

206

.setJvmOptions(Arrays.asList("-Xmx2g"))

207

.setMaxConcurrency(8)

208

.build();

209

210

// Note: Full ActorCreationOptions integration would require additional API

211

// This demonstrates the options builder pattern available

212

213

Ray.shutdown();

214

}

215

}

216

```

217

218

### Actor Method Calls

219

220

Call methods on remote actors with full type safety and ObjectRef support.

221

222

```java { .api }

223

public interface ActorHandle<A> extends BaseActorHandle {

224

/**

225

* Call actor method with return value.

226

* Available for all method signatures (0-6 parameters).

227

*/

228

<R> ActorTaskCaller<R> task(/* method reference and parameters */);

229

}

230

231

public interface ActorTaskCaller<R> {

232

/**

233

* Execute the actor method call remotely.

234

* @return ObjectRef to the method result

235

*/

236

ObjectRef<R> remote();

237

}

238

```

239

240

**Usage Examples:**

241

242

```java

243

public class ActorMethodCalls {

244

public static void main(String[] args) {

245

Ray.init();

246

247

// Create counter actor

248

ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();

249

250

// Call methods and get results

251

ObjectRef<Integer> result1 = counter.task(Counter::increment).remote();

252

ObjectRef<Integer> result2 = counter.task(Counter::increment).remote();

253

ObjectRef<Integer> current = counter.task(Counter::getValue).remote();

254

255

// Methods execute in order on the same actor instance

256

System.out.println(Ray.get(result1)); // 1

257

System.out.println(Ray.get(result2)); // 2

258

System.out.println(Ray.get(current)); // 2

259

260

// Void method calls

261

ObjectRef<Void> resetResult = counter.task(Counter::reset).remote();

262

Ray.get(resetResult); // Wait for completion

263

264

ObjectRef<Integer> afterReset = counter.task(Counter::getValue).remote();

265

System.out.println(Ray.get(afterReset)); // 0

266

267

Ray.shutdown();

268

}

269

}

270

```

271

272

### Actor Handles

273

274

Manage actor lifecycle and get actor information.

275

276

```java { .api }

277

public interface BaseActorHandle {

278

/**

279

* Get the actor ID.

280

* @return ActorId of this actor

281

*/

282

ActorId getId();

283

284

/**

285

* Kill the actor (allows restart).

286

*/

287

void kill();

288

289

/**

290

* Kill the actor with restart control.

291

* @param noRestart If true, prevent actor restart

292

*/

293

void kill(boolean noRestart);

294

}

295

296

public interface ActorHandle<A> extends BaseActorHandle {

297

// Inherits from BaseActorHandle

298

// Plus type-safe method calling capability

299

}

300

```

301

302

**Usage Examples:**

303

304

```java

305

public class ActorLifecycle {

306

public static void main(String[] args) {

307

Ray.init();

308

309

// Create actor

310

ActorHandle<Counter> counter = Ray.actor(Counter::new, 0).remote();

311

312

// Get actor information

313

ActorId actorId = counter.getId();

314

System.out.println("Actor ID: " + actorId);

315

316

// Use actor

317

ObjectRef<Integer> result = counter.task(Counter::increment).remote();

318

System.out.println("Result: " + Ray.get(result));

319

320

// Kill actor (allows restart)

321

counter.kill();

322

323

// Kill actor permanently

324

// counter.kill(true);

325

326

Ray.shutdown();

327

}

328

}

329

```

330

331

### Named Actors

332

333

Create and retrieve actors by name for service discovery.

334

335

```java { .api }

336

/**

337

* Get a handle to a named actor in current namespace.

338

* @param name The name of the named actor

339

* @return Optional ActorHandle if actor exists

340

* @throws RayException if timed out

341

*/

342

public static <T extends BaseActorHandle> Optional<T> getActor(String name);

343

344

/**

345

* Get a handle to a named actor in specified namespace.

346

* @param name The name of the named actor

347

* @param namespace The namespace of the actor

348

* @return Optional ActorHandle if actor exists

349

* @throws RayException if timed out

350

*/

351

public static <T extends BaseActorHandle> Optional<T> getActor(String name, String namespace);

352

```

353

354

**Usage Examples:**

355

356

```java

357

public class NamedActors {

358

public static void main(String[] args) {

359

Ray.init();

360

361

// Create named actor (requires ActorCreationOptions)

362

ActorCreationOptions options = ActorCreationOptions.builder()

363

.setName("global-counter")

364

.setLifetime(ActorLifetime.DETACHED)

365

.build();

366

367

// Note: Named actor creation requires using options

368

// ActorHandle<Counter> counter = Ray.actor(Counter::new, 0)

369

// .setOptions(options)

370

// .remote();

371

372

// Get named actor from anywhere in the cluster

373

Optional<ActorHandle<Counter>> maybeCounter = Ray.getActor("global-counter");

374

375

if (maybeCounter.isPresent()) {

376

ActorHandle<Counter> counter = maybeCounter.get();

377

ObjectRef<Integer> result = counter.task(Counter::increment).remote();

378

System.out.println("Global counter: " + Ray.get(result));

379

} else {

380

System.out.println("Named actor not found");

381

}

382

383

// Get from specific namespace

384

Optional<ActorHandle<Counter>> nsCounter = Ray.getActor("counter", "production");

385

386

Ray.shutdown();

387

}

388

}

389

```

390

391

### Actor Exit

392

393

Gracefully exit from within an actor.

394

395

```java { .api }

396

/**

397

* Intentionally exit the current actor.

398

* Must be called from within an actor method.

399

* @throws RuntimeException if not called from within an actor

400

*/

401

public static void exitActor();

402

```

403

404

**Usage Example:**

405

406

```java

407

public class GracefulActor {

408

private boolean shouldStop = false;

409

410

public void processWork(String work) {

411

// Process work...

412

System.out.println("Processing: " + work);

413

414

if (shouldStop) {

415

// Clean shutdown

416

System.out.println("Actor shutting down gracefully");

417

Ray.exitActor();

418

}

419

}

420

421

public void shutdown() {

422

shouldStop = true;

423

}

424

}

425

426

public class ActorExit {

427

public static void main(String[] args) {

428

Ray.init();

429

430

ActorHandle<GracefulActor> actor = Ray.actor(GracefulActor::new).remote();

431

432

// Process some work

433

Ray.get(actor.task(GracefulActor::processWork, "task1").remote());

434

Ray.get(actor.task(GracefulActor::processWork, "task2").remote());

435

436

// Signal shutdown

437

Ray.get(actor.task(GracefulActor::shutdown).remote());

438

439

// This will cause the actor to exit

440

Ray.get(actor.task(GracefulActor::processWork, "final-task").remote());

441

442

Ray.shutdown();

443

}

444

}

445

```

446

447

## Advanced Actor Patterns

448

449

### Stateful Services

450

451

```java

452

public class DatabaseService {

453

private Map<String, String> data = new HashMap<>();

454

455

public void put(String key, String value) {

456

data.put(key, value);

457

}

458

459

public String get(String key) {

460

return data.get(key);

461

}

462

463

public Set<String> keys() {

464

return new HashSet<>(data.keySet());

465

}

466

467

public int size() {

468

return data.size();

469

}

470

}

471

472

public class StatefulService {

473

public static void main(String[] args) {

474

Ray.init();

475

476

// Create stateful database service

477

ActorHandle<DatabaseService> db = Ray.actor(DatabaseService::new).remote();

478

479

// Store data

480

Ray.get(db.task(DatabaseService::put, "user1", "Alice").remote());

481

Ray.get(db.task(DatabaseService::put, "user2", "Bob").remote());

482

483

// Query data

484

ObjectRef<String> user1 = db.task(DatabaseService::get, "user1").remote();

485

ObjectRef<Integer> count = db.task(DatabaseService::size).remote();

486

487

System.out.println("User1: " + Ray.get(user1)); // "Alice"

488

System.out.println("Count: " + Ray.get(count)); // 2

489

490

Ray.shutdown();

491

}

492

}

493

```

494

495

### Actor Pools

496

497

```java

498

public class Worker {

499

private final int workerId;

500

501

public Worker(int id) {

502

this.workerId = id;

503

}

504

505

public String process(String task) {

506

// Simulate work

507

try {

508

Thread.sleep(1000);

509

} catch (InterruptedException e) {

510

Thread.currentThread().interrupt();

511

}

512

return "Worker " + workerId + " processed: " + task;

513

}

514

}

515

516

public class ActorPool {

517

public static void main(String[] args) {

518

Ray.init();

519

520

// Create pool of worker actors

521

List<ActorHandle<Worker>> workers = new ArrayList<>();

522

for (int i = 0; i < 5; i++) {

523

workers.add(Ray.actor(Worker::new, i).remote());

524

}

525

526

// Distribute work across pool

527

List<ObjectRef<String>> results = new ArrayList<>();

528

for (int i = 0; i < 20; i++) {

529

ActorHandle<Worker> worker = workers.get(i % workers.size());

530

ObjectRef<String> result = worker.task(Worker::process, "task-" + i).remote();

531

results.add(result);

532

}

533

534

// Wait for all results

535

List<String> allResults = Ray.get(results);

536

allResults.forEach(System.out::println);

537

538

Ray.shutdown();

539

}

540

}

541

```

542

543

### Error Handling and Recovery

544

545

```java

546

public class ResilientActor {

547

private int processedCount = 0;

548

549

public String processTask(String task) {

550

processedCount++;

551

552

// Simulate occasional failures

553

if (task.contains("error")) {

554

throw new RuntimeException("Task processing failed: " + task);

555

}

556

557

return "Processed " + task + " (total: " + processedCount + ")";

558

}

559

560

public int getProcessedCount() {

561

return processedCount;

562

}

563

}

564

565

public class ActorErrorHandling {

566

public static void main(String[] args) {

567

Ray.init();

568

569

ActorHandle<ResilientActor> actor = Ray.actor(ResilientActor::new).remote();

570

571

// Process successful tasks

572

ObjectRef<String> result1 = actor.task(ResilientActor::processTask, "task1").remote();

573

System.out.println(Ray.get(result1));

574

575

// Process failing task

576

try {

577

ObjectRef<String> result2 = actor.task(ResilientActor::processTask, "error-task").remote();

578

Ray.get(result2);

579

} catch (RayTaskException e) {

580

System.out.println("Task failed as expected: " + e.getMessage());

581

}

582

583

// Actor state is preserved despite the error

584

ObjectRef<Integer> count = actor.task(ResilientActor::getProcessedCount).remote();

585

System.out.println("Tasks processed: " + Ray.get(count)); // 2 (including the failed one)

586

587

Ray.shutdown();

588

}

589

}

590

```