0
# Actor Framework
1
2
Apache ActiveMQ Artemis Commons provides a powerful actor framework for asynchronous message processing with guaranteed ordering and built-in flow control. The framework ensures thread-safe, ordered execution of tasks while providing sophisticated state management and backpressure capabilities.
3
4
## Capabilities
5
6
### Core Actor System
7
8
The foundation of the actor framework providing ordered message processing.
9
10
#### Actor
11
12
Primary actor implementation for processing messages of type T with guaranteed ordering.
13
14
```java { .api }
15
class Actor<T> extends ProcessorBase<T> {
16
// Constructor
17
Actor(Executor parent, ActorListener<T> listener);
18
19
// Message processing
20
void act(T message);
21
}
22
23
interface ActorListener<T> {
24
void onMessage(T message);
25
}
26
```
27
28
#### ThresholdActor
29
30
Actor with threshold-based flow control for backpressure management and resource monitoring.
31
32
```java { .api }
33
class ThresholdActor<T> extends ProcessorBase<Object> {
34
// Constructor
35
ThresholdActor(Executor parent,
36
ActorListener<T> listener,
37
int maxSize,
38
ToIntFunction<T> sizeGetter,
39
Runnable overThreshold,
40
Runnable clearThreshold);
41
42
// Message processing with threshold management
43
void act(T message);
44
45
// Flow control
46
void flush();
47
void requestShutdown();
48
}
49
```
50
51
### Ordered Execution
52
53
Executors that guarantee strict task ordering while leveraging thread pools.
54
55
#### OrderedExecutor
56
57
Executor ensuring all tasks execute in strict order with optional fairness control.
58
59
```java { .api }
60
class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
61
// Constructor
62
OrderedExecutor(Executor delegate);
63
64
// Task execution
65
void execute(Runnable run);
66
67
// Fairness control
68
boolean isFair();
69
OrderedExecutor setFair(boolean fair);
70
71
// String representation
72
String toString();
73
}
74
```
75
76
#### OrderedExecutorFactory
77
78
Factory for creating OrderedExecutor instances with shared configuration.
79
80
```java { .api }
81
class OrderedExecutorFactory implements ExecutorFactory {
82
// Constructor
83
OrderedExecutorFactory(Executor parent);
84
85
// Factory methods
86
ArtemisExecutor getExecutor();
87
Executor getParent();
88
89
// Configuration
90
boolean isFair();
91
OrderedExecutorFactory setFair(boolean fair);
92
93
// Static utilities
94
static boolean flushExecutor(Executor executor);
95
static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit);
96
}
97
```
98
99
### Extended Executor Interface
100
101
Enhanced executor with lifecycle management and flow control.
102
103
#### ArtemisExecutor
104
105
Extended executor interface with shutdown control, fairness, and flow management.
106
107
```java { .api }
108
interface ArtemisExecutor extends Executor {
109
// Static factory
110
static ArtemisExecutor delegate(Executor executor);
111
112
// Shutdown control
113
default int shutdownNow(Consumer<? super Runnable> onPendingTask, int timeout, TimeUnit unit);
114
default int shutdownNow();
115
default void shutdown();
116
117
// Flow control
118
default boolean flush(long timeout, TimeUnit unit);
119
default boolean isFlushed();
120
default void yield();
121
122
// Fairness
123
default boolean isFair();
124
default ArtemisExecutor setFair(boolean fair);
125
126
// State checking
127
default boolean inHandler();
128
}
129
```
130
131
### Base Processing Infrastructure
132
133
Foundational classes providing common functionality for all actors and processors.
134
135
#### ProcessorBase
136
137
Abstract base class with state management, shutdown controls, and task queuing.
138
139
```java { .api }
140
abstract class ProcessorBase<T> extends HandlerBase {
141
// State constants
142
static final int STATE_NOT_RUNNING = 0;
143
static final int STATE_RUNNING = 1;
144
static final int STATE_FORCED_SHUTDOWN = 2;
145
static final int STATE_PAUSED = 3;
146
147
// Constructor
148
ProcessorBase(Executor parent);
149
150
// State management
151
void pauseProcessing();
152
void resumeProcessing();
153
int status();
154
boolean isFlushed();
155
156
// Shutdown methods
157
void shutdown();
158
void shutdown(long timeout, TimeUnit unit);
159
int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit unit);
160
161
// Flow control
162
void flush();
163
boolean flush(long timeout, TimeUnit unit);
164
void yield();
165
166
// Monitoring
167
int remaining();
168
169
// Abstract method for implementation
170
protected abstract void doTask(T task);
171
172
// Protected methods
173
protected void task(T command);
174
}
175
```
176
177
#### HandlerBase
178
179
Base class providing ThreadLocal-based handler detection.
180
181
```java { .api }
182
abstract class HandlerBase {
183
// Handler context detection
184
boolean inHandler();
185
186
// Protected context management
187
protected void enter();
188
protected void leave();
189
}
190
```
191
192
## Usage Examples
193
194
### Basic Actor Pattern
195
196
```java
197
import org.apache.activemq.artemis.utils.actors.Actor;
198
import org.apache.activemq.artemis.utils.actors.ActorListener;
199
200
// Create thread pool
201
ExecutorService threadPool = Executors.newFixedThreadPool(4);
202
203
// Create actor for processing messages
204
ActorListener<String> messageProcessor = message -> {
205
System.out.println("Processing: " + message);
206
// Perform message processing
207
processMessage(message);
208
};
209
210
Actor<String> messageActor = new Actor<>(threadPool, messageProcessor);
211
212
// Send messages - all processed in order
213
messageActor.act("Message 1");
214
messageActor.act("Message 2");
215
messageActor.act("Message 3");
216
217
// Graceful shutdown
218
messageActor.shutdown(30, TimeUnit.SECONDS);
219
```
220
221
### ThresholdActor for Flow Control
222
223
```java
224
import org.apache.activemq.artemis.utils.actors.ThresholdActor;
225
226
// Create threshold actor for memory management
227
ThresholdActor<ByteBuffer> bufferProcessor = new ThresholdActor<>(
228
threadPool,
229
buffer -> processBuffer(buffer), // Message processor
230
1024 * 1024, // 1MB threshold
231
ByteBuffer::remaining, // Size calculation
232
() -> { // Over threshold callback
233
logger.warn("Buffer threshold exceeded - applying backpressure");
234
applyBackpressure();
235
},
236
() -> { // Clear threshold callback
237
logger.info("Buffer threshold cleared - releasing backpressure");
238
releaseBackpressure();
239
}
240
);
241
242
// Process buffers with automatic threshold management
243
for (ByteBuffer buffer : incomingBuffers) {
244
bufferProcessor.act(buffer); // Threshold callbacks triggered automatically
245
}
246
247
// Flush and check final state
248
bufferProcessor.flush();
249
```
250
251
### Ordered Executor Pattern
252
253
```java
254
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
255
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
256
257
// Create factory for ordered executors
258
OrderedExecutorFactory factory = new OrderedExecutorFactory(threadPool)
259
.setFair(true); // Enable fairness - yields after each task
260
261
// Create ordered executors for different channels
262
OrderedExecutor channel1 = (OrderedExecutor) factory.getExecutor();
263
OrderedExecutor channel2 = (OrderedExecutor) factory.getExecutor();
264
265
// Tasks within each channel execute in order
266
channel1.execute(() -> processTask("Channel1-Task1"));
267
channel1.execute(() -> processTask("Channel1-Task2"));
268
channel1.execute(() -> processTask("Channel1-Task3"));
269
270
channel2.execute(() -> processTask("Channel2-Task1"));
271
channel2.execute(() -> processTask("Channel2-Task2"));
272
273
// Wait for completion
274
boolean flushed1 = channel1.flush(10, TimeUnit.SECONDS);
275
boolean flushed2 = channel2.flush(10, TimeUnit.SECONDS);
276
277
if (flushed1 && flushed2) {
278
System.out.println("All tasks completed");
279
}
280
```
281
282
### Advanced State Management
283
284
```java
285
import org.apache.activemq.artemis.utils.actors.ProcessorBase;
286
287
public class CustomProcessor extends ProcessorBase<WorkItem> {
288
289
public CustomProcessor(Executor parent) {
290
super(parent);
291
}
292
293
@Override
294
protected void doTask(WorkItem item) {
295
// Process work item
296
if (item.isHighPriority()) {
297
processImmediately(item);
298
} else {
299
processNormally(item);
300
}
301
}
302
303
public void processWork(WorkItem item) {
304
// Add to processing queue
305
task(item);
306
}
307
308
public void handleShutdown() {
309
// Pause processing
310
pauseProcessing();
311
312
// Process remaining items with timeout
313
int remaining = shutdownNow(
314
item -> handlePendingItem(item), // Handle each pending item
315
30, // 30 second timeout
316
TimeUnit.SECONDS
317
);
318
319
if (remaining > 0) {
320
logger.warn("Shutdown incomplete, {} items remaining", remaining);
321
}
322
}
323
324
public void checkHealth() {
325
int status = status();
326
int pending = remaining(); // O(n) operation
327
328
switch (status) {
329
case STATE_RUNNING:
330
logger.info("Processor running, {} pending items", pending);
331
break;
332
case STATE_PAUSED:
333
logger.info("Processor paused, {} pending items", pending);
334
break;
335
case STATE_FORCED_SHUTDOWN:
336
logger.warn("Processor force shutdown");
337
break;
338
}
339
}
340
}
341
```
342
343
### Flow Control Patterns
344
345
```java
346
// ArtemisExecutor with flow control
347
ArtemisExecutor executor = ArtemisExecutor.delegate(threadPool);
348
349
// Submit batch of tasks
350
for (int i = 0; i < 1000; i++) {
351
final int taskId = i;
352
executor.execute(() -> processTask(taskId));
353
354
// Yield periodically for fairness
355
if (i % 100 == 0) {
356
executor.yield();
357
}
358
}
359
360
// Wait for completion with timeout
361
boolean completed = executor.flush(60, TimeUnit.SECONDS);
362
363
if (!completed) {
364
// Force shutdown and handle pending tasks
365
int pending = executor.shutdownNow(
366
task -> logger.warn("Discarding pending task: {}", task),
367
10,
368
TimeUnit.SECONDS
369
);
370
logger.warn("Forced shutdown, {} tasks not completed", pending);
371
}
372
373
// Quick flush check
374
if (executor.isFlushed()) {
375
logger.info("Executor is completely flushed");
376
}
377
```
378
379
### Factory Pattern with Configuration
380
381
```java
382
// Create factory with shared thread pool
383
OrderedExecutorFactory factory = new OrderedExecutorFactory(
384
Executors.newFixedThreadPool(8,
385
new ActiveMQThreadFactory("ProcessorPool", true, getClassLoader()))
386
);
387
388
// Configure fairness for all executors
389
factory.setFair(true);
390
391
// Create executors for different subsystems
392
ArtemisExecutor messageProcessor = factory.getExecutor();
393
ArtemisExecutor connectionManager = factory.getExecutor();
394
ArtemisExecutor auditLogger = factory.getExecutor();
395
396
// Each subsystem gets ordered execution but can run in parallel
397
messageProcessor.execute(() -> handleIncomingMessage());
398
connectionManager.execute(() -> manageConnections());
399
auditLogger.execute(() -> logAuditEvent());
400
401
// Factory utilities for bulk operations
402
boolean allFlushed = OrderedExecutorFactory.flushExecutor(
403
factory.getParent(), 30, TimeUnit.SECONDS
404
);
405
```
406
407
## Design Patterns
408
409
### Message Processing Pipeline
410
411
```java
412
// Create pipeline with actors
413
Actor<RawMessage> parser = new Actor<>(threadPool, this::parseMessage);
414
Actor<ParsedMessage> validator = new Actor<>(threadPool, this::validateMessage);
415
Actor<ValidMessage> processor = new Actor<>(threadPool, this::processMessage);
416
417
// Chain processing
418
public void parseMessage(RawMessage raw) {
419
ParsedMessage parsed = parse(raw);
420
validator.act(parsed);
421
}
422
423
public void validateMessage(ParsedMessage parsed) {
424
if (isValid(parsed)) {
425
ValidMessage valid = new ValidMessage(parsed);
426
processor.act(valid);
427
}
428
}
429
```
430
431
### Backpressure Management
432
433
```java
434
// Coordinated backpressure across multiple threshold actors
435
AtomicBoolean backpressureActive = new AtomicBoolean(false);
436
437
ThresholdActor<Message> messageActor = new ThresholdActor<>(
438
threadPool, this::processMessage,
439
1000, Message::getSize,
440
() -> activateBackpressure(),
441
() -> releaseBackpressure()
442
);
443
444
ThresholdActor<Connection> connectionActor = new ThresholdActor<>(
445
threadPool, this::manageConnection,
446
50, conn -> 1, // Count-based threshold
447
() -> activateBackpressure(),
448
() -> releaseBackpressure()
449
);
450
451
private void activateBackpressure() {
452
if (backpressureActive.compareAndSet(false, true)) {
453
logger.warn("Activating system-wide backpressure");
454
notifyBackpressureListeners(true);
455
}
456
}
457
```
458
459
The actor framework provides a robust foundation for building scalable, ordered, asynchronous processing systems with sophisticated flow control and state management capabilities.