Common utilities, API classes, and shared functionality for Apache ActiveMQ Artemis message broker
—
Apache ActiveMQ Artemis Commons provides a comprehensive set of utility classes for threading, pooling, validation, environment access, and general-purpose operations used throughout the Artemis ecosystem.
Custom thread factory for creating properly named and configured threads.
class ActiveMQThreadFactory implements ThreadFactory {
// Constructors
ActiveMQThreadFactory(String groupName, boolean daemon, ClassLoader tccl);
ActiveMQThreadFactory(String groupName, String prefix, boolean daemon, ClassLoader tccl);
// ThreadFactory implementation
Thread newThread(Runnable command);
// Thread management
boolean join(int timeout, TimeUnit timeUnit);
// Factory method
static ActiveMQThreadFactory defaultThreadFactory(String callerClassName);
}

// Create thread factory for background tasks
ActiveMQThreadFactory factory = new ActiveMQThreadFactory(
"MessageProcessor", // group name
true, // daemon threads
getClass().getClassLoader()
);
// Create threads with proper naming
Thread worker = factory.newThread(() -> {
// Background message processing
processMessages();
});
// Use default factory
ActiveMQThreadFactory defaultFactory =
ActiveMQThreadFactory.defaultThreadFactory("MyComponent");
// Wait for all threads to finish
boolean finished = factory.join(30, TimeUnit.SECONDS);

Enhanced thread pool executor with monitoring capabilities.
class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
// Enhanced ThreadPoolExecutor for ActiveMQ with monitoring
// (Specific constructor signatures depend on implementation)
}

interface ExecutorFactory {
// Factory interface for creating executors
Executor getExecutor();
}

Utilities for byte array manipulation and conversion.
class ByteUtil {
// Hex string conversion
static byte[] hexStringToByteArray(String hexString);
static String bytesToHex(byte[] bytes);
// Additional byte manipulation methods
// (Specific methods depend on implementation)
}

// Convert hex string to bytes
byte[] data = ByteUtil.hexStringToByteArray("48656C6C6F"); // "Hello"
// Convert bytes back to hex
String hex = ByteUtil.bytesToHex(data); // "48656C6C6F"
// Useful for debugging binary data
String debugHex = ByteUtil.bytesToHex(messageBytes);
System.out.println("Message bytes: " + debugHex);

class BooleanUtil {
// Boolean conversion and utility methods
// (Specific methods depend on implementation)
}

Base64 encoding and decoding utilities.
class Base64 {
// Base64 encoding/decoding operations
static String encode(byte[] data);
static byte[] decode(String encoded);
// Additional Base64 methods
}

Utilities for working with ActiveMQ addresses and destination names.
class CompositeAddress {
// Utilities for fully-qualified queue names (address::queue format)
// Returns the queue part of a composite address,
// e.g. "orders::priority-queue" -> "priority-queue".
// NOTE(review): behaviour for input without "::" is not shown here — confirm.
static String extractQueueName(String compositeAddress);
// Returns the address part of a composite address,
// e.g. "orders::priority-queue" -> "orders".
// NOTE(review): behaviour for input without "::" is not shown here — confirm.
static String extractAddressName(String compositeAddress);
// Joins an address and a queue name into the address::queue form,
// e.g. ("notifications", "email-queue") -> "notifications::email-queue".
static String toFullyQualified(String address, String queue);
// Additional composite address methods
}
class DestinationUtil {
// Destination name manipulation utilities
// (Specific methods depend on implementation)
}

// Work with composite addresses
String fullAddress = "orders::priority-queue";
String address = CompositeAddress.extractAddressName(fullAddress); // "orders"
String queue = CompositeAddress.extractQueueName(fullAddress); // "priority-queue"
// Create composite address
String composite = CompositeAddress.toFullyQualified("notifications", "email-queue");
// Result: "notifications::email-queue"

class DataConstants {
// Size constants for different data types
static final int SIZE_BOOLEAN = 1;
static final int SIZE_BYTE = 1;
static final int SIZE_SHORT = 2;
static final int SIZE_INT = 4;
static final int SIZE_LONG = 8;
static final int SIZE_FLOAT = 4;
static final int SIZE_DOUBLE = 8;
// Additional data type markers and constants
}

abstract class AbstractPool<K, V> {
// Base class for object pooling implementations
abstract V get(K key);
abstract void put(K key, V value);
abstract void clear();
}
abstract class AbstractByteBufPool<T> extends AbstractPool<ByteBuf, T> {
// Base class for ByteBuf-based object pools
}

interface Pool<T> {
// Generic pool interface
T get();
void put(T item);
void clear();
int size();
}
class MpscPool<T> implements Pool<T> {
// Multi-producer, single-consumer pool implementation
MpscPool(int capacity, Supplier<T> supplier);
T get();
void put(T item);
void clear();
int size();
}

// Create a pool for expensive objects
MpscPool<StringBuilder> stringBuilderPool = new MpscPool<>(
10, // capacity
() -> new StringBuilder(256) // supplier
);
// Use pooled objects
StringBuilder sb = stringBuilderPool.get();
try {
sb.append("Message: ").append(data);
String result = sb.toString();
// Use result
} finally {
sb.setLength(0); // reset
stringBuilderPool.put(sb); // return to pool
}

interface ReferenceCounter {
// Reference counting for resource management
int increment();
int decrement();
int getCount();
}

interface ArtemisCloseable extends AutoCloseable {
// Enhanced closeable interface for Artemis resources
void close() throws ActiveMQException;
boolean isClosed();
}

// Resource with reference counting
class ManagedResource implements ReferenceCounter, ArtemisCloseable {
    // Counter that use() increments on entry and decrements on exit
    private final ReferenceCounter refCounter = /* implementation */;

    // Holds one reference while the resource is in use and runs cleanup()
    // when the decrement reports that the count has reached zero.
    public void use() {
        refCounter.increment();
        try {
            // Use resource
        } finally {
            final boolean lastReference = refCounter.decrement() == 0;
            if (lastReference) {
                // Last reference, cleanup
                cleanup();
            }
        }
    }
}
// Try-with-resources pattern
try (ArtemisCloseable resource = createManagedResource()) {
// Use resource
resource.performOperation();
} // Automatically closed

class UUIDTimer {
// Timer utilities for UUID generation
// (Specific methods depend on implementation)
}

class AutomaticLatch {
// Self-managing latch implementation
AutomaticLatch(int count);
void await() throws InterruptedException;
void await(long timeout, TimeUnit unit) throws InterruptedException;
void countDown();
int getCount();
}
class ReusableLatch {
// Reusable countdown latch implementation
ReusableLatch(int initialCount);
void await() throws InterruptedException;
void await(long timeout, TimeUnit unit) throws InterruptedException;
void countDown();
void reset(int count);
int getCount();
}

// Coordinate multiple operations
AutomaticLatch completionLatch = new AutomaticLatch(3);
// Start three async operations
for (int i = 0; i < 3; i++) {
executor.submit(() -> {
try {
performOperation();
} finally {
completionLatch.countDown();
}
});
}
// Wait for all operations to complete
completionLatch.await(30, TimeUnit.SECONDS);
// Reusable latch for repeated coordination
ReusableLatch batchLatch = new ReusableLatch(10);
while (hasMoreBatches()) {
processBatch();
batchLatch.countDown();
if (batchLatch.getCount() == 0) {
// Batch complete, reset for next batch
batchLatch.reset(10);
}
}

class Preconditions {
// Validation utilities for checking preconditions
static void checkArgument(boolean condition, String message);
static void checkNotNull(Object reference, String message);
static void checkState(boolean condition, String message);
// Additional validation methods
static void checkElementIndex(int index, int size, String message);
static void checkPositionIndex(int index, int size, String message);
}

public void processQueue(String queueName, int maxMessages) {
Preconditions.checkNotNull(queueName, "Queue name cannot be null");
Preconditions.checkArgument(maxMessages > 0, "Max messages must be positive");
Preconditions.checkState(isConnected(), "Must be connected to process queue");
// Proceed with processing
doProcessQueue(queueName, maxMessages);
}
public Object getElement(List<Object> list, int index) {
Preconditions.checkNotNull(list, "List cannot be null");
Preconditions.checkElementIndex(index, list.size(), "Invalid index");
return list.get(index);
}

class Env {
// Environment and system property utilities
static String getProperty(String key);
static String getProperty(String key, String defaultValue);
static boolean getBooleanProperty(String key, boolean defaultValue);
static int getIntProperty(String key, int defaultValue);
static long getLongProperty(String key, long defaultValue);
// Environment variable access
static String getEnv(String name);
static String getEnv(String name, String defaultValue);
}

// Read configuration from system properties
int poolSize = Env.getIntProperty("artemis.pool.size", 10);
boolean debugMode = Env.getBooleanProperty("artemis.debug", false);
String logLevel = Env.getProperty("artemis.log.level", "INFO");
// Read from environment variables
String dataDir = Env.getEnv("ARTEMIS_DATA_DIR", "/var/lib/artemis");
String configFile = Env.getEnv("ARTEMIS_CONFIG", "artemis.xml");
// Use configuration
if (debugMode) {
logger.info("Debug mode enabled, pool size: " + poolSize);
}

class DefaultSensitiveStringCodec {
// Default implementation for encoding/decoding sensitive strings
String encode(String plainText) throws Exception;
String decode(String encodedText) throws Exception;
}

class ClassloadingUtil {
// Class loading utilities
static Class<?> loadClass(String className, ClassLoader classLoader);
static <T> T newInstance(String className, ClassLoader classLoader, Class<T> type);
// Additional class loading methods
}

class FileUtil {
// File manipulation utilities
static void deleteDirectory(File directory) throws IOException;
static void copyDirectory(File source, File target) throws IOException;
static boolean createDirectories(File directory);
static long getFileSize(File file);
static byte[] readFileBytes(File file) throws IOException;
static void writeFileBytes(File file, byte[] data) throws IOException;
// Additional file operations
}

class TimeUtils {
// Time and duration utilities
static long currentTimeMillis();
static long nanoTime();
static String formatDuration(long durationMs);
static long parseDuration(String duration);
static boolean isExpired(long timestamp, long timeoutMs);
// Additional time operations
}

class UTF8Util {
// UTF-8 encoding and decoding utilities
static byte[] encode(String string);
static String decode(byte[] bytes);
static int calculateEncodedLength(String string);
static boolean isValidUTF8(byte[] bytes);
// Additional UTF-8 methods
}

class UUID {
// UUID representation and operations
UUID(long mostSigBits, long leastSigBits);
static UUID randomUUID();
static UUID fromString(String name);
long getMostSignificantBits();
long getLeastSignificantBits();
String toString();
}
class UUIDGenerator {
// UUID generation utilities
static UUID generateTimeBasedUUID();
static UUID generateRandomUUID();
static String generateUUIDString();
// Additional UUID generation methods
}

class StringEscapeUtils {
// String escaping and unescaping utilities
static String escapeJava(String input);
static String unescapeJava(String input);
static String escapeXml(String input);
static String unescapeXml(String input);
static String escapeHtml(String input);
static String unescapeHtml(String input);
// Additional string escape methods
}

class TypedProperties implements Map<SimpleString, Object> {
// Type-safe property storage
TypedProperties();
TypedProperties(TypedProperties other);
// Type-safe getters
String getStringProperty(SimpleString key);
Integer getIntProperty(SimpleString key);
Long getLongProperty(SimpleString key);
Boolean getBooleanProperty(SimpleString key);
Double getDoubleProperty(SimpleString key);
// Type-safe setters
TypedProperties setStringProperty(SimpleString key, String value);
TypedProperties setIntProperty(SimpleString key, int value);
TypedProperties setLongProperty(SimpleString key, long value);
TypedProperties setBooleanProperty(SimpleString key, boolean value);
TypedProperties setDoubleProperty(SimpleString key, double value);
// Property manipulation
boolean containsProperty(SimpleString key);
Object removeProperty(SimpleString key);
Set<SimpleString> getPropertyNames();
int getEncodeSize();
// Map interface methods inherited from Map<SimpleString, Object>
}

class ConcurrentUtil {
// Concurrency utilities and helpers
static void parkNanos(long nanos);
static void yield();
static void onSpinWait();
// Additional concurrency helpers
}

// Combine multiple utilities for robust resource management
public class ResourceManager implements ArtemisCloseable {
private final ActiveMQThreadFactory threadFactory;
private final ReferenceCounter refCounter;
private final ReusableLatch operationLatch;
// Creates a manager whose worker threads are grouped under the given name.
// name: thread group name; must not be null.
public ResourceManager(String name) {
    // Validate before first use: the name is handed to the thread factory
    // on the next line, so the check has to come first to be of any value.
    Preconditions.checkNotNull(name, "Resource name cannot be null");
    // Same class-loader lookup as the ActiveMQThreadFactory usage example.
    this.threadFactory = new ActiveMQThreadFactory(name, true, getClass().getClassLoader());
    this.refCounter = createReferenceCounter();
    this.operationLatch = new ReusableLatch(1);
}
// Runs doWork on a new thread from the factory and waits up to 30 seconds
// on the operation latch, holding one reference for the duration.
// Throws ActiveMQException if the calling thread is interrupted while waiting.
public void performOperation() throws ActiveMQException {
    Preconditions.checkState(!isClosed(), "Resource is closed");
    refCounter.increment();
    try {
        Thread worker = threadFactory.newThread(this::doWork);
        worker.start();
        operationLatch.await(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // await is declared to throw InterruptedException, which this method's
        // signature does not allow: restore the interrupt flag for callers and
        // surface the failure as the declared exception type, keeping the cause.
        Thread.currentThread().interrupt();
        ActiveMQException failure =
            new ActiveMQException("Interrupted while waiting for operation to complete");
        failure.initCause(e);
        throw failure;
    } finally {
        // Release the reference taken above; clean up when the count reaches zero.
        if (refCounter.decrement() == 0) {
            cleanup();
        }
    }
}
}

// Use environment utilities for flexible configuration
public class ConfigurableProcessor {
private final int batchSize;
private final long timeoutMs;
private final boolean useCompression;
private final String workDir;
public ConfigurableProcessor() {
// Load configuration with sensible defaults
this.batchSize = Env.getIntProperty("processor.batch.size", 100);
this.timeoutMs = Env.getLongProperty("processor.timeout.ms", 30000);
this.useCompression = Env.getBooleanProperty("processor.compression", false);
this.workDir = Env.getEnv("PROCESSOR_WORK_DIR", "/tmp/processor");
// Validate configuration
Preconditions.checkArgument(batchSize > 0, "Batch size must be positive");
Preconditions.checkArgument(timeoutMs > 0, "Timeout must be positive");
Preconditions.checkNotNull(workDir, "Work directory must be specified");
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-activemq--artemis-commons