序
本文主要研究一下artemis message的priority
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
public class CoreMessage extends RefCountMessage implements ICoreMessage { //...... protected byte priority; public byte getPriority() { return priority; } public CoreMessage setPriority(byte priority) { this.priority = priority; messageChanged(); return this; } //......}
CoreMessage定义了priority属性(Values range from 0 (less priority) to 9 (more priority) inclusive),并提供了getPriority、setPriority方法
messageReferences.add
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
public class QueueImpl extends CriticalComponentImpl implements Queue { //...... private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator()); //...... private synchronized void internalAddTail(final MessageReference ref) { refAdded(ref); messageReferences.addTail(ref, getPriority(ref)); pendingMetrics.incrementMetrics(ref); enforceRing(false); } private void internalAddHead(final MessageReference ref) { queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); pendingMetrics.incrementMetrics(ref); refAdded(ref); int priority = getPriority(ref); messageReferences.addHead(ref, priority); ref.setInDelivery(false); } private void internalAddSorted(final MessageReference ref) { queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); pendingMetrics.incrementMetrics(ref); refAdded(ref); int priority = getPriority(ref); messageReferences.addSorted(ref, priority); } private int getPriority(MessageReference ref) { try { return ref.getMessage().getPriority(); } catch (Throwable e) { ActiveMQServerLogger.LOGGER.unableToGetMessagePriority(e); return 4; // the default one in case of failure } } //......}
QueueImpl定义了messageReferences,其类型为PriorityLinkedList<MessageReference>;其internalAddTail、internalAddHead、internalAddSorted方法都会调用getPriority方法获取priority,获取过程中出现异常时会记录日志并返回默认值4,之后通过messageReferences的addTail、addHead、addSorted方法添加到队列
PriorityLinkedList
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
/**
 * A list whose insertion operations take an explicit priority alongside the
 * element.
 *
 * @param <T> the element type
 */
public interface PriorityLinkedList<T> {

    /**
     * Adds an element at the head position for the given priority.
     *
     * @param t        the element to add
     * @param priority the priority to file the element under
     */
    void addHead(T t, int priority);

    /**
     * Adds an element at the tail position for the given priority.
     *
     * @param t        the element to add
     * @param priority the priority to file the element under
     */
    void addTail(T t, int priority);

    /**
     * Adds an element in sorted position for the given priority.
     *
     * @param t        the element to add
     * @param priority the priority to file the element under
     */
    void addSorted(T t, int priority);

    /**
     * Removes and returns an element from the list.
     *
     * @return the removed element
     */
    T poll();

    /** Removes all elements from the list. */
    void clear();

    /**
     * Returns an iterator over the elements of this list.
     *
     * @return a new iterator
     */
    LinkedListIterator<T> iterator();

    /**
     * Reports how many elements this list holds.<br>
     * This method may be invoked concurrently.
     *
     * @return the number of elements
     */
    int size();

    /**
     * Reports whether this list holds no elements.<br>
     * This method may be invoked concurrently.
     *
     * @return {@code true} when the list is empty, {@code false} otherwise
     */
    boolean isEmpty();
}
PriorityLinkedList接口定义了根据priority的addHead、addTail、addSorted方法,其size以及isEmpty要求是线程安全的
PriorityLinkedListImpl
activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> { private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size"); protected LinkedListImpl<T>[] levels; private volatile int size; private int lastReset; private int highestPriority = -1; private int lastPriority = -1; public PriorityLinkedListImpl(final int priorities) { this(priorities, null); } public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) { levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities); for (int i = 0; i < priorities; i++) { levels[i] = new LinkedListImpl<>(comparator); } } private void checkHighest(final int priority) { if (lastPriority != priority || priority > highestPriority) { lastPriority = priority; if (lastReset == Integer.MAX_VALUE) { lastReset = 0; } else { lastReset++; } } if (priority > highestPriority) { highestPriority = priority; } } @Override public void addHead(final T t, final int priority) { checkHighest(priority); levels[priority].addHead(t); exclusiveIncrementSize(1); } @Override public void addTail(final T t, final int priority) { checkHighest(priority); levels[priority].addTail(t); exclusiveIncrementSize(1); } @Override public void addSorted(T t, int priority) { checkHighest(priority); levels[priority].addSorted(t); exclusiveIncrementSize(1); } @Override public T poll() { T t = null; // We are just using a simple prioritization algorithm: // Highest priority refs always get returned first. // This could cause starvation of lower priority refs. 
// TODO - A better prioritization algorithm for (int i = highestPriority; i >= 0; i--) { LinkedListImpl<T> ll = levels[i]; if (ll.size() != 0) { t = ll.poll(); if (t != null) { exclusiveIncrementSize(-1); if (ll.size() == 0) { if (highestPriority == i) { highestPriority--; } } } break; } } return t; } @Override public void clear() { for (LinkedListImpl<T> list : levels) { list.clear(); } exclusiveSetSize(0); } private void exclusiveIncrementSize(int amount) { SIZE_UPDATER.lazySet(this, this.size + amount); } private void exclusiveSetSize(int value) { SIZE_UPDATER.lazySet(this, value); } @Override public int size() { return size; } @Override public boolean isEmpty() { return size == 0; } @Override public LinkedListIterator<T> iterator() { return new PriorityLinkedListIterator(); } //......}
PriorityLinkedListImpl实现了PriorityLinkedList接口,其构造器需要priorities参数,它使用Array.newInstance(LinkedListImpl.class, priorities)来创建并初始化levels数组,其数组元素类型为LinkedListImpl;其addHead、addTail、addSorted先执行checkHighest(priority)维护highestPriority,之后调用对应priority的LinkedListImpl的addHead、addTail、addSorted方法,最后调用exclusiveIncrementSize方法递增size;其poll方法会从highestPriority对应的LinkedListImpl开始,向低优先级方向查找第一个非空的LinkedListImpl并对其执行poll
小结
- CoreMessage定义了priority属性(Values range from 0 (less priority) to 9 (more priority) inclusive),并提供了getPriority、setPriority方法
- QueueImpl定义了messageReferences,其类型为PriorityLinkedList<MessageReference>;其internalAddTail、internalAddHead、internalAddSorted方法都会调用getPriority方法获取priority,出现异常返回4,之后通过messageReferences的addTail、addHead、addSorted方法添加到队列
- PriorityLinkedListImpl实现了PriorityLinkedList接口,其构造器需要priorities参数,它使用Array.newInstance(LinkedListImpl.class, priorities)来创建并初始化levels数组,其数组元素类型为LinkedListImpl;PriorityLinkedListImpl的addHead、addTail、addSorted均委托给LinkedListImpl类
doc
QueueImpl