0
点赞
收藏
分享

微信扫一扫

ActiveMQ queue 分页

分页:即每次只获取部分数据。queue 不会一次性加载全部消息,而是按页(每页最多 maxPageSize 条)从 message cursor 读取消息,然后分发给 consumer。

页大小:

public abstract class BaseDestination implements Destination {

    /** Upper bound on the number of messages paged in to the destination from persistent storage. */
    public static final int MAX_PAGE_SIZE = 200;
}

存放分页消息的数据结构:

/**
 * Excerpt of the queue destination showing only the structures involved in paging.
 */
public class Queue extends BaseDestination implements Task, UsageListener {

    /** Message cursor; can be regarded as the data source the queue reads messages from. */
    protected PendingMessageCursor messages;

    /** Every message that has been paged in. */
    private final PendingList pagedInMessages = new OrderedPendingList();

    /** Paged-in messages that have not been dispatched yet. */
    protected PendingList pagedInPendingDispatch = new OrderedPendingList();
}

把消息添加到分页中:

/**
 * Pages messages in (with expiry processing enabled) and hands the resulting
 * list straight to {@code doDispatch}.
 *
 * @param force passed through unchanged as the {@code force} argument of
 *              {@code doPageInForDispatch}
 * @throws Exception if paging in or dispatching fails
 */
protected void pageInMessages(boolean force) throws Exception {
    final PendingList newlyPagedIn = doPageInForDispatch(force, true);
    doDispatch(newlyPagedIn);
}

 

1 private PendingList doPageInForDispatch(boolean force, boolean processExpired) throws Exception {
2 List<QueueMessageReference> result = null;
3 PendingList resultList = null;
4
5 // 根据maxPageSize和message cursor中的大小,决定需要读取的消息数量
6 int toPageIn = Math.min(getMaxPageSize(), messages.size());
7 int pagedInPendingSize = 0;
8 pagedInPendingDispatchLock.readLock().lock();
9 try {
10 pagedInPendingSize = pagedInPendingDispatch.size();
11 } finally {
12 pagedInPendingDispatchLock.readLock().unlock();
13 }
14
15 LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}",
16 new Object[]{
17 destination.getPhysicalName(),
18 toPageIn,
19 destinationStatistics.getInflight().getCount(),
20 pagedInMessages.size(),
21 pagedInPendingSize,
22 destinationStatistics.getEnqueues().getCount(),
23 destinationStatistics.getDequeues().getCount(),
24 getMemoryUsage().getUsage()
25 });
26 if (isLazyDispatch() && !force) {
27 // Only page in the minimum number of messages which can be
28 // dispatched immediately.
29 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
30 }
31 if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
32 int count = 0;
33 result = new ArrayList<QueueMessageReference>(toPageIn);
34 messagesLock.writeLock().lock();
35 try {
36 try {
37 messages.setMaxBatchSize(toPageIn);
38 messages.reset();
39 while (messages.hasNext() && count < toPageIn) {
40 MessageReference node = messages.next();
41 messages.remove();
42
43 QueueMessageReference ref = createMessageReference(node.getMessage());
44 if (processExpired && ref.isExpired()) {
45 if (broker.isExpired(ref)) {
46 messageExpired(createConnectionContext(), ref);
47 } else {
48 ref.decrementReferenceCount();
49 }
50 } else {
51 // 添加QueueMessageReference到result中
52 result.add(ref);
53 count++;
54 }
55 }
56 } finally {
57 messages.release();
58 }
59 } finally {
60 messagesLock.writeLock().unlock();
61 }
62 // Only add new messages, not already pagedIn to avoid multiple
63 // dispatch attempts
64 pagedInMessagesLock.writeLock().lock();
65 try {
66 if(isPrioritizedMessages()) {
67 resultList = new PrioritizedPendingList();
68 } else {
69 resultList = new OrderedPendingList();
70 }
71 for (QueueMessageReference ref : result) {
72 if (!pagedInMessages.contains(ref)) {
73 //分别添加QueueMessageReference到 pagedInMessages 和 resultList
74 //resultList作为返回值,直接传递给doDispatch(PendingList list),
75 //在doDispatch中,分发给消费者后,就会从 resultList 中删除,
76 pagedInMessages.addMessageLast(ref);
77 resultList.addMessageLast(ref);
78 } else {
79 ref.decrementReferenceCount();
80 // store should have trapped duplicate in it's index, also cursor audit
81 // we need to remove the duplicate from the store in the knowledge that the original message may be inflight
82 // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id
83 LOG.warn("{}, duplicate message {} paged in, is cursor audit disabled? Removing from store and redirecting to dlq", this, ref.getMessage());
84 if (store != null) {
85 ConnectionContext connectionContext = createConnectionContext();
86 store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
87 broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from store for " + destination));
88 }
89 }
90 }
91 } finally {
92 pagedInMessagesLock.writeLock().unlock();
93 }
94 } else {
95 // Avoid return null list, if condition is not validated
96 resultList = new OrderedPendingList();
97 }
98
99 return resultList;
100 }
101
102 //分发消息
103 private void doDispatch(PendingList list) throws Exception {
104 boolean doWakeUp = false;
105
106 pagedInPendingDispatchLock.writeLock().lock();
107 try {
108 //存在需要重新发送的消息
109 if (!redeliveredWaitingDispatch.isEmpty()) {
110 // Try first to dispatch redelivered messages to keep an
111 // proper order
112 redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
113 }
114 //存在没有分发的分页消息
115 if (!pagedInPendingDispatch.isEmpty()) {
116 // Next dispatch anything that had not been
117 // dispatched before.
118 pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
119 }
120 // and now see if we can dispatch the new stuff.. and append to
121 // the pending
122 // list anything that does not actually get dispatched.
123 if (list != null && !list.isEmpty()) {
124 if (pagedInPendingDispatch.isEmpty()) {
125 //doActualDispatch进行实际的分发消息:
126 //分发给消费者的消息,会从list中删除,list中保存剩下的消息,doActualDispatch返回list
127 pagedInPendingDispatch.addAll(doActualDispatch(list));
128 } else {
129 for (MessageReference qmr : list) {
130 if (!pagedInPendingDispatch.contains(qmr)) {
131 pagedInPendingDispatch.addMessageLast(qmr);
132 }
133 }
134 doWakeUp = true;
135 }
136 }
137 } finally {
138 pagedInPendingDispatchLock.writeLock().unlock();
139 }
140
141 if (doWakeUp) {
142 // avoid lock order contention
143 asyncWakeup();
144 }
145 }
146
147 // 实际分发消息
148 private PendingList doActualDispatch(PendingList list) throws Exception {
149 List<Subscription> consumers;
150 consumersLock.writeLock().lock();
151
152 try {
153 if (this.consumers.isEmpty()) {
154 // slave dispatch happens in processDispatchNotification
155 return list;
156 }
157 consumers = new ArrayList<Subscription>(this.consumers);
158 } finally {
159 consumersLock.writeLock().unlock();
160 }
161
162 Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
163
164 for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
165
166 MessageReference node = iterator.next();
167 Subscription target = null;
168 for (Subscription s : consumers) {
169 if (s instanceof QueueBrowserSubscription) {
170 continue;
171 }
172 if (!fullConsumers.contains(s)) {
173 if (!s.isFull()) {
174 if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
175 // Dispatch it.
176 s.add(node);
177 LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
178 //从list中删除
179 iterator.remove();
180 target = s;
181 break;
182 }
183 } else {
184 // no further dispatch of list to a full consumer to
185 // avoid out of order message receipt
186 fullConsumers.add(s);
187 LOG.trace("Subscription full {}", s);
188 }
189 }
190 }
191
192 if (target == null && node.isDropped()) {
193 iterator.remove();
194 }
195
196 // return if there are no consumers or all consumers are full
197 if (target == null && consumers.size() == fullConsumers.size()) {
198 return list;
199 }
200
201 // If it got dispatched, rotate the consumer list to get round robin
202 // distribution.
203 if (target != null && !strictOrderDispatch && consumers.size() > 1
204 && !dispatchSelector.isExclusiveConsumer(target)) {
205 consumersLock.writeLock().lock();
206 try {
207 if (removeFromConsumerList(target)) {
208 addToConsumerList(target);
209 consumers = new ArrayList<Subscription>(this.consumers);
210 }
211 } finally {
212 consumersLock.writeLock().unlock();
213 }
214 }
215 }
216
217 return list;
218 }

 

举报

相关推荐

0 条评论