Common utilities, API classes, and shared functionality for Apache ActiveMQ Artemis message broker
—
Apache ActiveMQ Artemis Commons provides specialized collection implementations optimized for high-performance messaging workloads. These collections focus on concurrency, memory efficiency, and type safety for primitive types.
High-performance concurrent collections that avoid locking overhead and support lock-free operations.
Thread-safe hash set implementation with the same concurrency characteristics as ConcurrentHashMap.
class ConcurrentHashSet<E> extends AbstractSet<E> implements ConcurrentSet<E> {
// Constructor
ConcurrentHashSet();
// Set operations
boolean add(E o);
boolean contains(Object o);
boolean remove(Object o);
void clear();
// Concurrent-specific operations
boolean addIfAbsent(E o);
// Size and state
int size();
boolean isEmpty();
Iterator<E> iterator();
}
interface ConcurrentSet<E> extends Set<E> {
boolean addIfAbsent(E o);
}High-performance concurrent map from primitive long keys to Object values, avoiding boxing/unboxing overhead.
class ConcurrentLongHashMap<V> {
// Constructors
ConcurrentLongHashMap();
ConcurrentLongHashMap(int expectedItems);
ConcurrentLongHashMap(int expectedItems, int numSections);
// Map operations
V get(long key);
V put(long key, V value);
V putIfAbsent(long key, V value);
V computeIfAbsent(long key, LongFunction<V> provider);
V remove(long key);
boolean remove(long key, Object value);
// Query operations
boolean containsKey(long key);
int size();
long capacity();
boolean isEmpty();
void clear();
// Iteration
void forEach(EntryProcessor<V> processor);
List<Long> keys();
ConcurrentLongHashSet keysLongHashSet();
List<V> values();
// Functional interface for iteration
interface EntryProcessor<V> {
void accept(long key, V value);
}
}Concurrent hash set for primitive longs with no boxing overhead. Items must be >= 0.
class ConcurrentLongHashSet {
// Constructors
ConcurrentLongHashSet();
ConcurrentLongHashSet(int expectedItems);
ConcurrentLongHashSet(int expectedItems, int numSections);
// Set operations
boolean add(long item);
boolean contains(long item);
boolean remove(long item);
void clear();
// Query operations
int size();
long capacity();
boolean isEmpty();
// Iteration
void forEach(ConsumerLong processor);
Set<Long> items();
// Functional interface for iteration
interface ConsumerLong {
void accept(long item);
}
}Custom linked list implementations supporting multiple concurrent iterators and efficient add/remove operations.
Linked list supporting multiple concurrent iterators with direct element manipulation capabilities.
class LinkedListImpl<E> implements LinkedList<E> {
// Constructors
LinkedListImpl();
LinkedListImpl(Comparator<E> comparator);
LinkedListImpl(Comparator<E> comparator, NodeStore<E> supplier);
// Add operations
void addHead(E e);
void addTail(E e);
void addSorted(E e); // Requires comparator
// Access operations
E get(int position);
E poll();
E peek();
// Iterator operations
LinkedListIterator<E> iterator();
int numIters();
// Management operations
void clear();
int size();
void clearID();
void setNodeStore(NodeStore<E> store);
E removeWithID(String listID, long id);
void forEach(Consumer<E> consumer);
// Inner node class
static class Node<E> {
protected E val();
protected Node<E> next();
protected Node<E> prev();
static <E> Node<E> with(E o);
}
// Enhanced iterator
class Iterator implements LinkedListIterator<E> {
void repeat();
boolean hasNext();
E next();
void remove();
E removeLastElement();
void close();
}
}
interface LinkedList<E> {
void addHead(E e);
void addTail(E e);
E get(int position);
E poll();
E peek();
LinkedListIterator<E> iterator();
void clear();
int size();
void clearID();
void setNodeStore(NodeStore<E> store);
E removeWithID(String listID, long id);
void forEach(Consumer<E> consumer);
}
interface LinkedListIterator<E> extends Iterator<E>, AutoCloseable {
void repeat();
E removeLastElement();
void close();
}Advanced property system with JMS-compliant type conversions and thread-safe operations.
Property storage with type-safe conversions following JMS specification section 3.5.4.
class TypedProperties {
// Constructors
TypedProperties();
TypedProperties(Predicate<SimpleString> internalPropertyPredicate);
TypedProperties(Predicate<SimpleString> internalPropertyPredicate,
Predicate<SimpleString> amqpPropertyPredicate);
TypedProperties(TypedProperties other); // Copy constructor
// Property setters
void putBooleanProperty(SimpleString key, boolean value);
void putByteProperty(SimpleString key, byte value);
void putBytesProperty(SimpleString key, byte[] value);
void putShortProperty(SimpleString key, short value);
void putIntProperty(SimpleString key, int value);
void putLongProperty(SimpleString key, long value);
void putFloatProperty(SimpleString key, float value);
void putDoubleProperty(SimpleString key, double value);
void putSimpleStringProperty(SimpleString key, SimpleString value);
void putCharProperty(SimpleString key, char value);
void putNullValue(SimpleString key);
TypedProperties putProperty(SimpleString key, Object value);
void putTypedProperties(TypedProperties otherProps);
// Property getters with type conversion
Object getProperty(SimpleString key);
Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Byte getByteProperty(SimpleString key, Supplier<Byte> defaultValue)
throws ActiveMQPropertyConversionException;
Character getCharProperty(SimpleString key) throws ActiveMQPropertyConversionException;
byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
SimpleString getSimpleStringProperty(SimpleString key)
throws ActiveMQPropertyConversionException;
// Property management
Object removeProperty(SimpleString key);
boolean containsProperty(SimpleString key);
Set<SimpleString> getPropertyNames();
boolean clearInternalProperties();
boolean clearAMQPProperties();
// Size and state
int size();
int getMemoryOffset();
boolean isEmpty();
void clear();
// Iteration
void forEachKey(Consumer<SimpleString> action);
void forEach(BiConsumer<SimpleString, Object> action);
// Encoding/Decoding for serialization
void decode(ByteBuf buffer, TypedPropertiesDecoderPools keyValuePools);
void decode(ByteBuf buffer);
int encode(ByteBuf buffer);
int getEncodeSize();
// Thread safety
Lock getReadLock();
Lock getWriteLock();
// Utility methods
Set<String> getMapNames();
Map<String, Object> getMap();
// Static utility methods
static boolean searchProperty(SimpleString key, ByteBuf buffer, int startIndex);
static void setObjectProperty(SimpleString key, Object value, TypedProperties properties);
}Collections with automatic size management and memory-efficient implementations.
LinkedHashMap that automatically removes eldest entries when size exceeds maximum.
class MaxSizeMap<K, V> extends LinkedHashMap<K, V> {
// Constructor
MaxSizeMap(int maxSize);
// Inherits all LinkedHashMap methods
// Automatically removes eldest entries when maxSize is exceeded
}import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashSet;
// Create concurrent map for message IDs to messages
ConcurrentLongHashMap<Message> messages = new ConcurrentLongHashMap<>(1000);
// Store messages by ID
long messageId = 12345L;
messages.put(messageId, message);
// Retrieve with no boxing overhead
Message retrievedMessage = messages.get(messageId);
// Atomic operations
Message existingMessage = messages.putIfAbsent(messageId, message);
// Compute if absent pattern
Message computedMessage = messages.computeIfAbsent(messageId, id -> createMessage(id));
// Iterate over entries
messages.forEach((id, msg) -> {
if (msg.isExpired()) {
messages.remove(id);
}
});
// Track processed message IDs
ConcurrentLongHashSet processedIds = new ConcurrentLongHashSet(500);
processedIds.add(messageId);
if (processedIds.contains(messageId)) {
// Skip duplicate processing
}
// Batch processing
processedIds.forEach(id -> {
// Process each ID
processMessage(id);
});import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
// Create priority-ordered list
LinkedListImpl<Task> taskQueue = new LinkedListImpl<>(
Comparator.comparing(Task::getPriority).reversed()
);
// Add tasks
taskQueue.addSorted(new Task("high-priority", 10));
taskQueue.addTail(new Task("normal", 5));
taskQueue.addHead(new Task("urgent", 15));
// Process tasks in order
while (!taskQueue.isEmpty()) {
Task task = taskQueue.poll();
processTask(task);
}
// Use iterator for complex processing
try (LinkedListIterator<Task> iter = taskQueue.iterator()) {
while (iter.hasNext()) {
Task task = iter.next();
if (task.shouldBeRemoved()) {
iter.remove();
} else if (task.needsReprocessing()) {
iter.repeat(); // Process same element again
}
}
}import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.artemis.api.core.SimpleString;
// Create message properties
TypedProperties props = new TypedProperties();
// Set various property types
props.putStringProperty(SimpleString.of("userId"), "john.doe");
props.putIntProperty(SimpleString.of("priority"), 5);
props.putBooleanProperty(SimpleString.of("persistent"), true);
props.putLongProperty(SimpleString.of("timestamp"), System.currentTimeMillis());
// Type-safe retrieval with automatic conversion
try {
String userId = props.getSimpleStringProperty(SimpleString.of("userId")).toString();
Integer priority = props.getIntProperty(SimpleString.of("priority"));
Boolean persistent = props.getBooleanProperty(SimpleString.of("persistent"));
// JMS-compliant type conversions
String priorityAsString = props.getSimpleStringProperty(SimpleString.of("priority")).toString();
} catch (ActiveMQPropertyConversionException e) {
// Handle type conversion errors
logger.error("Property type conversion failed", e);
}
// Copy properties
TypedProperties copy = new TypedProperties(props);
// Merge properties
copy.putTypedProperties(additionalProps);
// Thread-safe operations
Lock readLock = props.getReadLock();
readLock.lock();
try {
// Read operations
Set<SimpleString> names = props.getPropertyNames();
} finally {
readLock.unlock();
}import org.apache.activemq.artemis.utils.collections.MaxSizeMap;
// Create cache with automatic eviction
MaxSizeMap<String, CachedData> cache = new MaxSizeMap<>(1000);
// Add entries - oldest will be removed when size exceeds 1000
cache.put("key1", data1);
cache.put("key2", data2);
// Use as normal Map
CachedData retrieved = cache.get("key1");
boolean exists = cache.containsKey("key1");
// Cache automatically manages size
for (int i = 0; i < 2000; i++) {
cache.put("key" + i, createData(i));
}
// Cache size will never exceed 1000 entriesThese collections are optimized for ActiveMQ Artemis's messaging workloads where performance, memory efficiency, and thread safety are critical requirements.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-activemq--artemis-commons