001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import org.apache.activemq.broker.region.MessageReference;
025    import org.apache.activemq.command.MessageId;
026    
027    public class PrioritizedPendingList implements PendingList {
028        static final Integer MAX_PRIORITY = 10;
029        private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
030        final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
031    
032        public PrioritizedPendingList() {
033            for (int i = 0; i < MAX_PRIORITY; i++) {
034                this.lists[i] = new OrderedPendingList();
035            }
036        }
037        public PendingNode addMessageFirst(MessageReference message) {
038            PendingNode node = getList(message).addMessageFirst(message);
039            this.map.put(message.getMessageId(), node);
040            return node;
041        }
042    
043        public PendingNode addMessageLast(MessageReference message) {
044            PendingNode node = getList(message).addMessageLast(message);
045            this.map.put(message.getMessageId(), node);
046            return node;
047        }
048    
049        public void clear() {
050            for (int i = 0; i < MAX_PRIORITY; i++) {
051                this.lists[i].clear();
052            }
053            this.map.clear();
054        }
055    
056        public boolean isEmpty() {
057            return this.map.isEmpty();
058        }
059    
060        public Iterator<MessageReference> iterator() {
061            return new PrioritizedPendingListIterator();
062        }
063    
064        public void remove(MessageReference message) {
065            if (message != null) {
066                PendingNode node = this.map.remove(message.getMessageId());
067                if (node != null) {
068                    node.getList().removeNode(node);
069                }
070            }
071        }
072    
073        public int size() {
074            return this.map.size();
075        }
076    
077        @Override
078        public String toString() {
079            return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
080        }
081    
082        protected int getPriority(MessageReference message) {
083            int priority = javax.jms.Message.DEFAULT_PRIORITY;
084            if (message.getMessageId() != null) {
085                priority = Math.max(message.getMessage().getPriority(), 0);
086                priority = Math.min(priority, 9);
087            }
088            return priority;
089        }
090    
091        protected OrderedPendingList getList(MessageReference msg) {
092            return lists[getPriority(msg)];
093        }
094    
095        private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
096            private int index = 0;
097            private int currentIndex = 0;
098            List<PendingNode> list = new ArrayList<PendingNode>(size());
099    
100            PrioritizedPendingListIterator() {
101                for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
102                    OrderedPendingList orderedPendingList = lists[i];
103                    if (!orderedPendingList.isEmpty()) {
104                        list.addAll(orderedPendingList.getAsList());
105                    }
106                }
107            }
108            public boolean hasNext() {
109                return list.size() > index;
110            }
111    
112            public MessageReference next() {
113                PendingNode node = list.get(this.index);
114                this.currentIndex = this.index;
115                this.index++;
116                return node.getMessage();
117            }
118    
119            public void remove() {
120                PendingNode node = list.get(this.currentIndex);
121                if (node != null) {
122                    map.remove(node.getMessage().getMessageId());
123                    node.getList().removeNode(node);
124                }
125    
126            }
127    
128        }
129    
130    }