CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-activemq--artemis-commons

Common utilities, API classes, and shared functionality for Apache ActiveMQ Artemis message broker

Pending
Overview
Eval results
Files

actors.mddocs/

Actor Framework

Apache ActiveMQ Artemis Commons provides a powerful actor framework for asynchronous message processing with guaranteed ordering and built-in flow control. The framework ensures thread-safe, ordered execution of tasks while providing sophisticated state management and backpressure capabilities.

Capabilities

Core Actor System

The foundation of the actor framework providing ordered message processing.

Actor

Primary actor implementation for processing messages of type T with guaranteed ordering.

class Actor<T> extends ProcessorBase<T> {
    // Constructor
    Actor(Executor parent, ActorListener<T> listener);
    
    // Message processing
    void act(T message);
}

interface ActorListener<T> {
    void onMessage(T message);
}

ThresholdActor

Actor with threshold-based flow control for backpressure management and resource monitoring.

class ThresholdActor<T> extends ProcessorBase<Object> {
    // Constructor
    ThresholdActor(Executor parent, 
                   ActorListener<T> listener, 
                   int maxSize, 
                   ToIntFunction<T> sizeGetter, 
                   Runnable overThreshold, 
                   Runnable clearThreshold);
    
    // Message processing with threshold management
    void act(T message);
    
    // Flow control
    void flush();
    void requestShutdown();
}

Ordered Execution

Executors that guarantee strict task ordering while leveraging thread pools.

OrderedExecutor

Executor ensuring all tasks execute in strict order with optional fairness control.

class OrderedExecutor extends ProcessorBase<Runnable> implements ArtemisExecutor {
    // Constructor
    OrderedExecutor(Executor delegate);
    
    // Task execution
    void execute(Runnable run);
    
    // Fairness control
    boolean isFair();
    OrderedExecutor setFair(boolean fair);
    
    // String representation
    String toString();
}

OrderedExecutorFactory

Factory for creating OrderedExecutor instances with shared configuration.

class OrderedExecutorFactory implements ExecutorFactory {
    // Constructor
    OrderedExecutorFactory(Executor parent);
    
    // Factory methods
    ArtemisExecutor getExecutor();
    Executor getParent();
    
    // Configuration
    boolean isFair();
    OrderedExecutorFactory setFair(boolean fair);
    
    // Static utilities
    static boolean flushExecutor(Executor executor);
    static boolean flushExecutor(Executor executor, long timeout, TimeUnit unit);
}

Extended Executor Interface

Enhanced executor with lifecycle management and flow control.

ArtemisExecutor

Extended executor interface with shutdown control, fairness, and flow management.

interface ArtemisExecutor extends Executor {
    // Static factory
    static ArtemisExecutor delegate(Executor executor);
    
    // Shutdown control
    default int shutdownNow(Consumer<? super Runnable> onPendingTask, int timeout, TimeUnit unit);
    default int shutdownNow();
    default void shutdown();
    
    // Flow control
    default boolean flush(long timeout, TimeUnit unit);
    default boolean isFlushed();
    default void yield();
    
    // Fairness
    default boolean isFair();
    default ArtemisExecutor setFair(boolean fair);
    
    // State checking
    default boolean inHandler();
}

Base Processing Infrastructure

Foundational classes providing common functionality for all actors and processors.

ProcessorBase

Abstract base class with state management, shutdown controls, and task queuing.

abstract class ProcessorBase<T> extends HandlerBase {
    // State constants
    static final int STATE_NOT_RUNNING = 0;
    static final int STATE_RUNNING = 1;
    static final int STATE_FORCED_SHUTDOWN = 2;
    static final int STATE_PAUSED = 3;
    
    // Constructor
    ProcessorBase(Executor parent);
    
    // State management
    void pauseProcessing();
    void resumeProcessing();
    int status();
    boolean isFlushed();
    
    // Shutdown methods
    void shutdown();
    void shutdown(long timeout, TimeUnit unit);
    int shutdownNow(Consumer<? super T> onPendingItem, int timeout, TimeUnit unit);
    
    // Flow control
    void flush();
    boolean flush(long timeout, TimeUnit unit);
    void yield();
    
    // Monitoring
    int remaining();
    
    // Abstract method for implementation
    protected abstract void doTask(T task);
    
    // Protected methods
    protected void task(T command);
}

HandlerBase

Base class providing ThreadLocal-based handler detection.

abstract class HandlerBase {
    // Handler context detection
    boolean inHandler();
    
    // Protected context management
    protected void enter();
    protected void leave();
}

Usage Examples

Basic Actor Pattern

import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.actors.ActorListener;

// Create thread pool
ExecutorService threadPool = Executors.newFixedThreadPool(4);

// Create actor for processing messages
ActorListener<String> messageProcessor = message -> {
    System.out.println("Processing: " + message);
    // Perform message processing
    processMessage(message);
};

Actor<String> messageActor = new Actor<>(threadPool, messageProcessor);

// Send messages - all processed in order
messageActor.act("Message 1");
messageActor.act("Message 2");
messageActor.act("Message 3");

// Graceful shutdown
messageActor.shutdown(30, TimeUnit.SECONDS);

ThresholdActor for Flow Control

import org.apache.activemq.artemis.utils.actors.ThresholdActor;

// Create threshold actor for memory management
ThresholdActor<ByteBuffer> bufferProcessor = new ThresholdActor<>(
    threadPool,
    buffer -> processBuffer(buffer),          // Message processor
    1024 * 1024,                              // 1MB threshold
    ByteBuffer::remaining,                    // Size calculation
    () -> {                                   // Over threshold callback
        logger.warn("Buffer threshold exceeded - applying backpressure");
        applyBackpressure();
    },
    () -> {                                   // Clear threshold callback  
        logger.info("Buffer threshold cleared - releasing backpressure");
        releaseBackpressure();
    }
);

// Process buffers with automatic threshold management
for (ByteBuffer buffer : incomingBuffers) {
    bufferProcessor.act(buffer); // Threshold callbacks triggered automatically
}

// Flush and check final state
bufferProcessor.flush();

Ordered Executor Pattern

import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;

// Create factory for ordered executors
OrderedExecutorFactory factory = new OrderedExecutorFactory(threadPool)
    .setFair(true); // Enable fairness - yields after each task

// Create ordered executors for different channels
OrderedExecutor channel1 = (OrderedExecutor) factory.getExecutor();
OrderedExecutor channel2 = (OrderedExecutor) factory.getExecutor();

// Tasks within each channel execute in order
channel1.execute(() -> processTask("Channel1-Task1"));
channel1.execute(() -> processTask("Channel1-Task2"));
channel1.execute(() -> processTask("Channel1-Task3"));

channel2.execute(() -> processTask("Channel2-Task1"));
channel2.execute(() -> processTask("Channel2-Task2"));

// Wait for completion
boolean flushed1 = channel1.flush(10, TimeUnit.SECONDS);
boolean flushed2 = channel2.flush(10, TimeUnit.SECONDS);

if (flushed1 && flushed2) {
    System.out.println("All tasks completed");
}

Advanced State Management

import org.apache.activemq.artemis.utils.actors.ProcessorBase;

public class CustomProcessor extends ProcessorBase<WorkItem> {
    
    public CustomProcessor(Executor parent) {
        super(parent);
    }
    
    @Override
    protected void doTask(WorkItem item) {
        // Process work item
        if (item.isHighPriority()) {
            processImmediately(item);
        } else {
            processNormally(item);
        }
    }
    
    public void processWork(WorkItem item) {
        // Add to processing queue
        task(item);
    }
    
    public void handleShutdown() {
        // Pause processing
        pauseProcessing();
        
        // Process remaining items with timeout
        int remaining = shutdownNow(
            item -> handlePendingItem(item),  // Handle each pending item
            30,                               // 30 second timeout
            TimeUnit.SECONDS
        );
        
        if (remaining > 0) {
            logger.warn("Shutdown incomplete, {} items remaining", remaining);
        }
    }
    
    public void checkHealth() {
        int status = status();
        int pending = remaining(); // O(n) operation
        
        switch (status) {
            case STATE_RUNNING:
                logger.info("Processor running, {} pending items", pending);
                break;
            case STATE_PAUSED:
                logger.info("Processor paused, {} pending items", pending);
                break;
            case STATE_FORCED_SHUTDOWN:
                logger.warn("Processor force shutdown");
                break;
        }
    }
}

Flow Control Patterns

// ArtemisExecutor with flow control
ArtemisExecutor executor = ArtemisExecutor.delegate(threadPool);

// Submit batch of tasks
for (int i = 0; i < 1000; i++) {
    final int taskId = i;
    executor.execute(() -> processTask(taskId));
    
    // Yield periodically for fairness
    if (i % 100 == 0) {
        executor.yield();
    }
}

// Wait for completion with timeout
boolean completed = executor.flush(60, TimeUnit.SECONDS);

if (!completed) {
    // Force shutdown and handle pending tasks
    int pending = executor.shutdownNow(
        task -> logger.warn("Discarding pending task: {}", task),
        10, 
        TimeUnit.SECONDS
    );
    logger.warn("Forced shutdown, {} tasks not completed", pending);
}

// Quick flush check
if (executor.isFlushed()) {
    logger.info("Executor is completely flushed");
}

Factory Pattern with Configuration

// Create factory with shared thread pool
OrderedExecutorFactory factory = new OrderedExecutorFactory(
    Executors.newFixedThreadPool(8, 
        new ActiveMQThreadFactory("ProcessorPool", true, getClassLoader()))
);

// Configure fairness for all executors
factory.setFair(true);

// Create executors for different subsystems
ArtemisExecutor messageProcessor = factory.getExecutor();
ArtemisExecutor connectionManager = factory.getExecutor();
ArtemisExecutor auditLogger = factory.getExecutor();

// Each subsystem gets ordered execution but can run in parallel
messageProcessor.execute(() -> handleIncomingMessage());
connectionManager.execute(() -> manageConnections());
auditLogger.execute(() -> logAuditEvent());

// Factory utilities for bulk operations
boolean allFlushed = OrderedExecutorFactory.flushExecutor(
    factory.getParent(), 30, TimeUnit.SECONDS
);

Design Patterns

Message Processing Pipeline

// Create pipeline with actors
Actor<RawMessage> parser = new Actor<>(threadPool, this::parseMessage);
Actor<ParsedMessage> validator = new Actor<>(threadPool, this::validateMessage);
Actor<ValidMessage> processor = new Actor<>(threadPool, this::processMessage);

// Chain processing
public void parseMessage(RawMessage raw) {
    ParsedMessage parsed = parse(raw);
    validator.act(parsed);
}

public void validateMessage(ParsedMessage parsed) {
    if (isValid(parsed)) {
        ValidMessage valid = new ValidMessage(parsed);
        processor.act(valid);
    }
}

Backpressure Management

// Coordinated backpressure across multiple threshold actors
AtomicBoolean backpressureActive = new AtomicBoolean(false);

ThresholdActor<Message> messageActor = new ThresholdActor<>(
    threadPool, this::processMessage, 
    1000, Message::getSize,
    () -> activateBackpressure(),
    () -> releaseBackpressure()
);

ThresholdActor<Connection> connectionActor = new ThresholdActor<>(
    threadPool, this::manageConnection,
    50, conn -> 1,  // Count-based threshold
    () -> activateBackpressure(),
    () -> releaseBackpressure()
);

private void activateBackpressure() {
    if (backpressureActive.compareAndSet(false, true)) {
        logger.warn("Activating system-wide backpressure");
        notifyBackpressureListeners(true);
    }
}

The actor framework provides a robust foundation for building scalable, ordered, asynchronous processing systems with sophisticated flow control and state management capabilities.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-activemq--artemis-commons

docs

actors.md

collections.md

core-api.md

exceptions.md

index.md

json-api.md

logging.md

utilities.md

tile.json