Common utilities, API classes, and shared functionality for the Apache ActiveMQ Artemis message broker
—
Apache ActiveMQ Artemis Commons provides a powerful actor framework for asynchronous message processing with guaranteed ordering and built-in flow control. The framework ensures thread-safe, ordered execution of tasks while providing sophisticated state management and backpressure capabilities.
The foundation of the actor framework providing ordered message processing.
Primary actor implementation for processing messages of type T with guaranteed ordering.
class Actor<T> extends ProcessorBase<T> {
// Constructor
Actor(Executor parent, ActorListener<T> listener);
// Message processing
void act(T message);
}
interface ActorListener<T> {
void onMessage(T message);
}

Actor with threshold-based flow control for backpressure management and resource monitoring.
class ThresholdActor<T> extends ProcessorBase<Object> {
// Constructor
ThresholdActor(Executor parent,
ActorListener<T> listener,
int maxSize,
ToIntFunction<T> sizeGetter,
Runnable overThreshold,
Runnable clearThreshold);
// Message processing with threshold management
void act(T message);
// Flow control
void flush();
void requestShutdown();
}

Executors that guarantee strict task ordering while leveraging thread pools.
Executor ensuring all tasks execute in strict order with optional fairness control.
class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
// Constructor
OrderedExecutor(Executor delegate);
// Task execution
void execute(Runnable run);
// Fairness control
boolean isFair();
OrderedExecutor setFair(boolean fair);
// String representation
String toString();
}

Factory for creating OrderedExecutor instances with shared configuration.
class OrderedExecutorFactory implements ExecutorFactory {
// Constructor
OrderedExecutorFactory(Executor parent);
// Factory methods
ArtemisExecutor getExecutor();
Executor getParent();
// Configuration
boolean isFair();
OrderedExecutorFactory setFair(boolean fair);
// Static utilities
static boolean flushExecutor(Executor executor);
static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit);
}

Enhanced executor with lifecycle management and flow control.
Extended executor interface with shutdown control, fairness, and flow management.
interface ArtemisExecutor extends Executor {
// Static factory
static ArtemisExecutor delegate(Executor executor);
// Shutdown control
default int shutdownNow(Consumer<? super Runnable> onPendingTask, int timeout, TimeUnit unit);
default int shutdownNow();
default void shutdown();
// Flow control
default boolean flush(long timeout, TimeUnit unit);
default boolean isFlushed();
default void yield();
// Fairness
default boolean isFair();
default ArtemisExecutor setFair(boolean fair);
// State checking
default boolean inHandler();
}

Foundational classes providing common functionality for all actors and processors.
Abstract base class with state management, shutdown controls, and task queuing.
abstract class ProcessorBase<T> extends HandlerBase {
// State constants
static final int STATE_NOT_RUNNING = 0;
static final int STATE_RUNNING = 1;
static final int STATE_FORCED_SHUTDOWN = 2;
static final int STATE_PAUSED = 3;
// Constructor
ProcessorBase(Executor parent);
// State management
void pauseProcessing();
void resumeProcessing();
int status();
boolean isFlushed();
// Shutdown methods
void shutdown();
void shutdown(long timeout, TimeUnit unit);
int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit unit);
// Flow control
void flush();
boolean flush(long timeout, TimeUnit unit);
void yield();
// Monitoring
int remaining();
// Abstract method for implementation
protected abstract void doTask(T task);
// Protected methods
protected void task(T command);
}

Base class providing ThreadLocal-based handler detection.
abstract class HandlerBase {
// Handler context detection
boolean inHandler();
// Protected context management
protected void enter();
protected void leave();
}

import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.ActorListener;
// Create thread pool
ExecutorService threadPool = Executors.newFixedThreadPool(4);
// Create actor for processing messages
ActorListener<String> messageProcessor = message -> {
System.out.println("Processing: " + message);
// Perform message processing
processMessage(message);
};
Actor<String> messageActor = new Actor<>(threadPool, messageProcessor);
// Send messages - all processed in order
messageActor.act("Message 1");
messageActor.act("Message 2");
messageActor.act("Message 3");
// Graceful shutdown
messageActor.shutdown(30, TimeUnit.SECONDS);

import org.apache.activemq.artemis.utils.actors.ThresholdActor;
// Create threshold actor for memory management
ThresholdActor<ByteBuffer> bufferProcessor = new ThresholdActor<>(
threadPool,
buffer -> processBuffer(buffer), // Message processor
1024 * 1024, // 1MB threshold
ByteBuffer::remaining, // Size calculation
() -> { // Over threshold callback
logger.warn("Buffer threshold exceeded - applying backpressure");
applyBackpressure();
},
() -> { // Clear threshold callback
logger.info("Buffer threshold cleared - releasing backpressure");
releaseBackpressure();
}
);
// Process buffers with automatic threshold management
for (ByteBuffer buffer : incomingBuffers) {
bufferProcessor.act(buffer); // Threshold callbacks triggered automatically
}
// Flush and check final state
bufferProcessor.flush();

import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
// Create factory for ordered executors
OrderedExecutorFactory factory = new OrderedExecutorFactory(threadPool)
.setFair(true); // Enable fairness - yields after each task
// Create ordered executors for different channels
OrderedExecutor channel1 = (OrderedExecutor) factory.getExecutor();
OrderedExecutor channel2 = (OrderedExecutor) factory.getExecutor();
// Tasks within each channel execute in order
channel1.execute(() -> processTask("Channel1-Task1"));
channel1.execute(() -> processTask("Channel1-Task2"));
channel1.execute(() -> processTask("Channel1-Task3"));
channel2.execute(() -> processTask("Channel2-Task1"));
channel2.execute(() -> processTask("Channel2-Task2"));
// Wait for completion
boolean flushed1 = channel1.flush(10, TimeUnit.SECONDS);
boolean flushed2 = channel2.flush(10, TimeUnit.SECONDS);
if (flushed1 && flushed2) {
System.out.println("All tasks completed");
}

import org.apache.activemq.artemis.utils.actors.ProcessorBase;
public class CustomProcessor extends ProcessorBase<WorkItem> {
public CustomProcessor(Executor parent) {
super(parent);
}
@Override
protected void doTask(WorkItem item) {
// Process work item
if (item.isHighPriority()) {
processImmediately(item);
} else {
processNormally(item);
}
}
public void processWork(WorkItem item) {
// Add to processing queue
task(item);
}
public void handleShutdown() {
// Pause processing
pauseProcessing();
// Process remaining items with timeout
int remaining = shutdownNow(
item -> handlePendingItem(item), // Handle each pending item
30, // 30 second timeout
TimeUnit.SECONDS
);
if (remaining > 0) {
logger.warn("Shutdown incomplete, {} items remaining", remaining);
}
}
public void checkHealth() {
int status = status();
int pending = remaining(); // O(n) operation
switch (status) {
case STATE_RUNNING:
logger.info("Processor running, {} pending items", pending);
break;
case STATE_PAUSED:
logger.info("Processor paused, {} pending items", pending);
break;
case STATE_FORCED_SHUTDOWN:
logger.warn("Processor force shutdown");
break;
}
}
}

// ArtemisExecutor with flow control
ArtemisExecutor executor = ArtemisExecutor.delegate(threadPool);
// Submit batch of tasks
for (int i = 0; i < 1000; i++) {
final int taskId = i;
executor.execute(() -> processTask(taskId));
// Yield periodically for fairness
if (i % 100 == 0) {
executor.yield();
}
}
// Wait for completion with timeout
boolean completed = executor.flush(60, TimeUnit.SECONDS);
if (!completed) {
// Force shutdown and handle pending tasks
int pending = executor.shutdownNow(
task -> logger.warn("Discarding pending task: {}", task),
10,
TimeUnit.SECONDS
);
logger.warn("Forced shutdown, {} tasks not completed", pending);
}
// Quick flush check
if (executor.isFlushed()) {
logger.info("Executor is completely flushed");
}

// Create factory with shared thread pool
OrderedExecutorFactory factory = new OrderedExecutorFactory(
Executors.newFixedThreadPool(8,
new ActiveMQThreadFactory("ProcessorPool", true, getClassLoader()))
);
// Configure fairness for all executors
factory.setFair(true);
// Create executors for different subsystems
ArtemisExecutor messageProcessor = factory.getExecutor();
ArtemisExecutor connectionManager = factory.getExecutor();
ArtemisExecutor auditLogger = factory.getExecutor();
// Each subsystem gets ordered execution but can run in parallel
messageProcessor.execute(() -> handleIncomingMessage());
connectionManager.execute(() -> manageConnections());
auditLogger.execute(() -> logAuditEvent());
// Factory utilities for bulk operations
boolean allFlushed = OrderedExecutorFactory.flushExecutor(
factory.getParent(), 30, TimeUnit.SECONDS
);

// Create pipeline with actors
Actor<RawMessage> parser = new Actor<>(threadPool, this::parseMessage);
Actor<ParsedMessage> validator = new Actor<>(threadPool, this::validateMessage);
Actor<ValidMessage> processor = new Actor<>(threadPool, this::processMessage);
// Chain processing
public void parseMessage(RawMessage raw) {
ParsedMessage parsed = parse(raw);
validator.act(parsed);
}
public void validateMessage(ParsedMessage parsed) {
if (isValid(parsed)) {
ValidMessage valid = new ValidMessage(parsed);
processor.act(valid);
}
}

// Coordinated backpressure across multiple threshold actors
AtomicBoolean backpressureActive = new AtomicBoolean(false);
ThresholdActor<Message> messageActor = new ThresholdActor<>(
threadPool, this::processMessage,
1000, Message::getSize,
() -> activateBackpressure(),
() -> releaseBackpressure()
);
ThresholdActor<Connection> connectionActor = new ThresholdActor<>(
threadPool, this::manageConnection,
50, conn -> 1, // Count-based threshold
() -> activateBackpressure(),
() -> releaseBackpressure()
);
private void activateBackpressure() {
if (backpressureActive.compareAndSet(false, true)) {
logger.warn("Activating system-wide backpressure");
notifyBackpressureListeners(true);
}
}

The actor framework provides a robust foundation for building scalable, ordered, asynchronous processing systems with sophisticated flow control and state management capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-activemq--artemis-commons