来自 科技 1970-01-01 08:00 的文章

本文主要研究一下artemis message的priority



priority

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

public class CoreMessage extends RefCountMessage implements ICoreMessage {​  //......  protected byte priority;​  public byte getPriority() {    return priority;  }​  public CoreMessage setPriority(byte priority) {    this.priority = priority;    messageChanged();    return this;  }​  //......}
CoreMessage定义了priority属性(Values range from 0 (less priority) to 9 (more priority) inclusive),并提供了getPriority、setPriority方法messageReferences.add

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {​  //......​  private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());​  //......  ​  private synchronized void internalAddTail(final MessageReference ref) {    refAdded(ref);    messageReferences.addTail(ref, getPriority(ref));    pendingMetrics.incrementMetrics(ref);    enforceRing(false);  }​  private void internalAddHead(final MessageReference ref) {    queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());    pendingMetrics.incrementMetrics(ref);    refAdded(ref);​    int priority = getPriority(ref);​    messageReferences.addHead(ref, priority);​    ref.setInDelivery(false);  }​  private void internalAddSorted(final MessageReference ref) {    queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());    pendingMetrics.incrementMetrics(ref);    refAdded(ref);​    int priority = getPriority(ref);​    messageReferences.addSorted(ref, priority);  }​  private int getPriority(MessageReference ref) {    try {     return ref.getMessage().getPriority();    } catch (Throwable e) {     ActiveMQServerLogger.LOGGER.unableToGetMessagePriority(e);     return 4; // the default one in case of failure    }  }​  //......}
QueueImpl定义了messageReferences,其类型为PriorityLinkedList<MessageReference>;其internalAddTail、internalAddHead、internalAddSorted方法都会调用getPriority方法获取priority,出现异常返回4,之后通过messageReferences的addTail、addHead、addSorted方法添加到队列PriorityLinkedList

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java

public interface PriorityLinkedList<T> {​  void addHead(T t, int priority);​  void addTail(T t, int priority);​  void addSorted(T t, int priority);​  T poll();​  void clear();​  /**   * Returns the size of this list.<br>   * It is safe to be called concurrently.   */  int size();​  LinkedListIterator<T> iterator();​  /**   * Returns {@code true} if empty, {@code false} otherwise.<br>   * It is safe to be called concurrently.   */  boolean isEmpty();}
PriorityLinkedList接口定义了根据priority的addHead、addTail、addSorted方法,其size以及isEmpty要求是线程安全的PriorityLinkedListImpl

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java

public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {​  private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");​  protected LinkedListImpl<T>[] levels;​  private volatile int size;​  private int lastReset;​  private int highestPriority = -1;​  private int lastPriority = -1;​  public PriorityLinkedListImpl(final int priorities) {    this(priorities, null);  }​​  public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) {    levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);​    for (int i = 0; i < priorities; i++) {     levels[i] = new LinkedListImpl<>(comparator);    }  }​  private void checkHighest(final int priority) {    if (lastPriority != priority || priority > highestPriority) {     lastPriority = priority;     if (lastReset == Integer.MAX_VALUE) {       lastReset = 0;     } else {       lastReset++;     }    }​    if (priority > highestPriority) {     highestPriority = priority;    }  }​  @Override  public void addHead(final T t, final int priority) {    checkHighest(priority);​    levels[priority].addHead(t);​    exclusiveIncrementSize(1);  }​  @Override  public void addTail(final T t, final int priority) {    checkHighest(priority);​    levels[priority].addTail(t);​    exclusiveIncrementSize(1);  }​  @Override  public void addSorted(T t, int priority) {    checkHighest(priority);​    levels[priority].addSorted(t);​    exclusiveIncrementSize(1);  }​  @Override  public T poll() {    T t = null;​    // We are just using a simple prioritization algorithm:    // Highest priority refs always get returned first.    // This could cause starvation of lower priority refs.​    // TODO - A better prioritization algorithm​    for (int i = highestPriority; i >= 0; i--) {     LinkedListImpl<T> ll = levels[i];​     if (ll.size() != 0) {       t = ll.poll();​       if (t != null) {        exclusiveIncrementSize(-1);​        if (ll.size() == 0) {          if (highestPriority == i) {           highestPriority--;          }        }       }​       break;     }    }​    return t;  }​  @Override  public void clear() {    for (LinkedListImpl<T> list : levels) {     list.clear();    }​    exclusiveSetSize(0);  }​  private void exclusiveIncrementSize(int amount) {    SIZE_UPDATER.lazySet(this, this.size + amount);  }​  private void exclusiveSetSize(int value) {    SIZE_UPDATER.lazySet(this, value);  }​  @Override  public int size() {    return size;  }​  @Override  public boolean isEmpty() {    return size == 0;  }​  @Override  public LinkedListIterator<T> iterator() {    return new PriorityLinkedListIterator();  }​  //......}
PriorityLinkedListImpl实现了PriorityLinkedList接口,其构造器需要priorities参数,它使用Array.newInstance(LinkedListImpl.class, priorities)来创建并初始化levels数组,其数组元素类型为LinkedListImpl;其addHead、addTail、addSorted先执行checkHighest(priority)维护highestPriority,之后调用对应priority的LinkedListImpl的addHead、addTail、addSorted方法,最后调用exclusiveIncrementSize方法递增size;其poll方法会从highestPriority的LinkedListImpl开始poll小结

CoreMessage定义了priority属性(Values range from 0 (less priority) to 9 (more priority) inclusive),并提供了getPriority、setPriority方法;QueueImpl定义了messageReferences,其类型为PriorityLinkedList<MessageReference>;其internalAddTail、internalAddHead、internalAddSorted方法都会调用getPriority方法获取priority,出现异常返回4,之后通过messageReferences的addTail、addHead、addSorted方法添加到队列;PriorityLinkedListImpl实现了PriorityLinkedList接口,其构造器需要priorities参数,它使用Array.newInstance(LinkedListImpl.class, priorities)来创建并初始化levels数组,其数组元素类型为LinkedListImpl;PriorityLinkedListImpl的addHead、addTail、addSorted均委托给LinkedListImpl类

docQueueImpl

热门文章