0
点赞
收藏
分享

微信扫一扫

ActiveMQ consumer按顺序处理消息

​​http://activemq.apache.org/exclusive-consumer.html​​

 

producer发送消息是有先后顺序的,这种顺序保持到了broker中。如果希望消息按顺序被消费掉,则应该把消息投送给单独一个consumer。如果队列只有一个consumer,那就很ok了,broker没有选择。但是,一旦唯一的consumer挂了,会造成服务不可用。因此出现了exclusive consumer,配置如下:

new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

 

如果有2个consumer都是这样配置的,broker只会把队列消息发送给其中一个consumer,如果这个consumer挂掉了,broker会把消息推送给另外的consumer,这样就保证了按顺序消费消息。

 

那么,ActiveMQ是怎样实现这种逻辑的呢?

org.apache.activemq.broker.region.Queue中维持了一个consumer列表,分发消息的时候,会去遍历列表,在队列中靠前的consumer会优先被分发消息。

// org.apache.activemq.broker.region.Queue
// 该方法把消息分发给consumer,PendingList是消息列表
private PendingList doActualDispatch(PendingList list) throws Exception {
//消费者列表
List<Subscription> consumers;
consumersLock.writeLock().lock();

try {
if (this.consumers.isEmpty()) { // 消费者为空,直接返回
// slave dispatch happens in processDispatchNotification
return list;
}
consumers = new ArrayList<Subscription>(this.consumers);
} finally {
consumersLock.writeLock().unlock();
}

// 初始化fullConsumers
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
// 遍历消息列表
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
Subscription target = null;
// 遍历消费者
for (Subscription s : consumers) {
if (s instanceof QueueBrowserSubscription) {
continue;
}

if (!fullConsumers.contains(s)) {
if (!s.isFull()) { //消费者not full
//满足以下条件可以分发:
//1. 符合QueueDispatchSelector的规则
//2. 消息的group属性和消费者匹配
//3. 消息没有被应答
if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
// Dispatch it.
s.add(node);
LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
//从发送列表中删除,消息并不会真的删除
iterator.remove();
//设置消息的target
target = s;
break;
}
} else {
// no further dispatch of list to a full consumer to
// avoid out of order message receipt
fullConsumers.add(s);
LOG.trace("Subscription full {}", s);
}
}
}

if (target == null && node.isDropped()) {
iterator.remove();
}

// return if there are no consumers or all consumers are full
if (target == null && consumers.size() == fullConsumers.size()) {
return list;
}

// 在列表中调整consumer的顺序
// 如果是exlusive consumer,则不会进分支,那么exlusive consumer的顺序不会变
// 一旦进入这个分支,当前的consumer会被放到最后
// If it got dispatched, rotate the consumer list to get round robin
// distribution.
if (target != null && !strictOrderDispatch && consumers.size() > 1
&& !dispatchSelector.isExclusiveConsumer(target)) {
consumersLock.writeLock().lock();
try {
// 先从this.consumers中删除当前consumer
if (removeFromConsumerList(target)) {
// 然后把当前consumer添加到this.consumers中
addToConsumerList(target);
consumers = new ArrayList<Subscription>(this.consumers);
}
} finally {
consumersLock.writeLock().unlock();
}
}
}

return list;
}


private void addToConsumerList(Subscription sub) {
if (useConsumerPriority) {
consumers.add(sub);
Collections.sort(consumers, orderedCompare);
} else {
consumers.add(sub);
}
}

QueueDispatchSelector保证:如果配置了 exclusive consumer,一定会把消息分发给 exclusive consumer。

// org.apache.activemq.broker.region.QueueDispatchSelector
public boolean canSelect(Subscription subscription,
MessageReference m) throws Exception {

boolean result = super.canDispatch(subscription, m);
if (result && !subscription.isBrowser()) {
// 没有配置exclusiveConsumer,或者exclusiveConsumer就是当前消费者
result = exclusiveConsumer == null || exclusiveConsumer == subscription;
}
return result;
}

在添加消费者的时候,设置exclusive consumer:

//org.apache.activemq.broker.region.Queue
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
LOG.debug("{} add sub: {}, dequeues: {}, dispatched: {}, inflight: {}", new Object[]{ getActiveMQDestination().getQualifiedName(), getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount() });

super.addSubscription(context, sub);
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
pagedInPendingDispatchLock.writeLock().lock();
try {

sub.add(context, this);

// needs to be synchronized - so no contention with dispatching
// consumersLock.
consumersLock.writeLock().lock();
try {
// set a flag if this is a first consumer
if (consumers.size() == 0) {
firstConsumer = true;
if (consumersBeforeDispatchStarts != 0) {
consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
}
} else {
if (consumersBeforeStartsLatch != null) {
consumersBeforeStartsLatch.countDown();
}
}

addToConsumerList(sub);
//设置 exclusive consumer
if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
if (exclusiveConsumer == null) {
// exclusiveConsumer为空
exclusiveConsumer = sub;
} else if (sub.getConsumerInfo().getPriority() == Byte.MAX_VALUE ||
sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
//如果当前订阅者的优先级比已有的exclusiveConsumer高
exclusiveConsumer = sub;
}
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
} finally {
consumersLock.writeLock().unlock();
}

if (sub instanceof QueueBrowserSubscription) {
// tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
browserDispatches.add(browserDispatch);
}

if (!this.optimizedDispatch) {
wakeup();
}
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
if (this.optimizedDispatch) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see
// https://issues.apache.org/activemq/browse/AMQ-1878
wakeup();
}
}

 

举报

相关推荐

0 条评论