or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.md collections.md core-api.md exceptions.md index.md json-api.md logging.md utilities.md

actors.md (docs/)

0

# Actor Framework

1

2

Apache ActiveMQ Artemis Commons provides a powerful actor framework for asynchronous message processing with guaranteed ordering and built-in flow control. The framework ensures thread-safe, ordered execution of tasks while providing sophisticated state management and backpressure capabilities.

3

4

## Capabilities

5

6

### Core Actor System

7

8

The foundation of the actor framework, providing ordered message processing.

9

10

#### Actor

11

12

Primary actor implementation for processing messages of type T with guaranteed ordering.

13

14

```java { .api }

15

class Actor<T> extends ProcessorBase<T> {

16

// Constructor

17

Actor(Executor parent, ActorListener<T> listener);

18

19

// Message processing

20

void act(T message);

21

}

22

23

interface ActorListener<T> {

24

void onMessage(T message);

25

}

26

```

27

28

#### ThresholdActor

29

30

Actor with threshold-based flow control for backpressure management and resource monitoring.

31

32

```java { .api }

33

class ThresholdActor<T> extends ProcessorBase<Object> {

34

// Constructor

35

ThresholdActor(Executor parent,

36

ActorListener<T> listener,

37

int maxSize,

38

ToIntFunction<T> sizeGetter,

39

Runnable overThreshold,

40

Runnable clearThreshold);

41

42

// Message processing with threshold management

43

void act(T message);

44

45

// Flow control

46

void flush();

47

void requestShutdown();

48

}

49

```

50

51

### Ordered Execution

52

53

Executors that guarantee strict task ordering while leveraging thread pools.

54

55

#### OrderedExecutor

56

57

Executor ensuring all tasks execute in strict order with optional fairness control.

58

59

```java { .api }

60

class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {

61

// Constructor

62

OrderedExecutor(Executor delegate);

63

64

// Task execution

65

void execute(Runnable run);

66

67

// Fairness control

68

boolean isFair();

69

OrderedExecutor setFair(boolean fair);

70

71

// String representation

72

String toString();

73

}

74

```

75

76

#### OrderedExecutorFactory

77

78

Factory for creating OrderedExecutor instances with shared configuration.

79

80

```java { .api }

81

class OrderedExecutorFactory implements ExecutorFactory {

82

// Constructor

83

OrderedExecutorFactory(Executor parent);

84

85

// Factory methods

86

ArtemisExecutor getExecutor();

87

Executor getParent();

88

89

// Configuration

90

boolean isFair();

91

OrderedExecutorFactory setFair(boolean fair);

92

93

// Static utilities

94

static boolean flushExecutor(Executor executor);

95

static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit);

96

}

97

```

98

99

### Extended Executor Interface

100

101

Enhanced executor with lifecycle management and flow control.

102

103

#### ArtemisExecutor

104

105

Extended executor interface with shutdown control, fairness, and flow management.

106

107

```java { .api }

108

interface ArtemisExecutor extends Executor {

109

// Static factory

110

static ArtemisExecutor delegate(Executor executor);

111

112

// Shutdown control

113

default int shutdownNow(Consumer<? super Runnable> onPendingTask, int timeout, TimeUnit unit);

114

default int shutdownNow();

115

default void shutdown();

116

117

// Flow control

118

default boolean flush(long timeout, TimeUnit unit);

119

default boolean isFlushed();

120

default void yield();

121

122

// Fairness

123

default boolean isFair();

124

default ArtemisExecutor setFair(boolean fair);

125

126

// State checking

127

default boolean inHandler();

128

}

129

```

130

131

### Base Processing Infrastructure

132

133

Foundational classes providing common functionality for all actors and processors.

134

135

#### ProcessorBase

136

137

Abstract base class with state management, shutdown controls, and task queuing.

138

139

```java { .api }

140

abstract class ProcessorBase<T> extends HandlerBase {

141

// State constants

142

static final int STATE_NOT_RUNNING = 0;

143

static final int STATE_RUNNING = 1;

144

static final int STATE_FORCED_SHUTDOWN = 2;

145

static final int STATE_PAUSED = 3;

146

147

// Constructor

148

ProcessorBase(Executor parent);

149

150

// State management

151

void pauseProcessing();

152

void resumeProcessing();

153

int status();

154

boolean isFlushed();

155

156

// Shutdown methods

157

void shutdown();

158

void shutdown(long timeout, TimeUnit unit);

159

int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit unit);

160

161

// Flow control

162

void flush();

163

boolean flush(long timeout, TimeUnit unit);

164

void yield();

165

166

// Monitoring

167

int remaining();

168

169

// Abstract method for implementation

170

protected abstract void doTask(T task);

171

172

// Protected methods

173

protected void task(T command);

174

}

175

```

176

177

#### HandlerBase

178

179

Base class providing ThreadLocal-based handler detection.

180

181

```java { .api }

182

abstract class HandlerBase {

183

// Handler context detection

184

boolean inHandler();

185

186

// Protected context management

187

protected void enter();

188

protected void leave();

189

}

190

```

191

192

## Usage Examples

193

194

### Basic Actor Pattern

195

196

```java

197

import org.apache.activemq.artemis.utils.actors.Actor;

198

import org.apache.activemq.artemis.utils.actors.ActorListener;

199

200

// Create thread pool

201

ExecutorService threadPool = Executors.newFixedThreadPool(4);

202

203

// Create actor for processing messages

204

ActorListener<String> messageProcessor = message -> {

205

System.out.println("Processing: " + message);

206

// Perform message processing

207

processMessage(message);

208

};

209

210

Actor<String> messageActor = new Actor<>(threadPool, messageProcessor);

211

212

// Send messages - all processed in order

213

messageActor.act("Message 1");

214

messageActor.act("Message 2");

215

messageActor.act("Message 3");

216

217

// Graceful shutdown

218

messageActor.shutdown(30, TimeUnit.SECONDS);

219

```

220

221

### ThresholdActor for Flow Control

222

223

```java

224

import org.apache.activemq.artemis.utils.actors.ThresholdActor;

225

226

// Create threshold actor for memory management

227

ThresholdActor<ByteBuffer> bufferProcessor = new ThresholdActor<>(

228

threadPool,

229

buffer -> processBuffer(buffer), // Message processor

230

1024 * 1024, // 1MB threshold

231

ByteBuffer::remaining, // Size calculation

232

() -> { // Over threshold callback

233

logger.warn("Buffer threshold exceeded - applying backpressure");

234

applyBackpressure();

235

},

236

() -> { // Clear threshold callback

237

logger.info("Buffer threshold cleared - releasing backpressure");

238

releaseBackpressure();

239

}

240

);

241

242

// Process buffers with automatic threshold management

243

for (ByteBuffer buffer : incomingBuffers) {

244

bufferProcessor.act(buffer); // Threshold callbacks triggered automatically

245

}

246

247

// Flush the actor after all buffers have been submitted

248

bufferProcessor.flush();

249

```

250

251

### Ordered Executor Pattern

252

253

```java

254

import org.apache.activemq.artemis.utils.actors.OrderedExecutor;

255

import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;

256

257

// Create factory for ordered executors

258

OrderedExecutorFactory factory = new OrderedExecutorFactory(threadPool)

259

.setFair(true); // Enable fairness - yields after each task

260

261

// Create ordered executors for different channels

262

OrderedExecutor channel1 = (OrderedExecutor) factory.getExecutor();

263

OrderedExecutor channel2 = (OrderedExecutor) factory.getExecutor();

264

265

// Tasks within each channel execute in order

266

channel1.execute(() -> processTask("Channel1-Task1"));

267

channel1.execute(() -> processTask("Channel1-Task2"));

268

channel1.execute(() -> processTask("Channel1-Task3"));

269

270

channel2.execute(() -> processTask("Channel2-Task1"));

271

channel2.execute(() -> processTask("Channel2-Task2"));

272

273

// Wait for completion

274

boolean flushed1 = channel1.flush(10, TimeUnit.SECONDS);

275

boolean flushed2 = channel2.flush(10, TimeUnit.SECONDS);

276

277

if (flushed1 && flushed2) {

278

System.out.println("All tasks completed");

279

}

280

```

281

282

### Advanced State Management

283

284

```java

285

import org.apache.activemq.artemis.utils.actors.ProcessorBase;

286

287

public class CustomProcessor extends ProcessorBase<WorkItem> {

288

289

public CustomProcessor(Executor parent) {

290

super(parent);

291

}

292

293

@Override

294

protected void doTask(WorkItem item) {

295

// Process work item

296

if (item.isHighPriority()) {

297

processImmediately(item);

298

} else {

299

processNormally(item);

300

}

301

}

302

303

public void processWork(WorkItem item) {

304

// Add to processing queue

305

task(item);

306

}

307

308

public void handleShutdown() {

309

// Pause processing

310

pauseProcessing();

311

312

// Process remaining items with timeout

313

int remaining = shutdownNow(

314

item -> handlePendingItem(item), // Handle each pending item

315

30, // 30 second timeout

316

TimeUnit.SECONDS

317

);

318

319

if (remaining > 0) {

320

logger.warn("Shutdown incomplete, {} items remaining", remaining);

321

}

322

}

323

324

public void checkHealth() {

325

int status = status();

326

int pending = remaining(); // O(n) operation

327

328

switch (status) {

329

case STATE_RUNNING:

330

logger.info("Processor running, {} pending items", pending);

331

break;

332

case STATE_PAUSED:

333

logger.info("Processor paused, {} pending items", pending);

334

break;

335

case STATE_FORCED_SHUTDOWN:

336

logger.warn("Processor force shutdown");

337

break;

338

}

339

}

340

}

341

```

342

343

### Flow Control Patterns

344

345

```java

346

// ArtemisExecutor with flow control

347

ArtemisExecutor executor = ArtemisExecutor.delegate(threadPool);

348

349

// Submit batch of tasks

350

for (int i = 0; i < 1000; i++) {

351

final int taskId = i;

352

executor.execute(() -> processTask(taskId));

353

354

// Yield periodically for fairness

355

if (i % 100 == 0) {

356

executor.yield();

357

}

358

}

359

360

// Wait for completion with timeout

361

boolean completed = executor.flush(60, TimeUnit.SECONDS);

362

363

if (!completed) {

364

// Force shutdown and handle pending tasks

365

int pending = executor.shutdownNow(

366

task -> logger.warn("Discarding pending task: {}", task),

367

10,

368

TimeUnit.SECONDS

369

);

370

logger.warn("Forced shutdown, {} tasks not completed", pending);

371

}

372

373

// Quick flush check

374

if (executor.isFlushed()) {

375

logger.info("Executor is completely flushed");

376

}

377

```

378

379

### Factory Pattern with Configuration

380

381

```java

382

// Create factory with shared thread pool

383

OrderedExecutorFactory factory = new OrderedExecutorFactory(

384

Executors.newFixedThreadPool(8,

385

new ActiveMQThreadFactory("ProcessorPool", true, getClassLoader()))

386

);

387

388

// Configure fairness for all executors

389

factory.setFair(true);

390

391

// Create executors for different subsystems

392

ArtemisExecutor messageProcessor = factory.getExecutor();

393

ArtemisExecutor connectionManager = factory.getExecutor();

394

ArtemisExecutor auditLogger = factory.getExecutor();

395

396

// Each subsystem gets ordered execution but can run in parallel

397

messageProcessor.execute(() -> handleIncomingMessage());

398

connectionManager.execute(() -> manageConnections());

399

auditLogger.execute(() -> logAuditEvent());

400

401

// Static factory utility: flush the shared parent executor with a 30-second timeout

402

boolean allFlushed = OrderedExecutorFactory.flushExecutor(

403

factory.getParent(), 30, TimeUnit.SECONDS

404

);

405

```

406

407

## Design Patterns

408

409

### Message Processing Pipeline

410

411

```java

412

// Create pipeline with actors

413

Actor<RawMessage> parser = new Actor<>(threadPool, this::parseMessage);

414

Actor<ParsedMessage> validator = new Actor<>(threadPool, this::validateMessage);

415

Actor<ValidMessage> processor = new Actor<>(threadPool, this::processMessage);

416

417

// Chain processing

418

public void parseMessage(RawMessage raw) {

419

ParsedMessage parsed = parse(raw);

420

validator.act(parsed);

421

}

422

423

public void validateMessage(ParsedMessage parsed) {

424

if (isValid(parsed)) {

425

ValidMessage valid = new ValidMessage(parsed);

426

processor.act(valid);

427

}

428

}

429

```

430

431

### Backpressure Management

432

433

```java

434

// Coordinated backpressure across multiple threshold actors

435

AtomicBoolean backpressureActive = new AtomicBoolean(false);

436

437

ThresholdActor<Message> messageActor = new ThresholdActor<>(

438

threadPool, this::processMessage,

439

1000, Message::getSize,

440

() -> activateBackpressure(),

441

() -> releaseBackpressure()

442

);

443

444

ThresholdActor<Connection> connectionActor = new ThresholdActor<>(

445

threadPool, this::manageConnection,

446

50, conn -> 1, // Count-based threshold

447

() -> activateBackpressure(),

448

() -> releaseBackpressure()

449

);

450

451

private void activateBackpressure() {

452

if (backpressureActive.compareAndSet(false, true)) {

453

logger.warn("Activating system-wide backpressure");

454

notifyBackpressureListeners(true);

455

}

456

}

457

```

458

459

The actor framework provides a robust foundation for building scalable, ordered, asynchronous processing systems with sophisticated flow control and state management capabilities.