001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.cursors;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedList;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicLong;
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.broker.region.Destination;
027    import org.apache.activemq.broker.region.MessageReference;
028    import org.apache.activemq.broker.region.QueueMessageReference;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
031    import org.apache.activemq.openwire.OpenWireFormat;
032    import org.apache.activemq.store.kahadb.plist.PList;
033    import org.apache.activemq.store.kahadb.plist.PListEntry;
034    import org.apache.activemq.store.kahadb.plist.PListStore;
035    import org.apache.activemq.usage.SystemUsage;
036    import org.apache.activemq.usage.Usage;
037    import org.apache.activemq.usage.UsageListener;
038    import org.apache.activemq.wireformat.WireFormat;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    import org.apache.kahadb.util.ByteSequence;
042    
/**
 * A pending message cursor (for messages awaiting dispatch to a consumer)
 * that keeps messages in memory and persists them to a temporary disk list
 * when memory runs out.
 */
049    public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
    // Logger shared by the cursor and its inner DiskIterator.
    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
    // Global counter; its next value is prepended to every cursor's name in the constructor.
    private static final AtomicLong NAME_COUNT = new AtomicLong();
    // Broker used to obtain the temp data store and to route discarded messages.
    protected Broker broker;
    // Temp data store that provides the disk list; may be null (see constructor).
    private final PListStore store;
    // Name under which this cursor's disk list is requested from / removed from the store.
    private final String name;
    // Messages currently held in memory.
    private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
    // Lazily obtained disk list (see getDiskList()); null until first needed.
    private PList diskList;
    // Iterator created by reset(); walks either memoryList or the disk list.
    private Iterator<MessageReference> iter;
    // Region destination of the most recently added message; re-applied to messages read from disk.
    private Destination regionDestination;
    // True between reset() and release().
    private boolean iterating;
    // Set by onUsageChanged(); release() flushes to disk when it is still set.
    private boolean flushRequired;
    // Guards start()/stop() so each runs its body at most once per transition.
    private final AtomicBoolean started = new AtomicBoolean();
    // Wire format used to marshal/unmarshal messages for the disk list.
    private final WireFormat wireFormat = new OpenWireFormat();
063        /**
064         * @param broker
065         * @param name
066         * @param prioritizedMessages
067         */
068        public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
069            super(prioritizedMessages);
070            this.broker = broker;
071            // the store can be null if the BrokerService has persistence
072            // turned off
073            this.store = broker.getTempDataStore();
074            this.name = NAME_COUNT.incrementAndGet() + "_" + name;
075        }
076    
077        @Override
078        public void start() throws Exception {
079            if (started.compareAndSet(false, true)) {
080                super.start();
081                if (systemUsage != null) {
082                    systemUsage.getMemoryUsage().addUsageListener(this);
083                }
084            }
085        }
086    
087        @Override
088        public void stop() throws Exception {
089            if (started.compareAndSet(true, false)) {
090                super.stop();
091                if (systemUsage != null) {
092                    systemUsage.getMemoryUsage().removeUsageListener(this);
093                }
094            }
095        }
096    
097        /**
098         * @return true if there are no pending messages
099         */
100        @Override
101        public synchronized boolean isEmpty() {
102            if (memoryList.isEmpty() && isDiskListEmpty()) {
103                return true;
104            }
105            for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
106                MessageReference node = iterator.next();
107                if (node == QueueMessageReference.NULL_MESSAGE) {
108                    continue;
109                }
110                if (!node.isDropped()) {
111                    return false;
112                }
113                // We can remove dropped references.
114                iterator.remove();
115            }
116            return isDiskListEmpty();
117        }
118    
119        /**
120         * reset the cursor
121         */
122        @Override
123        public synchronized void reset() {
124            iterating = true;
125            last = null;
126            if (isDiskListEmpty()) {
127                this.iter = this.memoryList.iterator();
128            } else {
129                this.iter = new DiskIterator();
130            }
131        }
132    
133        @Override
134        public synchronized void release() {
135            iterating = false;
136            if (flushRequired) {
137                flushRequired = false;
138                flushToDisk();
139            }
140        }
141    
142        @Override
143        public synchronized void destroy() throws Exception {
144            stop();
145            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
146                Message node = (Message) i.next();
147                node.decrementReferenceCount();
148            }
149            memoryList.clear();
150            destroyDiskList();
151        }
152    
153        private void destroyDiskList() throws Exception {
154            if (!isDiskListEmpty()) {
155                store.removePList(name);
156            }
157        }
158    
159        @Override
160        public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
161            LinkedList<MessageReference> result = new LinkedList<MessageReference>();
162            int count = 0;
163            for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
164                MessageReference ref = i.next();
165                ref.incrementReferenceCount();
166                result.add(ref);
167                count++;
168            }
169            if (count < maxItems && !isDiskListEmpty()) {
170                for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
171                    Message message = (Message) i.next();
172                    message.setRegionDestination(regionDestination);
173                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
174                    message.incrementReferenceCount();
175                    result.add(message);
176                    count++;
177                }
178            }
179            return result;
180        }
181    
182        /**
183         * add message to await dispatch
184         * 
185         * @param node
186         * @throws Exception 
187         */
188        @Override
189        public synchronized void addMessageLast(MessageReference node) throws Exception {
190            tryAddMessageLast(node, 0);
191        }
192        
193        @Override
194        public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
195            if (!node.isExpired()) {
196                try {
197                    regionDestination = node.getMessage().getRegionDestination();
198                    if (isDiskListEmpty()) {
199                        if (hasSpace() || this.store == null) {
200                            memoryList.add(node);
201                            node.incrementReferenceCount();
202                            setCacheEnabled(true);
203                            return true;
204                        }
205                    }
206                    if (!hasSpace()) {
207                        if (isDiskListEmpty()) {
208                            expireOldMessages();
209                            if (hasSpace()) {
210                                memoryList.add(node);
211                                node.incrementReferenceCount();
212                                return true;
213                            } else {
214                                flushToDisk();
215                            }
216                        }
217                    }
218                    if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
219                        ByteSequence bs = getByteSequence(node.getMessage());
220                        getDiskList().addLast(node.getMessageId().toString(), bs);
221                        return true;
222                    }
223                    return false;
224    
225                } catch (Exception e) {
226                    LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
227                    throw new RuntimeException(e);
228                }
229            } else {
230                discard(node);
231            }
232            //message expired
233            return true;
234        }
235    
236        /**
237         * add message to await dispatch
238         * 
239         * @param node
240         */
241        @Override
242        public synchronized void addMessageFirst(MessageReference node) {
243            if (!node.isExpired()) {
244                try {
245                    regionDestination = node.getMessage().getRegionDestination();
246                    if (isDiskListEmpty()) {
247                        if (hasSpace()) {
248                            memoryList.addFirst(node);
249                            node.incrementReferenceCount();
250                            setCacheEnabled(true);
251                            return;
252                        }
253                    }
254                    if (!hasSpace()) {
255                        if (isDiskListEmpty()) {
256                            expireOldMessages();
257                            if (hasSpace()) {
258                                memoryList.addFirst(node);
259                                node.incrementReferenceCount();
260                                return;
261                            } else {
262                                flushToDisk();
263                            }
264                        }
265                    }
266                    systemUsage.getTempUsage().waitForSpace();
267                    node.decrementReferenceCount();
268                    ByteSequence bs = getByteSequence(node.getMessage());
269                    getDiskList().addFirst(node.getMessageId().toString(), bs);
270    
271                } catch (Exception e) {
272                    LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
273                    throw new RuntimeException(e);
274                }
275            } else {
276                discard(node);
277            }
278        }
279    
280        /**
281         * @return true if there pending messages to dispatch
282         */
283        @Override
284        public synchronized boolean hasNext() {
285            return iter.hasNext();
286        }
287    
288        /**
289         * @return the next pending message
290         */
291        @Override
292        public synchronized MessageReference next() {
293            Message message = (Message) iter.next();
294            last = message;
295            if (!isDiskListEmpty()) {
296                // got from disk
297                message.setRegionDestination(regionDestination);
298                message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
299            }
300            message.incrementReferenceCount();
301            return message;
302        }
303    
304        /**
305         * remove the message at the cursor position
306         */
307        @Override
308        public synchronized void remove() {
309            iter.remove();
310            if (last != null) {
311                last.decrementReferenceCount();
312            }
313        }
314    
315        /**
316         * @param node
317         * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
318         */
319        @Override
320        public synchronized void remove(MessageReference node) {
321            if (memoryList.remove(node)) {
322                node.decrementReferenceCount();
323            }
324            if (!isDiskListEmpty()) {
325                try {
326                    getDiskList().remove(node.getMessageId().toString());
327                } catch (IOException e) {
328                    throw new RuntimeException(e);
329                }
330            }
331        }
332    
333        /**
334         * @return the number of pending messages
335         */
336        @Override
337        public synchronized int size() {
338            return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
339        }
340    
341        /**
342         * clear all pending messages
343         */
344        @Override
345        public synchronized void clear() {
346            memoryList.clear();
347            if (!isDiskListEmpty()) {
348                try {
349                    getDiskList().destroy();
350                } catch (IOException e) {
351                    throw new RuntimeException(e);
352                }
353            }
354            last = null;
355        }
356    
357        @Override
358        public synchronized boolean isFull() {
359    
360            return super.isFull() || (systemUsage != null && systemUsage.getTempUsage().isFull());
361    
362        }
363    
364        @Override
365        public boolean hasMessagesBufferedToDeliver() {
366            return !isEmpty();
367        }
368    
369        @Override
370        public void setSystemUsage(SystemUsage usageManager) {
371            super.setSystemUsage(usageManager);
372        }
373    
374        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
375            if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
376                synchronized (this) {
377                    flushRequired = true;
378                    if (!iterating) {
379                        expireOldMessages();
380                        if (!hasSpace()) {
381                            flushToDisk();
382                            flushRequired = false;
383                        }
384                    }
385                }
386            }
387        }
388    
389        @Override
390        public boolean isTransient() {
391            return true;
392        }
393    
394        protected boolean isSpaceInMemoryList() {
395            return hasSpace() && isDiskListEmpty();
396        }
397    
398        protected synchronized void expireOldMessages() {
399            if (!memoryList.isEmpty()) {
400                LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
401                this.memoryList = new LinkedList<MessageReference>();
402                while (!tmpList.isEmpty()) {
403                    MessageReference node = tmpList.removeFirst();
404                    if (node.isExpired()) {
405                        discard(node);
406                    } else {
407                        memoryList.add(node);
408                    }
409                }
410            }
411    
412        }
413    
414        protected synchronized void flushToDisk() {
415    
416            if (!memoryList.isEmpty()) {
417                while (!memoryList.isEmpty()) {
418                    MessageReference node = memoryList.removeFirst();
419                    node.decrementReferenceCount();
420                    ByteSequence bs;
421                    try {
422                        bs = getByteSequence(node.getMessage());
423                        getDiskList().addLast(node.getMessageId().toString(), bs);
424                    } catch (IOException e) {
425                        LOG.error("Failed to write to disk list", e);
426                        throw new RuntimeException(e);
427                    }
428    
429                }
430                memoryList.clear();
431                setCacheEnabled(false);
432            }
433        }
434    
435        protected boolean isDiskListEmpty() {
436            return diskList == null || diskList.isEmpty();
437        }
438    
439        protected PList getDiskList() {
440            if (diskList == null) {
441                try {
442                    diskList = store.getPList(name);
443                } catch (Exception e) {
444                    LOG.error("Caught an IO Exception getting the DiskList " + name, e);
445                    throw new RuntimeException(e);
446                }
447            }
448            return diskList;
449        }
450    
451        protected void discard(MessageReference message) {
452            message.decrementReferenceCount();
453            if (LOG.isDebugEnabled()) {
454                LOG.debug("Discarding message " + message);
455            }
456            ConnectionContext ctx = new ConnectionContext(new NonCachedMessageEvaluationContext());
457            ctx.setBroker(broker);
458            broker.getRoot().sendToDeadLetterQueue(ctx, message, null);
459        }
460    
461        protected ByteSequence getByteSequence(Message message) throws IOException {
462            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
463            return new ByteSequence(packet.data, packet.offset, packet.length);
464        }
465    
466        protected Message getMessage(ByteSequence bs) throws IOException {
467            org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
468                    .getOffset(), bs.getLength());
469            return (Message) this.wireFormat.unmarshal(packet);
470    
471        }
472    
473        final class DiskIterator implements Iterator<MessageReference> {
474            private PListEntry next = null;
475            private PListEntry current = null;
476            PList list;
477    
478            DiskIterator() {
479                try {
480                    this.list = getDiskList();
481                    synchronized (this.list) {
482                        this.current = this.list.getFirst();
483                        this.next = this.current;
484                    }
485                } catch (Exception e) {
486                    throw new RuntimeException(e);
487                }
488            }
489    
490            public boolean hasNext() {
491                return this.next != null;
492            }
493    
494            public MessageReference next() {
495                this.current = next;
496                try {
497                    ByteSequence bs = this.current.getByteSequence();
498                    synchronized (this.list) {
499                        this.current = this.list.refresh(this.current);
500                        this.next = this.list.getNext(this.current);
501                    }
502                    return getMessage(bs);
503                } catch (IOException e) {
504                    LOG.error("I/O error", e);
505                    throw new RuntimeException(e);
506                }
507            }
508    
509            public void remove() {
510                try {
511                    synchronized (this.list) {
512                        this.current = this.list.refresh(this.current);
513                        this.list.remove(this.current);
514                    }
515    
516                } catch (IOException e) {
517                    LOG.error("I/O error", e);
518                    throw new RuntimeException(e);
519                }
520    
521            }
522    
523        }
524    }