001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Set;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.CountDownLatch;
028    import java.util.concurrent.FutureTask;
029    import java.util.concurrent.LinkedBlockingQueue;
030    import java.util.concurrent.ThreadFactory;
031    import java.util.concurrent.ThreadPoolExecutor;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    import org.apache.activeio.journal.InvalidRecordLocationException;
035    import org.apache.activeio.journal.Journal;
036    import org.apache.activeio.journal.JournalEventListener;
037    import org.apache.activeio.journal.RecordLocation;
038    import org.apache.activeio.packet.ByteArrayPacket;
039    import org.apache.activeio.packet.Packet;
040    import org.apache.activemq.broker.BrokerService;
041    import org.apache.activemq.broker.BrokerServiceAware;
042    import org.apache.activemq.broker.ConnectionContext;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.ActiveMQQueue;
045    import org.apache.activemq.command.ActiveMQTopic;
046    import org.apache.activemq.command.DataStructure;
047    import org.apache.activemq.command.JournalQueueAck;
048    import org.apache.activemq.command.JournalTopicAck;
049    import org.apache.activemq.command.JournalTrace;
050    import org.apache.activemq.command.JournalTransaction;
051    import org.apache.activemq.command.Message;
052    import org.apache.activemq.command.MessageAck;
053    import org.apache.activemq.command.ProducerId;
054    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055    import org.apache.activemq.openwire.OpenWireFormat;
056    import org.apache.activemq.store.MessageStore;
057    import org.apache.activemq.store.PersistenceAdapter;
058    import org.apache.activemq.store.TopicMessageStore;
059    import org.apache.activemq.store.TransactionStore;
060    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061    import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062    import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063    import org.apache.activemq.thread.Scheduler;
064    import org.apache.activemq.thread.Task;
065    import org.apache.activemq.thread.TaskRunner;
066    import org.apache.activemq.thread.TaskRunnerFactory;
067    import org.apache.activemq.usage.SystemUsage;
068    import org.apache.activemq.usage.Usage;
069    import org.apache.activemq.usage.UsageListener;
070    import org.apache.activemq.util.ByteSequence;
071    import org.apache.activemq.util.IOExceptionSupport;
072    import org.apache.activemq.wireformat.WireFormat;
073    import org.slf4j.Logger;
074    import org.slf4j.LoggerFactory;
075    
076    /**
077     * An implementation of {@link PersistenceAdapter} designed for use with a
078     * {@link Journal} and then check pointing asynchronously on a timeout with some
079     * other long term persistent storage.
080     * 
081     * @org.apache.xbean.XBean
082     * 
083     */
084    public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
085    
086        private BrokerService brokerService;
087            
088        protected Scheduler scheduler;
089        private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
090    
091        private Journal journal;
092        private PersistenceAdapter longTermPersistence;
093    
094        private final WireFormat wireFormat = new OpenWireFormat();
095    
096        private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
097        private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
098    
099        private SystemUsage usageManager;
100        private final long checkpointInterval = 1000 * 60 * 5;
101        private long lastCheckpointRequest = System.currentTimeMillis();
102        private long lastCleanup = System.currentTimeMillis();
103        private int maxCheckpointWorkers = 10;
104        private int maxCheckpointMessageAddSize = 1024 * 1024;
105    
106        private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
107        private ThreadPoolExecutor checkpointExecutor;
108    
109        private TaskRunner checkpointTask;
110        private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
111        private boolean fullCheckPoint;
112    
113        private final AtomicBoolean started = new AtomicBoolean(false);
114    
115        private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
116    
117        private TaskRunnerFactory taskRunnerFactory;
118    
119        public JournalPersistenceAdapter() {        
120        }
121        
122        public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
123            setJournal(journal);
124            setTaskRunnerFactory(taskRunnerFactory);
125            setPersistenceAdapter(longTermPersistence);
126        }
127    
128        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
129            this.taskRunnerFactory = taskRunnerFactory;
130        }
131    
132        public void setJournal(Journal journal) {
133            this.journal = journal;
134            journal.setJournalEventListener(this);
135        }
136        
137        public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
138            this.longTermPersistence = longTermPersistence;
139        }
140        
141        final Runnable createPeriodicCheckpointTask() {
142            return new Runnable() {
143                public void run() {
144                    long lastTime = 0;
145                    synchronized (this) {
146                        lastTime = lastCheckpointRequest;
147                    }
148                    if (System.currentTimeMillis() > lastTime + checkpointInterval) {
149                        checkpoint(false, true);
150                    }
151                }
152            };
153        }
154    
155        /**
156         * @param usageManager The UsageManager that is controlling the
157         *                destination's memory usage.
158         */
159        public void setUsageManager(SystemUsage usageManager) {
160            this.usageManager = usageManager;
161            longTermPersistence.setUsageManager(usageManager);
162        }
163    
164        public Set<ActiveMQDestination> getDestinations() {
165            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
166            destinations.addAll(queues.keySet());
167            destinations.addAll(topics.keySet());
168            return destinations;
169        }
170    
171        private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
172            if (destination.isQueue()) {
173                return createQueueMessageStore((ActiveMQQueue)destination);
174            } else {
175                return createTopicMessageStore((ActiveMQTopic)destination);
176            }
177        }
178    
179        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
180            JournalMessageStore store = queues.get(destination);
181            if (store == null) {
182                MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
183                store = new JournalMessageStore(this, checkpointStore, destination);
184                queues.put(destination, store);
185            }
186            return store;
187        }
188    
189        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
190            JournalTopicMessageStore store = topics.get(destinationName);
191            if (store == null) {
192                TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
193                store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
194                topics.put(destinationName, store);
195            }
196            return store;
197        }
198    
199        /**
200         * Cleanup method to remove any state associated with the given destination
201         *
202         * @param destination Destination to forget
203         */
204        public void removeQueueMessageStore(ActiveMQQueue destination) {
205            queues.remove(destination);
206        }
207    
208        /**
209         * Cleanup method to remove any state associated with the given destination
210         *
211         * @param destination Destination to forget
212         */
213        public void removeTopicMessageStore(ActiveMQTopic destination) {
214            topics.remove(destination);
215        }
216    
217        public TransactionStore createTransactionStore() throws IOException {
218            return transactionStore;
219        }
220    
221        public long getLastMessageBrokerSequenceId() throws IOException {
222            return longTermPersistence.getLastMessageBrokerSequenceId();
223        }
224    
225        public void beginTransaction(ConnectionContext context) throws IOException {
226            longTermPersistence.beginTransaction(context);
227        }
228    
229        public void commitTransaction(ConnectionContext context) throws IOException {
230            longTermPersistence.commitTransaction(context);
231        }
232    
233        public void rollbackTransaction(ConnectionContext context) throws IOException {
234            longTermPersistence.rollbackTransaction(context);
235        }
236    
237        public synchronized void start() throws Exception {
238            if (!started.compareAndSet(false, true)) {
239                return;
240            }
241    
242            checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
243                public boolean iterate() {
244                    return doCheckpoint();
245                }
246            }, "ActiveMQ Journal Checkpoint Worker");
247    
248            checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
249                public Thread newThread(Runnable runable) {
250                    Thread t = new Thread(runable, "Journal checkpoint worker");
251                    t.setPriority(7);
252                    return t;
253                }
254            });
255            // checkpointExecutor.allowCoreThreadTimeOut(true);
256    
257            this.usageManager.getMemoryUsage().addUsageListener(this);
258    
259            if (longTermPersistence instanceof JDBCPersistenceAdapter) {
260                // Disabled periodic clean up as it deadlocks with the checkpoint
261                // operations.
262                ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
263            }
264    
265            longTermPersistence.start();
266            createTransactionStore();
267            recover();
268    
269            // Do a checkpoint periodically.
270            this.scheduler = new Scheduler("Journal Scheduler");
271            this.scheduler.start();
272            this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
273    
274        }
275    
276        public void stop() throws Exception {
277    
278            this.usageManager.getMemoryUsage().removeUsageListener(this);
279            if (!started.compareAndSet(true, false)) {
280                return;
281            }
282    
283            this.scheduler.cancel(periodicCheckpointTask);
284            this.scheduler.stop();
285    
286            // Take one final checkpoint and stop checkpoint processing.
287            checkpoint(true, true);
288            checkpointTask.shutdown();
289            checkpointExecutor.shutdown();
290    
291            queues.clear();
292            topics.clear();
293    
294            IOException firstException = null;
295            try {
296                journal.close();
297            } catch (Exception e) {
298                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
299            }
300            longTermPersistence.stop();
301    
302            if (firstException != null) {
303                throw firstException;
304            }
305        }
306    
307        // Properties
308        // -------------------------------------------------------------------------
309        public PersistenceAdapter getLongTermPersistence() {
310            return longTermPersistence;
311        }
312    
313        /**
314         * @return Returns the wireFormat.
315         */
316        public WireFormat getWireFormat() {
317            return wireFormat;
318        }
319    
320        // Implementation methods
321        // -------------------------------------------------------------------------
322    
323        /**
324         * The Journal give us a call back so that we can move old data out of the
325         * journal. Taking a checkpoint does this for us.
326         * 
327         * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
328         */
329        public void overflowNotification(RecordLocation safeLocation) {
330            checkpoint(false, true);
331        }
332    
333        /**
334         * When we checkpoint we move all the journalled data to long term storage.
335         * 
336         */
337        public void checkpoint(boolean sync, boolean fullCheckpoint) {
338            try {
339                if (journal == null) {
340                    throw new IllegalStateException("Journal is closed.");
341                }
342    
343                long now = System.currentTimeMillis();
344                CountDownLatch latch = null;
345                synchronized (this) {
346                    latch = nextCheckpointCountDownLatch;
347                    lastCheckpointRequest = now;
348                    if (fullCheckpoint) {
349                        this.fullCheckPoint = true;
350                    }
351                }
352    
353                checkpointTask.wakeup();
354    
355                if (sync) {
356                    LOG.debug("Waking for checkpoint to complete.");
357                    latch.await();
358                }
359            } catch (InterruptedException e) {
360                Thread.currentThread().interrupt();
361                LOG.warn("Request to start checkpoint failed: " + e, e);
362            }
363        }
364    
365        public void checkpoint(boolean sync) {
366            checkpoint(sync, sync);
367        }
368    
369        /**
370         * This does the actual checkpoint.
371         * 
372         * @return
373         */
374        public boolean doCheckpoint() {
375            CountDownLatch latch = null;
376            boolean fullCheckpoint;
377            synchronized (this) {
378                latch = nextCheckpointCountDownLatch;
379                nextCheckpointCountDownLatch = new CountDownLatch(1);
380                fullCheckpoint = this.fullCheckPoint;
381                this.fullCheckPoint = false;
382            }
383            try {
384    
385                LOG.debug("Checkpoint started.");
386                RecordLocation newMark = null;
387    
388                ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
389    
390                //
391                // We do many partial checkpoints (fullCheckpoint==false) to move
392                // topic messages
393                // to long term store as soon as possible.
394                // 
395                // We want to avoid doing that for queue messages since removes the
396                // come in the same
397                // checkpoint cycle will nullify the previous message add.
398                // Therefore, we only
399                // checkpoint queues on the fullCheckpoint cycles.
400                //
401                if (fullCheckpoint) {
402                    Iterator<JournalMessageStore> iterator = queues.values().iterator();
403                    while (iterator.hasNext()) {
404                        try {
405                            final JournalMessageStore ms = iterator.next();
406                            FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
407                                public RecordLocation call() throws Exception {
408                                    return ms.checkpoint();
409                                }
410                            });
411                            futureTasks.add(task);
412                            checkpointExecutor.execute(task);
413                        } catch (Exception e) {
414                            LOG.error("Failed to checkpoint a message store: " + e, e);
415                        }
416                    }
417                }
418    
419                Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
420                while (iterator.hasNext()) {
421                    try {
422                        final JournalTopicMessageStore ms = iterator.next();
423                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
424                            public RecordLocation call() throws Exception {
425                                return ms.checkpoint();
426                            }
427                        });
428                        futureTasks.add(task);
429                        checkpointExecutor.execute(task);
430                    } catch (Exception e) {
431                        LOG.error("Failed to checkpoint a message store: " + e, e);
432                    }
433                }
434    
435                try {
436                    for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
437                        FutureTask<RecordLocation> ft = iter.next();
438                        RecordLocation mark = ft.get();
439                        // We only set a newMark on full checkpoints.
440                        if (fullCheckpoint) {
441                            if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
442                                newMark = mark;
443                            }
444                        }
445                    }
446                } catch (Throwable e) {
447                    LOG.error("Failed to checkpoint a message store: " + e, e);
448                }
449    
450                if (fullCheckpoint) {
451                    try {
452                        if (newMark != null) {
453                            LOG.debug("Marking journal at: " + newMark);
454                            journal.setMark(newMark, true);
455                        }
456                    } catch (Exception e) {
457                        LOG.error("Failed to mark the Journal: " + e, e);
458                    }
459    
460                    if (longTermPersistence instanceof JDBCPersistenceAdapter) {
461                        // We may be check pointing more often than the
462                        // checkpointInterval if under high use
463                        // But we don't want to clean up the db that often.
464                        long now = System.currentTimeMillis();
465                        if (now > lastCleanup + checkpointInterval) {
466                            lastCleanup = now;
467                            ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
468                        }
469                    }
470                }
471    
472                LOG.debug("Checkpoint done.");
473            } finally {
474                latch.countDown();
475            }
476            synchronized (this) {
477                return this.fullCheckPoint;
478            }
479    
480        }
481    
482        /**
483         * @param location
484         * @return
485         * @throws IOException
486         */
487        public DataStructure readCommand(RecordLocation location) throws IOException {
488            try {
489                Packet packet = journal.read(location);
490                return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
491            } catch (InvalidRecordLocationException e) {
492                throw createReadException(location, e);
493            } catch (IOException e) {
494                throw createReadException(location, e);
495            }
496        }
497    
498        /**
499         * Move all the messages that were in the journal into long term storage. We
500         * just replay and do a checkpoint.
501         * 
502         * @throws IOException
503         * @throws IOException
504         * @throws InvalidRecordLocationException
505         * @throws IllegalStateException
506         */
507        private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
508    
509            RecordLocation pos = null;
510            int transactionCounter = 0;
511    
512            LOG.info("Journal Recovery Started from: " + journal);
513            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
514    
515            // While we have records in the journal.
516            while ((pos = journal.getNextRecordLocation(pos)) != null) {
517                Packet data = journal.read(pos);
518                DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
519    
520                if (c instanceof Message) {
521                    Message message = (Message)c;
522                    JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
523                    if (message.isInTransaction()) {
524                        transactionStore.addMessage(store, message, pos);
525                    } else {
526                        store.replayAddMessage(context, message);
527                        transactionCounter++;
528                    }
529                } else {
530                    switch (c.getDataStructureType()) {
531                    case JournalQueueAck.DATA_STRUCTURE_TYPE: {
532                        JournalQueueAck command = (JournalQueueAck)c;
533                        JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
534                        if (command.getMessageAck().isInTransaction()) {
535                            transactionStore.removeMessage(store, command.getMessageAck(), pos);
536                        } else {
537                            store.replayRemoveMessage(context, command.getMessageAck());
538                            transactionCounter++;
539                        }
540                    }
541                        break;
542                    case JournalTopicAck.DATA_STRUCTURE_TYPE: {
543                        JournalTopicAck command = (JournalTopicAck)c;
544                        JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
545                        if (command.getTransactionId() != null) {
546                            transactionStore.acknowledge(store, command, pos);
547                        } else {
548                            store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
549                            transactionCounter++;
550                        }
551                    }
552                        break;
553                    case JournalTransaction.DATA_STRUCTURE_TYPE: {
554                        JournalTransaction command = (JournalTransaction)c;
555                        try {
556                            // Try to replay the packet.
557                            switch (command.getType()) {
558                            case JournalTransaction.XA_PREPARE:
559                                transactionStore.replayPrepare(command.getTransactionId());
560                                break;
561                            case JournalTransaction.XA_COMMIT:
562                            case JournalTransaction.LOCAL_COMMIT:
563                                Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
564                                if (tx == null) {
565                                    break; // We may be trying to replay a commit
566                                }
567                                // that
568                                // was already committed.
569    
570                                // Replay the committed operations.
571                                tx.getOperations();
572                                for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
573                                    TxOperation op = (TxOperation)iter.next();
574                                    if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
575                                        op.store.replayAddMessage(context, (Message)op.data);
576                                    }
577                                    if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
578                                        op.store.replayRemoveMessage(context, (MessageAck)op.data);
579                                    }
580                                    if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
581                                        JournalTopicAck ack = (JournalTopicAck)op.data;
582                                        ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
583                                    }
584                                }
585                                transactionCounter++;
586                                break;
587                            case JournalTransaction.LOCAL_ROLLBACK:
588                            case JournalTransaction.XA_ROLLBACK:
589                                transactionStore.replayRollback(command.getTransactionId());
590                                break;
591                            default:
592                                throw new IOException("Invalid journal command type: " + command.getType());
593                            }
594                        } catch (IOException e) {
595                            LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
596                        }
597                    }
598                        break;
599                    case JournalTrace.DATA_STRUCTURE_TYPE:
600                        JournalTrace trace = (JournalTrace)c;
601                        LOG.debug("TRACE Entry: " + trace.getMessage());
602                        break;
603                    default:
604                        LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
605                    }
606                }
607            }
608    
609            RecordLocation location = writeTraceMessage("RECOVERED", true);
610            journal.setMark(location, true);
611    
612            LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
613        }
614    
615        private IOException createReadException(RecordLocation location, Exception e) {
616            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
617        }
618    
619        protected IOException createWriteException(DataStructure packet, Exception e) {
620            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
621        }
622    
623        protected IOException createWriteException(String command, Exception e) {
624            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
625        }
626    
627        protected IOException createRecoveryFailedException(Exception e) {
628            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
629        }
630    
631        /**
632         * @param command
633         * @param sync
634         * @return
635         * @throws IOException
636         */
637        public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
638            if (started.get()) {
639                try {
640                        return journal.write(toPacket(wireFormat.marshal(command)), sync);
641                } catch (IOException ioe) {
642                        LOG.error("Cannot write to the journal", ioe);
643                        brokerService.handleIOException(ioe);
644                        throw ioe;
645                }
646            }
647            throw new IOException("closed");
648        }
649    
650        private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
651            JournalTrace trace = new JournalTrace();
652            trace.setMessage(message);
653            return writeCommand(trace, sync);
654        }
655    
656        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
657            newPercentUsage = (newPercentUsage / 10) * 10;
658            oldPercentUsage = (oldPercentUsage / 10) * 10;
659            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
660                boolean sync = newPercentUsage >= 90;
661                checkpoint(sync, true);
662            }
663        }
664    
665        public JournalTransactionStore getTransactionStore() {
666            return transactionStore;
667        }
668    
669        public void deleteAllMessages() throws IOException {
670            try {
671                JournalTrace trace = new JournalTrace();
672                trace.setMessage("DELETED");
673                RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
674                journal.setMark(location, true);
675                LOG.info("Journal deleted: ");
676            } catch (IOException e) {
677                throw e;
678            } catch (Throwable e) {
679                throw IOExceptionSupport.create(e);
680            }
681            longTermPersistence.deleteAllMessages();
682        }
683    
684        public SystemUsage getUsageManager() {
685            return usageManager;
686        }
687    
688        public int getMaxCheckpointMessageAddSize() {
689            return maxCheckpointMessageAddSize;
690        }
691    
692        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
693            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
694        }
695    
696        public int getMaxCheckpointWorkers() {
697            return maxCheckpointWorkers;
698        }
699    
700        public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
701            this.maxCheckpointWorkers = maxCheckpointWorkers;
702        }
703    
704        public boolean isUseExternalMessageReferences() {
705            return false;
706        }
707    
708        public void setUseExternalMessageReferences(boolean enable) {
709            if (enable) {
710                throw new IllegalArgumentException("The journal does not support message references.");
711            }
712        }
713    
714        public Packet toPacket(ByteSequence sequence) {
715            return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
716        }
717    
718        public ByteSequence toByteSequence(Packet packet) {
719            org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
720            return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
721        }
722    
723        public void setBrokerName(String brokerName) {
724            longTermPersistence.setBrokerName(brokerName);
725        }
726    
727        @Override
728        public String toString() {
729            return "JournalPersistenceAdapator(" + longTermPersistence + ")";
730        }
731    
732        public void setDirectory(File dir) {
733        }
734        
735        public long size(){
736            return 0;
737        }
738    
739        public void setBrokerService(BrokerService brokerService) {
740            this.brokerService = brokerService;
741            PersistenceAdapter pa = getLongTermPersistence();
742            if( pa instanceof BrokerServiceAware ) {
743                ((BrokerServiceAware)pa).setBrokerService(brokerService);
744            }
745        }
746    
747        public long getLastProducerSequenceId(ProducerId id) {
748            return -1;
749        }
750    
751    }