or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdadvanced-actors.mdcross-language.mdindex.mdobject-store.mdplacement-groups.mdruntime.mdtasks.md

advanced-actors.mddocs/

0

# Advanced Actor Features

1

2

Specialized actor patterns including parallel actors and concurrency group management for high-performance distributed computing scenarios.

3

4

## Capabilities

5

6

### Parallel Actors

7

8

Parallel actors enable scaling a single logical actor across multiple processes for increased throughput.

9

10

```java { .api }

11

/**

12

* Base class for parallel actor implementations.

13

* Extend this class to create parallel actors.

14

*/

15

public class ParallelActor {

16

// Base functionality for parallel actor implementations

17

}

18

19

/**

20

* Handle for managing parallel actor instances.

21

*/

22

public interface ParallelActorHandle {

23

/**

24

* Get the number of parallel instances.

25

* @return Number of parallel actor instances

26

*/

27

int getNumInstances();

28

29

/**

30

* Get a specific parallel actor instance.

31

* @param index Instance index

32

* @return ParallelActorInstance handle

33

*/

34

ParallelActorInstance getInstance(int index);

35

36

/**

37

* Kill all parallel actor instances.

38

*/

39

void kill();

40

}

41

42

/**

43

* Individual instance within a parallel actor.

44

*/

45

public interface ParallelActorInstance {

46

/**

47

* Get the instance index.

48

* @return Index of this instance

49

*/

50

int getIndex();

51

52

/**

53

* Call method on this specific instance.

54

*/

55

// Method calling capability similar to regular actors

56

}

57

58

/**

59

* Context information for parallel actors.

60

*/

61

public interface ParallelActorContext {

62

/**

63

* Get current instance index.

64

* @return Index of current parallel actor instance

65

*/

66

int getCurrentInstanceIndex();

67

68

/**

69

* Get total number of instances.

70

* @return Total parallel actor instances

71

*/

72

int getTotalInstances();

73

}

74

75

/**

76

* Creator for parallel actors.

77

*/

78

public class ParallelActorCreator {

79

/**

80

* Set number of parallel instances.

81

* @param numInstances Number of parallel instances to create

82

* @return ParallelActorCreator for method chaining

83

*/

84

public ParallelActorCreator setNumInstances(int numInstances);

85

86

/**

87

* Create the parallel actor remotely.

88

* @return ParallelActorHandle for managing instances

89

*/

90

public ParallelActorHandle remote();

91

}

92

```

93

94

**Usage Examples:**

95

96

```java

97

public class ParallelWorker extends ParallelActor {

98

private final String workerId;

99

private int processedTasks = 0;

100

101

public ParallelWorker(String baseId) {

102

// Access parallel actor context

103

ParallelActorContext context = getParallelActorContext(); // Hypothetical method

104

this.workerId = baseId + "-" + context.getCurrentInstanceIndex();

105

}

106

107

public String processTask(String task) {

108

processedTasks++;

109

110

// Simulate processing time

111

try {

112

Thread.sleep(100);

113

} catch (InterruptedException e) {

114

Thread.currentThread().interrupt();

115

}

116

117

return workerId + " processed: " + task + " (total: " + processedTasks + ")";

118

}

119

120

public int getProcessedCount() {

121

return processedTasks;

122

}

123

}

124

125

public class ParallelActorExample {

126

public static void main(String[] args) {

127

Ray.init();

128

129

// Create parallel actor with 4 instances

130

ParallelActorCreator creator = new ParallelActorCreator();

131

ParallelActorHandle parallelWorker = creator

132

.setNumInstances(4)

133

.remote();

134

135

// Distribute work across parallel instances

136

List<ObjectRef<String>> results = new ArrayList<>();

137

138

for (int i = 0; i < 20; i++) {

139

// Ray automatically load-balances across instances

140

ParallelActorTaskCaller<String> taskCaller =

141

parallelWorker.task(ParallelWorker::processTask, "task-" + i);

142

results.add(taskCaller.remote());

143

}

144

145

// Wait for all results

146

List<String> allResults = Ray.get(results);

147

allResults.forEach(System.out::println);

148

149

// Access specific instances

150

for (int i = 0; i < parallelWorker.getNumInstances(); i++) {

151

ParallelActorInstance instance = parallelWorker.getInstance(i);

152

// Call methods on specific instance...

153

}

154

155

// Clean up

156

parallelWorker.kill();

157

158

Ray.shutdown();

159

}

160

}

161

```

162

163

### Parallel Actor Task Calling

164

165

Type-safe method calling for parallel actors with automatic load balancing.

166

167

```java { .api }

168

/**

169

* Task caller for parallel actor methods with return values.

170

*/

171

public class ParallelActorTaskCaller<R> {

172

/**

173

* Execute the method call on an available parallel actor instance.

174

* @return ObjectRef to the method result

175

*/

176

public ObjectRef<R> remote();

177

}

178

179

/**

180

* Task caller for void parallel actor methods.

181

*/

182

public class VoidParallelActorTaskCaller {

183

/**

184

* Execute the void method call on an available parallel actor instance.

185

* @return ObjectRef<Void> for synchronization

186

*/

187

public ObjectRef<Void> remote();

188

}

189

```

190

191

**Usage Examples:**

192

193

```java

194

public class ParallelActorTasking {

195

public static void main(String[] args) {

196

Ray.init();

197

198

// Create parallel actor

199

ParallelActorHandle parallelWorker = createParallelWorker(8);

200

201

// High-throughput task processing

202

List<ObjectRef<String>> batchResults = new ArrayList<>();

203

204

for (int batch = 0; batch < 10; batch++) {

205

List<ObjectRef<String>> batchTasks = new ArrayList<>();

206

207

// Process batch in parallel

208

for (int i = 0; i < 100; i++) {

209

String taskData = "batch-" + batch + "-task-" + i;

210

ParallelActorTaskCaller<String> caller =

211

parallelWorker.task(ParallelWorker::processTask, taskData);

212

batchTasks.add(caller.remote());

213

}

214

215

// Wait for batch completion

216

List<String> batchComplete = Ray.get(batchTasks);

217

System.out.println("Batch " + batch + " completed: " + batchComplete.size() + " tasks");

218

}

219

220

Ray.shutdown();

221

}

222

223

private static ParallelActorHandle createParallelWorker(int instances) {

224

ParallelActorCreator creator = new ParallelActorCreator();

225

return creator.setNumInstances(instances).remote();

226

}

227

}

228

```

229

230

### Concurrency Groups

231

232

Manage method-level concurrency within actors for fine-grained performance control.

233

234

```java { .api }

235

/**

236

* Concurrency group interface for managing concurrent method execution.

237

*/

238

public interface ConcurrencyGroup {

239

/**

240

* Get the concurrency group name.

241

* @return Name of the concurrency group

242

*/

243

String getName();

244

245

/**

246

* Get the maximum concurrency level.

247

* @return Maximum concurrent method executions

248

*/

249

int getMaxConcurrency();

250

}

251

252

/**

253

* Builder for creating concurrency groups.

254

*/

255

public class ConcurrencyGroupBuilder {

256

/**

257

* Set the concurrency group name.

258

* @param name Group name

259

* @return Builder for method chaining

260

*/

261

public ConcurrencyGroupBuilder setName(String name);

262

263

/**

264

* Set maximum concurrency level.

265

* @param maxConcurrency Maximum concurrent executions

266

* @return Builder for method chaining

267

*/

268

public ConcurrencyGroupBuilder setMaxConcurrency(int maxConcurrency);

269

270

/**

271

* Build the concurrency group.

272

* @return ConcurrencyGroup instance

273

*/

274

public ConcurrencyGroup build();

275

}

276

277

/**

278

* Base builder class for concurrency groups.

279

*/

280

public class BaseConcurrencyGroupBuilder {

281

// Base functionality for concurrency group builders

282

}

283

```

284

285

**Usage Examples:**

286

287

```java

288

public class ConcurrencyGroupExample {

289

public static void main(String[] args) {

290

Ray.init();

291

292

// Create concurrency groups

293

ConcurrencyGroup ioGroup = new ConcurrencyGroupBuilder()

294

.setName("io-operations")

295

.setMaxConcurrency(10)

296

.build();

297

298

ConcurrencyGroup computeGroup = new ConcurrencyGroupBuilder()

299

.setName("compute-operations")

300

.setMaxConcurrency(2)

301

.build();

302

303

// Create actor with concurrency groups

304

ActorHandle<ConcurrentActor> actor = Ray.actor(ConcurrentActor::new)

305

.setConcurrencyGroups(ioGroup, computeGroup)

306

.remote();

307

308

// Methods will be executed according to their concurrency group limits

309

310

Ray.shutdown();

311

}

312

}

313

```

314

315

### Concurrency Group Annotations

316

317

Use annotations to specify concurrency groups at the method level.

318

319

```java { .api }

320

/**

321

* Annotation to define a concurrency group.

322

*/

323

@interface DefConcurrencyGroup {

324

/**

325

* Concurrency group name.

326

* @return Group name

327

*/

328

String name();

329

330

/**

331

* Maximum concurrency level.

332

* @return Maximum concurrent executions

333

*/

334

int maxConcurrency();

335

}

336

337

/**

338

* Annotation to define multiple concurrency groups.

339

*/

340

@interface DefConcurrencyGroups {

341

/**

342

* Array of concurrency group definitions.

343

* @return Array of DefConcurrencyGroup annotations

344

*/

345

DefConcurrencyGroup[] value();

346

}

347

348

/**

349

* Annotation to specify which concurrency group a method uses.

350

*/

351

@interface UseConcurrencyGroup {

352

/**

353

* Concurrency group name to use.

354

* @return Group name

355

*/

356

String value();

357

}

358

```

359

360

**Usage Examples:**

361

362

```java

363

@DefConcurrencyGroups({

364

@DefConcurrencyGroup(name = "io", maxConcurrency = 10),

365

@DefConcurrencyGroup(name = "compute", maxConcurrency = 2),

366

@DefConcurrencyGroup(name = "db", maxConcurrency = 5)

367

})

368

public class ConcurrentActor {

369

370

@UseConcurrencyGroup("io")

371

public String readFile(String filename) {

372

// I/O intensive operation - can run up to 10 concurrently

373

try {

374

Thread.sleep(1000); // Simulate file read

375

} catch (InterruptedException e) {

376

Thread.currentThread().interrupt();

377

}

378

return "Read: " + filename;

379

}

380

381

@UseConcurrencyGroup("compute")

382

public double computeHeavy(double input) {

383

// CPU intensive operation - limited to 2 concurrent executions

384

double result = input;

385

for (int i = 0; i < 1000000; i++) {

386

result = Math.sqrt(result + 1);

387

}

388

return result;

389

}

390

391

@UseConcurrencyGroup("db")

392

public String queryDatabase(String query) {

393

// Database operation - up to 5 concurrent queries

394

try {

395

Thread.sleep(500); // Simulate DB query

396

} catch (InterruptedException e) {

397

Thread.currentThread().interrupt();

398

}

399

return "Query result: " + query;

400

}

401

402

// Methods without annotation use default concurrency (1)

403

public String defaultMethod(String input) {

404

return "Default: " + input;

405

}

406

}

407

408

public class ConcurrencyAnnotationExample {

409

public static void main(String[] args) {

410

Ray.init();

411

412

// Create actor - concurrency groups are automatically configured from annotations

413

ActorHandle<ConcurrentActor> actor = Ray.actor(ConcurrentActor::new).remote();

414

415

// Launch many I/O operations (up to 10 concurrent)

416

List<ObjectRef<String>> ioResults = new ArrayList<>();

417

for (int i = 0; i < 20; i++) {

418

ObjectRef<String> result = actor.task(ConcurrentActor::readFile, "file-" + i).remote();

419

ioResults.add(result);

420

}

421

422

// Launch compute operations (limited to 2 concurrent)

423

List<ObjectRef<Double>> computeResults = new ArrayList<>();

424

for (int i = 0; i < 10; i++) {

425

ObjectRef<Double> result = actor.task(ConcurrentActor::computeHeavy, i * 1.5).remote();

426

computeResults.add(result);

427

}

428

429

// Launch database operations (up to 5 concurrent)

430

List<ObjectRef<String>> dbResults = new ArrayList<>();

431

for (int i = 0; i < 15; i++) {

432

ObjectRef<String> result = actor.task(ConcurrentActor::queryDatabase, "SELECT * FROM table" + i).remote();

433

dbResults.add(result);

434

}

435

436

// Wait for results

437

System.out.println("I/O operations completed: " + Ray.get(ioResults).size());

438

System.out.println("Compute operations completed: " + Ray.get(computeResults).size());

439

System.out.println("DB operations completed: " + Ray.get(dbResults).size());

440

441

Ray.shutdown();

442

}

443

}

444

```

445

446

## Advanced Call Framework

447

448

### Parallel Actor Call Interface

449

450

```java { .api }

451

/**

452

* Base call class for parallel actor operations.

453

*/

454

public class Call {

455

// Base functionality for actor method calls

456

}

457

458

/**

459

* Actor call interface for method invocation.

460

*/

461

public interface ActorCall {

462

/**

463

* Execute the actor method call.

464

* @return Result of the method call

465

*/

466

Object call();

467

}

468

```

469

470

## Performance Optimization Patterns

471

472

### High-Throughput Processing

473

474

```java

475

public class HighThroughputProcessor extends ParallelActor {

476

@DefConcurrencyGroup(name = "process", maxConcurrency = 4)

477

public class ConcurrentProcessor {

478

479

@UseConcurrencyGroup("process")

480

public String processItem(String item) {

481

// Process item with controlled concurrency

482

return "Processed: " + item;

483

}

484

}

485

486

public static void main(String[] args) {

487

Ray.init();

488

489

// Combine parallel actors with concurrency groups

490

ParallelActorHandle processor = new ParallelActorCreator()

491

.setNumInstances(4) // 4 parallel instances

492

.remote();

493

494

// Each instance can process 4 items concurrently

495

// Total concurrency: 4 instances × 4 concurrent methods = 16

496

497

List<ObjectRef<String>> results = new ArrayList<>();

498

for (int i = 0; i < 1000; i++) {

499

ParallelActorTaskCaller<String> caller =

500

processor.task(ConcurrentProcessor::processItem, "item-" + i);

501

results.add(caller.remote());

502

}

503

504

List<String> completed = Ray.get(results);

505

System.out.println("Processed " + completed.size() + " items");

506

507

Ray.shutdown();

508

}

509

}

510

```

511

512

### Load Balancing and Fault Tolerance

513

514

```java

515

public class ResilientParallelActor extends ParallelActor {

516

private int processedCount = 0;

517

518

public String processWithRetry(String data) {

519

try {

520

// Simulate occasional failures

521

if (Math.random() < 0.1) {

522

throw new RuntimeException("Simulated processing error");

523

}

524

525

processedCount++;

526

return "Processed: " + data + " (instance processed: " + processedCount + ")";

527

528

} catch (Exception e) {

529

// Log error but don't propagate - let Ray handle retry logic

530

System.err.println("Processing error: " + e.getMessage());

531

throw e;

532

}

533

}

534

535

public int getProcessedCount() {

536

return processedCount;

537

}

538

}

539

540

public class FaultTolerantProcessing {

541

public static void main(String[] args) {

542

Ray.init();

543

544

// Create parallel actor with multiple instances for fault tolerance

545

ParallelActorHandle processor = new ParallelActorCreator()

546

.setNumInstances(6)

547

.remote();

548

549

List<ObjectRef<String>> results = new ArrayList<>();

550

551

// Process many items - failures on one instance don't affect others

552

for (int i = 0; i < 500; i++) {

553

ParallelActorTaskCaller<String> caller =

554

processor.task(ResilientParallelActor::processWithRetry, "data-" + i);

555

results.add(caller.remote());

556

}

557

558

// Handle results with error handling

559

int successCount = 0;

560

int errorCount = 0;

561

562

for (ObjectRef<String> result : results) {

563

try {

564

String value = Ray.get(result);

565

successCount++;

566

} catch (Exception e) {

567

errorCount++;

568

System.err.println("Task failed: " + e.getMessage());

569

}

570

}

571

572

System.out.println("Success: " + successCount + ", Errors: " + errorCount);

573

574

// Check instance statistics

575

for (int i = 0; i < processor.getNumInstances(); i++) {

576

ParallelActorInstance instance = processor.getInstance(i);

577

// Get stats from each instance...

578

}

579

580

Ray.shutdown();

581

}

582

}

583

```

584

585

## Best Practices

586

587

### Choosing Between Actor Types

588

589

```java

590

// Regular actors: For stateful services, single-threaded processing

591

ActorHandle<DatabaseService> dbService = Ray.actor(DatabaseService::new).remote();

592

593

// Parallel actors: For high-throughput, stateless processing

594

ParallelActorHandle processor = new ParallelActorCreator()

595

.setNumInstances(8)

596

.remote();

597

598

// Concurrent actors: For I/O bound operations with controlled concurrency

599

@DefConcurrencyGroup(name = "io", maxConcurrency = 10)

600

ActorHandle<IOService> ioService = Ray.actor(IOService::new).remote();

601

```

602

603

### Resource Management

604

605

```java

606

// Configure resources for parallel actors

607

ParallelActorHandle resourceAwareProcessor = new ParallelActorCreator()

608

.setNumInstances(4)

609

.setResources(Map.of("CPU", 2.0, "memory", 1000.0)) // Per instance

610

.remote();

611

612

// Total resources: 4 instances × (2 CPU + 1GB memory) = 8 CPU + 4GB memory

613

```

614

615

### Monitoring and Debugging

616

617

```java

618

public class ActorMonitoring {

619

public static void main(String[] args) {

620

Ray.init();

621

622

ParallelActorHandle processor = createParallelProcessor();

623

624

// Monitor parallel actor instances

625

System.out.println("Parallel actor instances: " + processor.getNumInstances());

626

627

for (int i = 0; i < processor.getNumInstances(); i++) {

628

ParallelActorInstance instance = processor.getInstance(i);

629

System.out.println("Instance " + instance.getIndex() + " status: active");

630

}

631

632

// Get runtime context for detailed information

633

RuntimeContext context = Ray.getRuntimeContext();

634

List<ActorInfo> actorInfos = context.getAllActorInfo();

635

636

for (ActorInfo info : actorInfos) {

637

if (info.getClassName().contains("ParallelActor")) {

638

System.out.println("Parallel actor: " + info.getActorId() +

639

" State: " + info.getState() +

640

" Node: " + info.getNodeId());

641

}

642

}

643

644

Ray.shutdown();

645

}

646

}

647

```