001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInput;
022    import java.io.DataOutput;
023    import java.io.EOFException;
024    import java.io.File;
025    import java.io.IOException;
026    import java.io.InputStream;
027    import java.io.ObjectInputStream;
028    import java.io.ObjectOutputStream;
029    import java.io.OutputStream;
030    import java.util.*;
031    import java.util.Map.Entry;
032    import java.util.concurrent.atomic.AtomicBoolean;
033    import java.util.concurrent.atomic.AtomicLong;
034    import java.util.concurrent.locks.ReentrantReadWriteLock;
035    
036    import org.apache.activemq.ActiveMQMessageAuditNoSync;
037    import org.apache.activemq.broker.BrokerService;
038    import org.apache.activemq.broker.BrokerServiceAware;
039    import org.apache.activemq.command.ConnectionId;
040    import org.apache.activemq.command.LocalTransactionId;
041    import org.apache.activemq.command.MessageId;
042    import org.apache.activemq.command.SubscriptionInfo;
043    import org.apache.activemq.command.TransactionId;
044    import org.apache.activemq.command.XATransactionId;
045    import org.apache.activemq.protobuf.Buffer;
046    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
047    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
048    import org.apache.activemq.store.kahadb.data.KahaDestination;
049    import org.apache.activemq.store.kahadb.data.KahaEntryType;
050    import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
051    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
052    import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
053    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
054    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
055    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
056    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
057    import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
058    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
059    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
060    import org.apache.activemq.util.Callback;
061    import org.apache.activemq.util.IOHelper;
062    import org.apache.activemq.util.ServiceStopper;
063    import org.apache.activemq.util.ServiceSupport;
064    import org.slf4j.Logger;
065    import org.slf4j.LoggerFactory;
066    import org.apache.kahadb.index.BTreeIndex;
067    import org.apache.kahadb.index.BTreeVisitor;
068    import org.apache.kahadb.journal.DataFile;
069    import org.apache.kahadb.journal.Journal;
070    import org.apache.kahadb.journal.Location;
071    import org.apache.kahadb.page.Page;
072    import org.apache.kahadb.page.PageFile;
073    import org.apache.kahadb.page.Transaction;
074    import org.apache.kahadb.util.ByteSequence;
075    import org.apache.kahadb.util.DataByteArrayInputStream;
076    import org.apache.kahadb.util.DataByteArrayOutputStream;
077    import org.apache.kahadb.util.LockFile;
078    import org.apache.kahadb.util.LongMarshaller;
079    import org.apache.kahadb.util.Marshaller;
080    import org.apache.kahadb.util.Sequence;
081    import org.apache.kahadb.util.SequenceSet;
082    import org.apache.kahadb.util.StringMarshaller;
083    import org.apache.kahadb.util.VariableMarshaller;
084    
085    public class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
086            
087            protected BrokerService brokerService;
088    
089        public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
090        public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
091    
092        protected static final Buffer UNMATCHED;
093        static {
094            UNMATCHED = new Buffer(new byte[]{});
095        }
096        private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
097        private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
098    
099        static final int CLOSED_STATE = 1;
100        static final int OPEN_STATE = 2;
101        static final long NOT_ACKED = -1;
102        static final long UNMATCHED_SEQ = -2;
103    
104        static final int VERSION = 3;
105    
106    
107        protected class Metadata {
108            protected Page<Metadata> page;
109            protected int state;
110            protected BTreeIndex<String, StoredDestination> destinations;
111            protected Location lastUpdate;
112            protected Location firstInProgressTransactionLocation;
113            protected Location producerSequenceIdTrackerLocation = null;
114            protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
115            protected int version = VERSION;
116            public void read(DataInput is) throws IOException {
117                state = is.readInt();
118                destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
119                if (is.readBoolean()) {
120                    lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
121                } else {
122                    lastUpdate = null;
123                }
124                if (is.readBoolean()) {
125                    firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
126                } else {
127                    firstInProgressTransactionLocation = null;
128                }
129                try {
130                    if (is.readBoolean()) {
131                        producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
132                    } else {
133                        producerSequenceIdTrackerLocation = null;
134                    }
135                } catch (EOFException expectedOnUpgrade) {
136                }
137                try {
138                   version = is.readInt();
139                }catch (EOFException expectedOnUpgrade) {
140                    version=1;
141                }
142                LOG.info("KahaDB is version " + version);
143            }
144    
145            public void write(DataOutput os) throws IOException {
146                os.writeInt(state);
147                os.writeLong(destinations.getPageId());
148    
149                if (lastUpdate != null) {
150                    os.writeBoolean(true);
151                    LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
152                } else {
153                    os.writeBoolean(false);
154                }
155    
156                if (firstInProgressTransactionLocation != null) {
157                    os.writeBoolean(true);
158                    LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
159                } else {
160                    os.writeBoolean(false);
161                }
162                
163                if (producerSequenceIdTrackerLocation != null) {
164                    os.writeBoolean(true);
165                    LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
166                } else {
167                    os.writeBoolean(false);
168                }
169                os.writeInt(VERSION);
170            }
171        }
172    
173        class MetadataMarshaller extends VariableMarshaller<Metadata> {
174            public Metadata readPayload(DataInput dataIn) throws IOException {
175                Metadata rc = new Metadata();
176                rc.read(dataIn);
177                return rc;
178            }
179    
180            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
181                object.write(dataOut);
182            }
183        }
184    
185        protected PageFile pageFile;
186            protected Journal journal;
187            protected Metadata metadata = new Metadata();
188    
189        protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
190    
191        protected boolean failIfDatabaseIsLocked;
192    
193        protected boolean deleteAllMessages;
194        protected File directory = new File("KahaDB");
195        protected Thread checkpointThread;
196        protected boolean enableJournalDiskSyncs=true;
197        protected boolean archiveDataLogs;
198        protected File directoryArchive;
199        protected AtomicLong storeSize = new AtomicLong(0);
200        long checkpointInterval = 5*1000;
201        long cleanupInterval = 30*1000;
202        int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
203        int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
204        boolean enableIndexWriteAsync = false;
205        int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
206        
207        
208        protected AtomicBoolean opened = new AtomicBoolean();
209        private LockFile lockFile;
210        private boolean ignoreMissingJournalfiles = false;
211        private int indexCacheSize = 10000;
212        private boolean checkForCorruptJournalFiles = false;
213        private boolean checksumJournalFiles = false;
214        private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
215        protected boolean forceRecoverIndex = false;
216        private final Object checkpointThreadLock = new Object();
217    
218        public MessageDatabase() {
219        }
220    
221        @Override
222        public void doStart() throws Exception {
223            load();
224        }
225    
226        @Override
227        public void doStop(ServiceStopper stopper) throws Exception {
228            unload();
229        }
230    
231            private void loadPageFile() throws IOException {
232                this.indexLock.writeLock().lock();
233                try {
234                        final PageFile pageFile = getPageFile();
235                pageFile.load();
236                pageFile.tx().execute(new Transaction.Closure<IOException>() {
237                    public void execute(Transaction tx) throws IOException {
238                        if (pageFile.getPageCount() == 0) {
239                            // First time this is created.. Initialize the metadata
240                            Page<Metadata> page = tx.allocate();
241                            assert page.getPageId() == 0;
242                            page.set(metadata);
243                            metadata.page = page;
244                            metadata.state = CLOSED_STATE;
245                            metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
246    
247                            tx.store(metadata.page, metadataMarshaller, true);
248                        } else {
249                            Page<Metadata> page = tx.load(0, metadataMarshaller);
250                            metadata = page.get();
251                            metadata.page = page;
252                        }
253                        metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
254                        metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
255                        metadata.destinations.load(tx);
256                    }
257                });
258                // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
259                // Perhaps we should just keep an index of file
260                storedDestinations.clear();
261                pageFile.tx().execute(new Transaction.Closure<IOException>() {
262                    public void execute(Transaction tx) throws IOException {
263                        for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
264                            Entry<String, StoredDestination> entry = iterator.next();
265                            StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
266                            storedDestinations.put(entry.getKey(), sd);
267                        }
268                    }
269                });
270                pageFile.flush();            
271            }finally {
272                this.indexLock.writeLock().unlock();
273            }
274            }
275            
276            private void startCheckpoint() {
277            synchronized (checkpointThreadLock) {
278                boolean start = false;
279                if (checkpointThread == null) {
280                    start = true;
281                } else if (!checkpointThread.isAlive()) {
282                    start = true;
283                    LOG.info("KahaDB: Recovering checkpoint thread after death");
284                }
285                if (start) {
286                    checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
287                        @Override
288                        public void run() {
289                            try {
290                                long lastCleanup = System.currentTimeMillis();
291                                long lastCheckpoint = System.currentTimeMillis();
292                                // Sleep for a short time so we can periodically check
293                                // to see if we need to exit this thread.
294                                long sleepTime = Math.min(checkpointInterval, 500);
295                                while (opened.get()) {
296                                    Thread.sleep(sleepTime);
297                                    long now = System.currentTimeMillis();
298                                    if( now - lastCleanup >= cleanupInterval ) {
299                                        checkpointCleanup(true);
300                                        lastCleanup = now;
301                                        lastCheckpoint = now;
302                                    } else if( now - lastCheckpoint >= checkpointInterval ) {
303                                        checkpointCleanup(false);
304                                        lastCheckpoint = now;
305                                    }
306                                }
307                            } catch (InterruptedException e) {
308                                // Looks like someone really wants us to exit this thread...
309                            } catch (IOException ioe) {
310                                LOG.error("Checkpoint failed", ioe);
311                                brokerService.handleIOException(ioe);
312                            }
313                        }
314                    };
315    
316                    checkpointThread.setDaemon(true);
317                    checkpointThread.start();
318                }
319            }
320            }
321    
322            public void open() throws IOException {
323                    if( opened.compareAndSet(false, true) ) {
324                getJournal().start();
325                    loadPageFile();        
326                    startCheckpoint();
327                recover();
328                    }
329            }
330    
331        private void lock() throws IOException {
332            if( lockFile == null ) {
333                File lockFileName = new File(directory, "lock");
334                lockFile = new LockFile(lockFileName, true);
335                if (failIfDatabaseIsLocked) {
336                    lockFile.lock();
337                } else {
338                    while (true) {
339                        try {
340                            lockFile.lock();
341                            break;
342                        } catch (IOException e) {
343                            LOG.info("Database "+lockFileName+" is locked... waiting " + (getDatabaseLockedWaitDelay() / 1000) + " seconds for the database to be unlocked. Reason: " + e);
344                            try {
345                                Thread.sleep(getDatabaseLockedWaitDelay());
346                            } catch (InterruptedException e1) {
347                            }
348                        }
349                    }
350                }
351            }
352        }
353    
354        // for testing
355        public LockFile getLockFile() {
356            return lockFile;
357        }
358    
359        public void load() throws IOException {
360            
361            this.indexLock.writeLock().lock();
362            try {
363                lock();
364                if (deleteAllMessages) {
365                    getJournal().start();
366                    getJournal().delete();
367                    getJournal().close();
368                    journal = null;
369                    getPageFile().delete();
370                    LOG.info("Persistence store purged.");
371                    deleteAllMessages = false;
372                }
373    
374                    open();
375                    store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
376            }finally {
377                this.indexLock.writeLock().unlock();
378            }
379    
380        }
381    
382        
383            public void close() throws IOException, InterruptedException {
384                    if( opened.compareAndSet(true, false)) {
385                        this.indexLock.writeLock().lock();
386                    try {
387                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
388                            public void execute(Transaction tx) throws IOException {
389                                checkpointUpdate(tx, true);
390                            }
391                        });
392                        pageFile.unload();
393                        metadata = new Metadata();
394                    }finally {
395                        this.indexLock.writeLock().unlock();
396                    }
397                    journal.close();
398                synchronized (checkpointThreadLock) {
399                        checkpointThread.join();
400                }
401                    lockFile.unlock();
402                    lockFile=null;
403                    }
404            }
405            
406        public void unload() throws IOException, InterruptedException {
407            this.indexLock.writeLock().lock();
408            try {
409                if( pageFile != null && pageFile.isLoaded() ) {
410                    metadata.state = CLOSED_STATE;
411                    metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
412        
413                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
414                        public void execute(Transaction tx) throws IOException {
415                            tx.store(metadata.page, metadataMarshaller, true);
416                        }
417                    });
418                }
419            }finally {
420                this.indexLock.writeLock().unlock();
421            }
422            close();
423        }
424    
425        // public for testing
426        public Location getFirstInProgressTxLocation() {
427            Location l = null;
428            synchronized (inflightTransactions) {
429                if (!inflightTransactions.isEmpty()) {
430                    l = inflightTransactions.values().iterator().next().get(0).getLocation();
431                }
432                if (!preparedTransactions.isEmpty()) {
433                    Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
434                    if (l==null || t.compareTo(l) <= 0) {
435                        l = t;
436                    }
437                }
438            }
439            return l;
440        }
441    
442        /**
443         * Move all the messages that were in the journal into long term storage. We
444         * just replay and do a checkpoint.
445         * 
446         * @throws IOException
447         * @throws IOException
448         * @throws IllegalStateException
449         */
450        private void recover() throws IllegalStateException, IOException {
451            this.indexLock.writeLock().lock();
452            try {
453                
454                    long start = System.currentTimeMillis();        
455                    Location producerAuditPosition = recoverProducerAudit();
456                    Location lastIndoubtPosition = getRecoveryPosition();
457                    
458                    Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
459                        
460                    if (recoveryPosition != null) {  
461                        int redoCounter = 0;
462                        LOG.info("Recovering from the journal ...");
463                        while (recoveryPosition != null) {
464                            JournalCommand<?> message = load(recoveryPosition);
465                            metadata.lastUpdate = recoveryPosition;
466                            process(message, recoveryPosition, lastIndoubtPosition);
467                            redoCounter++;
468                            recoveryPosition = journal.getNextLocation(recoveryPosition);
469                        }
470                        long end = System.currentTimeMillis();
471                        LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
472                    }
473                    
474                    // We may have to undo some index updates.
475                pageFile.tx().execute(new Transaction.Closure<IOException>() {
476                    public void execute(Transaction tx) throws IOException {
477                        recoverIndex(tx);
478                    }
479                });
480    
481                // rollback any recovered inflight local transactions
482                Set<TransactionId> toRollback = new HashSet<TransactionId>();
483                synchronized (inflightTransactions) {
484                    for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
485                        TransactionId id = it.next();
486                        if (id.isLocalTransaction()) {
487                            toRollback.add(id);
488                        }
489                    }
490                    for (TransactionId tx: toRollback) {
491                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
492                        store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(tx)), false, null, null);
493                    }
494                }
495            }finally {
496                this.indexLock.writeLock().unlock();
497            }
498        }
499        
500            private Location minimum(Location producerAuditPosition,
501                Location lastIndoubtPosition) {
502                Location min = null;
503                if (producerAuditPosition != null) {
504                    min = producerAuditPosition;
505                    if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
506                        min = lastIndoubtPosition;
507                    }
508                } else {
509                    min = lastIndoubtPosition;
510                }
511                return min;
512        }
513            
514            private Location recoverProducerAudit() throws IOException {
515                if (metadata.producerSequenceIdTrackerLocation != null) {
516                    KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
517                    try {
518                        ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
519                        metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
520                    } catch (ClassNotFoundException cfe) {
521                        IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
522                        ioe.initCause(cfe);
523                        throw ioe;
524                    }
525                    return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
526                } else {
527                    // got no audit stored so got to recreate via replay from start of the journal
528                    return journal.getNextLocation(null);
529                }
530        }
531    
532        protected void recoverIndex(Transaction tx) throws IOException {
533            long start = System.currentTimeMillis();
534            // It is possible index updates got applied before the journal updates.. 
535            // in that case we need to removed references to messages that are not in the journal
536            final Location lastAppendLocation = journal.getLastAppendLocation();
537            long undoCounter=0;
538            
539            // Go through all the destinations to see if they have messages past the lastAppendLocation
540            for (StoredDestination sd : storedDestinations.values()) {
541                    
542                final ArrayList<Long> matches = new ArrayList<Long>();
543                // Find all the Locations that are >= than the last Append Location.
544                sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
545                                    @Override
546                                    protected void matched(Location key, Long value) {
547                                            matches.add(value);
548                                    }
549                });
550                
551                
552                for (Long sequenceId : matches) {
553                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
554                    sd.locationIndex.remove(tx, keys.location);
555                    sd.messageIdIndex.remove(tx, keys.messageId);
556                    metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
557                    undoCounter++;
558                    // TODO: do we need to modify the ack positions for the pub sub case?
559                            }
560            }
561    
562            long end = System.currentTimeMillis();
563            if( undoCounter > 0 ) {
564                    // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
565                    // should do sync writes to the journal.
566                    LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
567            }
568    
569            undoCounter = 0;
570            start = System.currentTimeMillis();
571    
572            // Lets be extra paranoid here and verify that all the datafiles being referenced
573            // by the indexes still exists.
574    
575            final SequenceSet ss = new SequenceSet();
576            for (StoredDestination sd : storedDestinations.values()) {
577                // Use a visitor to cut down the number of pages that we load
578                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
579                    int last=-1;
580    
581                    public boolean isInterestedInKeysBetween(Location first, Location second) {
582                        if( first==null ) {
583                            return !ss.contains(0, second.getDataFileId());
584                        } else if( second==null ) {
585                            return true;
586                        } else {
587                            return !ss.contains(first.getDataFileId(), second.getDataFileId());
588                        }
589                    }
590    
591                    public void visit(List<Location> keys, List<Long> values) {
592                        for (Location l : keys) {
593                            int fileId = l.getDataFileId();
594                            if( last != fileId ) {
595                                ss.add(fileId);
596                                last = fileId;
597                            }
598                        }
599                    }
600    
601                });
602            }
603            HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
604            while( !ss.isEmpty() ) {
605                missingJournalFiles.add( (int)ss.removeFirst() );
606            }
607            missingJournalFiles.removeAll( journal.getFileMap().keySet() );
608    
609            if( !missingJournalFiles.isEmpty() ) {
610                LOG.info("Some journal files are missing: "+missingJournalFiles);
611            }
612    
613            ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
614            for (Integer missing : missingJournalFiles) {
615                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
616            }
617    
618            if ( checkForCorruptJournalFiles ) {
619                Collection<DataFile> dataFiles = journal.getFileMap().values();
620                for (DataFile dataFile : dataFiles) {
621                    int id = dataFile.getDataFileId();
622                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
623                    Sequence seq = dataFile.getCorruptedBlocks().getHead();
624                    while( seq!=null ) {
625                        missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
626                        seq = seq.getNext();
627                    }
628                }
629            }
630    
631            if( !missingPredicates.isEmpty() ) {
632                for (StoredDestination sd : storedDestinations.values()) {
633    
634                    final ArrayList<Long> matches = new ArrayList<Long>();
635                    sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
636                        @Override
637                        protected void matched(Location key, Long value) {
638                            matches.add(value);
639                        }
640                    });
641    
642                    // If somes message references are affected by the missing data files...
643                    if( !matches.isEmpty() ) {
644    
645                        // We either 'gracefully' recover dropping the missing messages or
646                        // we error out.
647                        if( ignoreMissingJournalfiles ) {
648                            // Update the index to remove the references to the missing data
649                            for (Long sequenceId : matches) {
650                                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
651                                sd.locationIndex.remove(tx, keys.location);
652                                sd.messageIdIndex.remove(tx, keys.messageId);
653                                undoCounter++;
654                                // TODO: do we need to modify the ack positions for the pub sub case?
655                            }
656    
657                        } else {
658                            throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
659                        }
660                    }
661                }
662            }
663            
664            end = System.currentTimeMillis();
665            if( undoCounter > 0 ) {
666                    // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
667                    // should do sync writes to the journal.
668                    LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
669            }
670            }
671    
672            private Location nextRecoveryPosition;
673            private Location lastRecoveryPosition;
674    
675            public void incrementalRecover() throws IOException {
676                this.indexLock.writeLock().lock();
677            try {
678                    if( nextRecoveryPosition == null ) {
679                            if( lastRecoveryPosition==null ) {
680                                    nextRecoveryPosition = getRecoveryPosition();
681                            } else {
682                            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
683                            }               
684                    }
685                    while (nextRecoveryPosition != null) {
686                            lastRecoveryPosition = nextRecoveryPosition;
687                        metadata.lastUpdate = lastRecoveryPosition;
688                        JournalCommand<?> message = load(lastRecoveryPosition);
689                        process(message, lastRecoveryPosition);            
690                        nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
691                    }
692            }finally {
693                this.indexLock.writeLock().unlock();
694            }
695            }
696            
697        public Location getLastUpdatePosition() throws IOException {
698            return metadata.lastUpdate;
699        }
700        
701        private Location getRecoveryPosition() throws IOException {
702    
703            if (!this.forceRecoverIndex) {
704    
705                // If we need to recover the transactions..
706                if (metadata.firstInProgressTransactionLocation != null) {
707                    return metadata.firstInProgressTransactionLocation;
708                }
709            
710                // Perhaps there were no transactions...
711                if( metadata.lastUpdate!=null) {
712                    // Start replay at the record after the last one recorded in the index file.
713                    return journal.getNextLocation(metadata.lastUpdate);
714                }
715            }
716            // This loads the first position.
717            return journal.getNextLocation(null);
718            }
719    
720        protected void checkpointCleanup(final boolean cleanup) throws IOException {
721            long start;
722            this.indexLock.writeLock().lock();
723            try {
724                start = System.currentTimeMillis();
725                    if( !opened.get() ) {
726                            return;
727                    }
728                pageFile.tx().execute(new Transaction.Closure<IOException>() {
729                    public void execute(Transaction tx) throws IOException {
730                        checkpointUpdate(tx, cleanup);
731                    }
732                });
733            }finally {
734                this.indexLock.writeLock().unlock();
735            }
736            long end = System.currentTimeMillis();
737            if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
738                    LOG.info("Slow KahaDB access: cleanup took "+(end-start));
739            }
740        }
741    
742        
743            public void checkpoint(Callback closure) throws Exception {
744                this.indexLock.writeLock().lock();
745            try {
746                pageFile.tx().execute(new Transaction.Closure<IOException>() {
747                    public void execute(Transaction tx) throws IOException {
748                        checkpointUpdate(tx, false);
749                    }
750                });
751                closure.execute();
752            }finally {
753                this.indexLock.writeLock().unlock();
754            }
755            }
756    
757        // /////////////////////////////////////////////////////////////////
758        // Methods call by the broker to update and query the store.
759        // /////////////////////////////////////////////////////////////////
760        public Location store(JournalCommand<?> data) throws IOException {
761            return store(data, false, null,null);
762        }
763    
764        /**
765         * All updated are are funneled through this method. The updates are converted
766         * to a JournalMessage which is logged to the journal and then the data from
767         * the JournalMessage is used to update the index just like it would be done
768         * during a recovery process.
769         */
770        public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
771            if (before != null) {
772                before.run();
773            }
774            try {
775                int size = data.serializedSizeFramed();
776                DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
777                os.writeByte(data.type().getNumber());
778                data.writeFramed(os);
779        
780                long start = System.currentTimeMillis();
781                Location location = journal.write(os.toByteSequence(), sync);
782                long start2 = System.currentTimeMillis();
783                process(data, location);
784                    long end = System.currentTimeMillis();
785                    if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
786                            LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
787                    }
788        
789                    this.indexLock.writeLock().lock();
790                try {
791                    metadata.lastUpdate = location;
792                }finally {
793                    this.indexLock.writeLock().unlock();
794                }
795                if (!checkpointThread.isAlive()) {
796                    startCheckpoint();
797                }
798                if (after != null) {
799                    after.run();
800                }
801                return location;
802            } catch (IOException ioe) {
803                LOG.error("KahaDB failed to store to Journal", ioe);
804                brokerService.handleIOException(ioe);
805                throw ioe;
806            }
807        }
808    
809        /**
810         * Loads a previously stored JournalMessage
811         * 
812         * @param location
813         * @return
814         * @throws IOException
815         */
816        public JournalCommand<?> load(Location location) throws IOException {
817            ByteSequence data = journal.read(location);
818            DataByteArrayInputStream is = new DataByteArrayInputStream(data);
819            byte readByte = is.readByte();
820            KahaEntryType type = KahaEntryType.valueOf(readByte);
821            if( type == null ) {
822                throw new IOException("Could not load journal record. Invalid location: "+location);
823            }
824            JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
825            message.mergeFramed(is);
826            return message;
827        }
828        
829        /**
830         * do minimal recovery till we reach the last inDoubtLocation
831         * @param data
832         * @param location
833         * @param inDoubtlocation
834         * @throws IOException
835         */
836        void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
837            if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
838                process(data, location);
839            } else {
840                // just recover producer audit
841                data.visit(new Visitor() {
842                    public void visit(KahaAddMessageCommand command) throws IOException {
843                        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
844                    }
845                });
846            }
847        }
848    
849        // /////////////////////////////////////////////////////////////////
850        // Journaled record processing methods. Once the record is journaled,
851        // these methods handle applying the index updates. These may be called
852        // from the recovery method too so they need to be idempotent
853        // /////////////////////////////////////////////////////////////////
854    
855        void process(JournalCommand<?> data, final Location location) throws IOException {
856            data.visit(new Visitor() {
857                @Override
858                public void visit(KahaAddMessageCommand command) throws IOException {
859                    process(command, location);
860                }
861    
862                @Override
863                public void visit(KahaRemoveMessageCommand command) throws IOException {
864                    process(command, location);
865                }
866    
867                @Override
868                public void visit(KahaPrepareCommand command) throws IOException {
869                    process(command, location);
870                }
871    
872                @Override
873                public void visit(KahaCommitCommand command) throws IOException {
874                    process(command, location);
875                }
876    
877                @Override
878                public void visit(KahaRollbackCommand command) throws IOException {
879                    process(command, location);
880                }
881    
882                @Override
883                public void visit(KahaRemoveDestinationCommand command) throws IOException {
884                    process(command, location);
885                }
886    
887                @Override
888                public void visit(KahaSubscriptionCommand command) throws IOException {
889                    process(command, location);
890                }
891            });
892        }
893    
894        protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
895            if (command.hasTransactionInfo()) {
896                List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
897                inflightTx.add(new AddOpperation(command, location));
898            } else {
899                this.indexLock.writeLock().lock();
900                try {
901                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
902                        public void execute(Transaction tx) throws IOException {
903                            upadateIndex(tx, command, location);
904                        }
905                    });
906                }finally {
907                    this.indexLock.writeLock().unlock();
908                }
909            }
910        }
911    
912        protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
913            if (command.hasTransactionInfo()) {
914               List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
915               inflightTx.add(new RemoveOpperation(command, location));
916            } else {
917                this.indexLock.writeLock().lock();
918                try {
919                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
920                        public void execute(Transaction tx) throws IOException {
921                            updateIndex(tx, command, location);
922                        }
923                    });
924                }finally {
925                    this.indexLock.writeLock().unlock();
926                }
927            }
928    
929        }
930    
931        protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
932            this.indexLock.writeLock().lock();
933            try {
934                pageFile.tx().execute(new Transaction.Closure<IOException>() {
935                    public void execute(Transaction tx) throws IOException {
936                        updateIndex(tx, command, location);
937                    }
938                });
939            }finally {
940                this.indexLock.writeLock().unlock();
941            }
942        }
943    
944        protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
945            this.indexLock.writeLock().lock();
946            try {
947                pageFile.tx().execute(new Transaction.Closure<IOException>() {
948                    public void execute(Transaction tx) throws IOException {
949                        updateIndex(tx, command, location);
950                    }
951                });
952            }finally {
953                this.indexLock.writeLock().unlock();
954            }
955        }
956    
957        protected void process(KahaCommitCommand command, Location location) throws IOException {
958            TransactionId key = key(command.getTransactionInfo());
959            List<Operation> inflightTx;
960            synchronized (inflightTransactions) {
961                inflightTx = inflightTransactions.remove(key);
962                if (inflightTx == null) {
963                    inflightTx = preparedTransactions.remove(key);
964                }
965            }
966            if (inflightTx == null) {
967                return;
968            }
969    
970            final List<Operation> messagingTx = inflightTx;
971            this.indexLock.writeLock().lock();
972            try {
973                pageFile.tx().execute(new Transaction.Closure<IOException>() {
974                    public void execute(Transaction tx) throws IOException {
975                        for (Operation op : messagingTx) {
976                            op.execute(tx);
977                        }
978                    }
979                });
980            }finally {
981                this.indexLock.writeLock().unlock();
982            }
983        }
984    
985        protected void process(KahaPrepareCommand command, Location location) {
986            TransactionId key = key(command.getTransactionInfo());
987            synchronized (inflightTransactions) {
988                List<Operation> tx = inflightTransactions.remove(key);
989                if (tx != null) {
990                    preparedTransactions.put(key, tx);
991                }
992            }
993        }
994    
995        protected void process(KahaRollbackCommand command, Location location) {
996            TransactionId key = key(command.getTransactionInfo());
997            synchronized (inflightTransactions) {
998                List<Operation> tx = inflightTransactions.remove(key);
999                if (tx == null) {
1000                    preparedTransactions.remove(key);
1001                }
1002            }
1003        }
1004    
1005        // /////////////////////////////////////////////////////////////////
1006        // These methods do the actual index updates.
1007        // /////////////////////////////////////////////////////////////////
1008    
1009        protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1010            private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1011    
1012        void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1013            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1014    
1015            // Skip adding the message to the index if this is a topic and there are
1016            // no subscriptions.
1017            if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1018                return;
1019            }
1020    
1021            // Add the message.
1022            int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1023            long id = sd.orderIndex.getNextMessageId(priority);
1024            Long previous = sd.locationIndex.put(tx, location, id);
1025            if (previous == null) {
1026                previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1027                if (previous == null) {
1028                    sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1029                    if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1030                        addAckLocationForNewMessage(tx, sd, id);
1031                    }
1032                } else {
1033                    // If the message ID as indexed, then the broker asked us to
1034                    // store a DUP
1035                    // message. Bad BOY! Don't do it, and log a warning.
1036                    LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
1037                    // TODO: consider just rolling back the tx.
1038                    sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1039                    sd.locationIndex.remove(tx, location);
1040                }
1041            } else {
1042                // restore the previous value.. Looks like this was a redo of a
1043                // previously
1044                // added message. We don't want to assign it a new id as the other
1045                // indexes would
1046                // be wrong..
1047                //
1048                // TODO: consider just rolling back the tx.
1049                sd.locationIndex.put(tx, location, previous);
1050            }
1051            // record this id in any event, initial send or recovery
1052            metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1053        }
1054    
1055        void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1056            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1057            if (!command.hasSubscriptionKey()) {
1058                
1059                // In the queue case we just remove the message from the index..
1060                Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1061                if (sequenceId != null) {
1062                    MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1063                    if (keys != null) {
1064                        sd.locationIndex.remove(tx, keys.location);
1065                        recordAckMessageReferenceLocation(ackLocation, keys.location);
1066                    }                
1067                }
1068            } else {
1069                // In the topic case we need remove the message once it's been acked
1070                // by all the subs
1071                Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1072    
1073                // Make sure it's a valid message id...
1074                if (sequence != null) {
1075                    String subscriptionKey = command.getSubscriptionKey();
1076                    if (command.getAck() != UNMATCHED) {
1077                        sd.orderIndex.get(tx, sequence);
1078                        byte priority = sd.orderIndex.lastGetPriority();
1079                        sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1080                    }
1081                    // The following method handles deleting un-referenced messages.
1082                    removeAckLocation(tx, sd, subscriptionKey, sequence);
1083                }
1084    
1085            }
1086        }
1087    
1088        Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
1089        private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1090            Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1091            if (referenceFileIds == null) {
1092                referenceFileIds = new HashSet<Integer>();
1093                referenceFileIds.add(messageLocation.getDataFileId());
1094                ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1095            } else {
1096                Integer id = Integer.valueOf(messageLocation.getDataFileId());
1097                if (!referenceFileIds.contains(id)) {
1098                    referenceFileIds.add(id);
1099                }
1100            }
1101        }
1102    
1103        void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1104            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1105            sd.orderIndex.remove(tx);
1106            
1107            sd.locationIndex.clear(tx);
1108            sd.locationIndex.unload(tx);
1109            tx.free(sd.locationIndex.getPageId());
1110    
1111            sd.messageIdIndex.clear(tx);
1112            sd.messageIdIndex.unload(tx);
1113            tx.free(sd.messageIdIndex.getPageId());
1114    
1115            if (sd.subscriptions != null) {
1116                sd.subscriptions.clear(tx);
1117                sd.subscriptions.unload(tx);
1118                tx.free(sd.subscriptions.getPageId());
1119    
1120                sd.subscriptionAcks.clear(tx);
1121                sd.subscriptionAcks.unload(tx);
1122                tx.free(sd.subscriptionAcks.getPageId());
1123    
1124                sd.ackPositions.clear(tx);
1125                sd.ackPositions.unload(tx);
1126                tx.free(sd.ackPositions.getPageId());
1127            }
1128    
1129            String key = key(command.getDestination());
1130            storedDestinations.remove(key);
1131            metadata.destinations.remove(tx, key);
1132        }
1133    
1134        void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1135            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1136            final String subscriptionKey = command.getSubscriptionKey();
1137    
1138            // If set then we are creating it.. otherwise we are destroying the sub
1139            if (command.hasSubscriptionInfo()) {
1140                sd.subscriptions.put(tx, subscriptionKey, command);
1141                long ackLocation=NOT_ACKED;
1142                if (!command.getRetroactive()) {
1143                    ackLocation = sd.orderIndex.nextMessageId-1;
1144                } else {
1145                    addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
1146                }
1147                sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1148            } else {
1149                // delete the sub...
1150                sd.subscriptions.remove(tx, subscriptionKey);
1151                sd.subscriptionAcks.remove(tx, subscriptionKey);
1152                removeAckLocationsForSub(tx, sd, subscriptionKey);
1153            }
1154        }
1155        
1156        /**
1157         * @param tx
1158         * @throws IOException
1159         */
1160        void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1161            LOG.debug("Checkpoint started.");
1162    
1163            // reflect last update exclusive of current checkpoint
1164            Location firstTxLocation = metadata.lastUpdate;
1165    
1166            metadata.state = OPEN_STATE;
1167            metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1168            metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
1169            tx.store(metadata.page, metadataMarshaller, true);
1170            pageFile.flush();
1171    
1172            if( cleanup ) {
1173    
1174                final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1175                final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
1176    
1177                LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
1178    
1179                    // Don't GC files under replication
1180                    if( journalFilesBeingReplicated!=null ) {
1181                            gcCandidateSet.removeAll(journalFilesBeingReplicated);
1182                    }
1183    
1184                // Don't GC files after the first in progress tx
1185                if( metadata.firstInProgressTransactionLocation!=null ) {
1186                    if (metadata.firstInProgressTransactionLocation.getDataFileId() < firstTxLocation.getDataFileId()) {
1187                       firstTxLocation = metadata.firstInProgressTransactionLocation;
1188                    };
1189                }
1190                
1191                if( firstTxLocation!=null ) {
1192                    while( !gcCandidateSet.isEmpty() ) {
1193                            Integer last = gcCandidateSet.last();
1194                            if( last >= firstTxLocation.getDataFileId() ) {
1195                                    gcCandidateSet.remove(last);
1196                            } else {
1197                                    break;
1198                            }
1199                    }
1200                    LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
1201                }
1202    
1203                // Go through all the destinations to see if any of them can remove GC candidates.
1204                for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1205                    if( gcCandidateSet.isEmpty() ) {
1206                            break;
1207                    }
1208    
1209                    // Use a visitor to cut down the number of pages that we load
1210                    entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1211                        int last=-1;
1212                        public boolean isInterestedInKeysBetween(Location first, Location second) {
1213                            if( first==null ) {
1214                                    SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1215                                    if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1216                                            subset.remove(second.getDataFileId());
1217                                    }
1218                                                            return !subset.isEmpty();
1219                            } else if( second==null ) {
1220                                    SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1221                                    if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1222                                            subset.remove(first.getDataFileId());
1223                                    }
1224                                                            return !subset.isEmpty();
1225                            } else {
1226                                    SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1227                                    if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1228                                            subset.remove(first.getDataFileId());
1229                                    }
1230                                    if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1231                                            subset.remove(second.getDataFileId());
1232                                    }
1233                                                            return !subset.isEmpty();
1234                            }
1235                        }
1236    
1237                        public void visit(List<Location> keys, List<Long> values) {
1238                            for (Location l : keys) {
1239                                int fileId = l.getDataFileId();
1240                                                            if( last != fileId ) {
1241                                            gcCandidateSet.remove(fileId);
1242                                    last = fileId;
1243                                }
1244                            }
1245                        }
1246                    });
1247                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1248                }
1249    
1250                // check we are not deleting file with ack for in-use journal files
1251                LOG.trace("gc candidates: " + gcCandidateSet);
1252                final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
1253                Iterator<Integer> candidates = gcCandidateSet.iterator();
1254                while (candidates.hasNext()) {
1255                    Integer candidate = candidates.next();
1256                    Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
1257                    if (referencedFileIds != null) {
1258                        for (Integer referencedFileId : referencedFileIds) {
1259                            if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
1260                                // active file that is not targeted for deletion is referenced so don't delete
1261                                candidates.remove();
1262                                break;
1263                            }
1264                        }
1265                        if (gcCandidateSet.contains(candidate)) {
1266                            ackMessageFileMap.remove(candidate);
1267                        } else {
1268                            LOG.trace("not removing data file: " + candidate
1269                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1270                        }
1271                    }
1272                }
1273    
1274                if( !gcCandidateSet.isEmpty() ) {
1275                        LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
1276                        journal.removeDataFiles(gcCandidateSet);
1277                }
1278            }
1279            
1280            LOG.debug("Checkpoint done.");
1281        }
1282        
1283        private Location checkpointProducerAudit() throws IOException {
1284            ByteArrayOutputStream baos = new ByteArrayOutputStream();
1285            ObjectOutputStream oout = new ObjectOutputStream(baos);
1286            oout.writeObject(metadata.producerSequenceIdTracker);
1287            oout.flush();
1288            oout.close();
1289            return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
1290        }
1291    
1292        public HashSet<Integer> getJournalFilesBeingReplicated() {
1293                    return journalFilesBeingReplicated;
1294            }
1295    
1296        // /////////////////////////////////////////////////////////////////
1297        // StoredDestination related implementation methods.
1298        // /////////////////////////////////////////////////////////////////
1299    
1300    
1301            private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
1302    
1303        class StoredSubscription {
1304            SubscriptionInfo subscriptionInfo;
1305            String lastAckId;
1306            Location lastAckLocation;
1307            Location cursor;
1308        }
1309        
1310        static class MessageKeys {
1311            final String messageId;
1312            final Location location;
1313            
1314            public MessageKeys(String messageId, Location location) {
1315                this.messageId=messageId;
1316                this.location=location;
1317            }
1318            
1319            @Override
1320            public String toString() {
1321                return "["+messageId+","+location+"]";
1322            }
1323        }
1324        
1325        static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
1326            static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
1327            
1328            public MessageKeys readPayload(DataInput dataIn) throws IOException {
1329                return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
1330            }
1331    
1332            public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
1333                dataOut.writeUTF(object.messageId);
1334                LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
1335            }
1336        }
1337    
1338        class LastAck {
1339            long lastAckedSequence;
1340            byte priority;
1341    
1342            public LastAck(LastAck source) {
1343                this.lastAckedSequence = source.lastAckedSequence;
1344                this.priority = source.priority;
1345            }
1346    
1347            public LastAck() {
1348                this.priority = MessageOrderIndex.HI;
1349            }
1350    
1351            public LastAck(long ackLocation) {
1352                this.lastAckedSequence = ackLocation;
1353                this.priority = MessageOrderIndex.LO;
1354            }
1355    
1356            public LastAck(long ackLocation, byte priority) {
1357                this.lastAckedSequence = ackLocation;
1358                this.priority = priority;
1359            }
1360    
1361            public String toString() {
1362                return "[" + lastAckedSequence + ":" + priority + "]";
1363            }
1364        }
1365    
1366        protected class LastAckMarshaller implements Marshaller<LastAck> {
1367            
1368            public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
1369                dataOut.writeLong(object.lastAckedSequence);
1370                dataOut.writeByte(object.priority);
1371            }
1372    
1373            public LastAck readPayload(DataInput dataIn) throws IOException {
1374                LastAck lastAcked = new LastAck();
1375                lastAcked.lastAckedSequence = dataIn.readLong();
1376                if (metadata.version >= 3) {
1377                    lastAcked.priority = dataIn.readByte();
1378                }
1379                return lastAcked;
1380            }
1381    
1382            public int getFixedSize() {
1383                return 9;
1384            }
1385    
1386            public LastAck deepCopy(LastAck source) {
1387                return new LastAck(source);
1388            }
1389    
1390            public boolean isDeepCopySupported() {
1391                return true;
1392            }
1393        }
1394    
1395        class StoredDestination {
1396            
1397            MessageOrderIndex orderIndex = new MessageOrderIndex();
1398            BTreeIndex<Location, Long> locationIndex;
1399            BTreeIndex<String, Long> messageIdIndex;
1400    
1401            // These bits are only set for Topics
1402            BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
1403            BTreeIndex<String, LastAck> subscriptionAcks;
1404            HashMap<String, MessageOrderCursor> subscriptionCursors;
1405            BTreeIndex<Long, HashSet<String>> ackPositions;
1406        }
1407    
1408        protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
1409    
1410            public StoredDestination readPayload(DataInput dataIn) throws IOException {
1411                final StoredDestination value = new StoredDestination();
1412                value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1413                value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
1414                value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
1415    
1416                if (dataIn.readBoolean()) {
1417                    value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
1418                    value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
1419                    if (metadata.version >= 3) {
1420                        value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
1421                    } else {
1422                        // upgrade
1423                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
1424                            public void execute(Transaction tx) throws IOException {
1425                                value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
1426                                value.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1427                                value.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1428                                value.ackPositions.load(tx);
1429                            }
1430                        });
1431                    }
1432                }
1433                if (metadata.version >= 2) {
1434                    value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1435                    value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
1436                } else {
1437                        // upgrade
1438                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
1439                            public void execute(Transaction tx) throws IOException {
1440                                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1441                                value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1442                                value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1443                                value.orderIndex.lowPriorityIndex.load(tx);
1444    
1445                                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
1446                                value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
1447                                value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
1448                                value.orderIndex.highPriorityIndex.load(tx);
1449                            }
1450                        });
1451                }
1452    
1453                return value;
1454            }
1455    
1456            public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
1457                dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
1458                dataOut.writeLong(value.locationIndex.getPageId());
1459                dataOut.writeLong(value.messageIdIndex.getPageId());
1460                if (value.subscriptions != null) {
1461                    dataOut.writeBoolean(true);
1462                    dataOut.writeLong(value.subscriptions.getPageId());
1463                    dataOut.writeLong(value.subscriptionAcks.getPageId());
1464                    dataOut.writeLong(value.ackPositions.getPageId());
1465                } else {
1466                    dataOut.writeBoolean(false);
1467                }
1468                dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
1469                dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
1470            }
1471        }
1472    
1473        static class LocationMarshaller implements Marshaller<Location> {
1474            final static LocationMarshaller INSTANCE = new LocationMarshaller();
1475    
1476            public Location readPayload(DataInput dataIn) throws IOException {
1477                Location rc = new Location();
1478                rc.setDataFileId(dataIn.readInt());
1479                rc.setOffset(dataIn.readInt());
1480                return rc;
1481            }
1482    
1483            public void writePayload(Location object, DataOutput dataOut) throws IOException {
1484                dataOut.writeInt(object.getDataFileId());
1485                dataOut.writeInt(object.getOffset());
1486            }
1487    
1488            public int getFixedSize() {
1489                return 8;
1490            }
1491    
1492            public Location deepCopy(Location source) {
1493                return new Location(source);
1494            }
1495    
1496            public boolean isDeepCopySupported() {
1497                return true;
1498            }
1499        }
1500    
1501        static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
1502            final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
1503    
1504            public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
1505                KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
1506                rc.mergeFramed((InputStream)dataIn);
1507                return rc;
1508            }
1509    
1510            public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
1511                object.writeFramed((OutputStream)dataOut);
1512            }
1513        }
1514    
1515        protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1516            String key = key(destination);
1517            StoredDestination rc = storedDestinations.get(key);
1518            if (rc == null) {
1519                boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
1520                rc = loadStoredDestination(tx, key, topic);
1521                // Cache it. We may want to remove/unload destinations from the
1522                // cache that are not used for a while
1523                // to reduce memory usage.
1524                storedDestinations.put(key, rc);
1525            }
1526            return rc;
1527        }
1528    
1529    
1530        protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
1531            String key = key(destination);
1532            StoredDestination rc = storedDestinations.get(key);
1533            if (rc == null && metadata.destinations.containsKey(tx, key)) {
1534                rc = getStoredDestination(destination, tx);
1535            }
1536            return rc;
1537        }
1538    
1539        /**
1540         * @param tx
1541         * @param key
1542         * @param topic
1543         * @return
1544         * @throws IOException
1545         */
1546        private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
1547            // Try to load the existing indexes..
1548            StoredDestination rc = metadata.destinations.get(tx, key);
1549            if (rc == null) {
1550                // Brand new destination.. allocate indexes for it.
1551                rc = new StoredDestination();
1552                rc.orderIndex.allocate(tx);
1553                rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
1554                rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
1555    
1556                if (topic) {
1557                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
1558                    rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
1559                    rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
1560                }
1561                metadata.destinations.put(tx, key, rc);
1562            }
1563    
1564            // Configure the marshalers and load.
1565            rc.orderIndex.load(tx);
1566    
1567            // Figure out the next key using the last entry in the destination.
1568            rc.orderIndex.configureLast(tx);
1569    
1570            rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
1571            rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1572            rc.locationIndex.load(tx);
1573    
1574            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
1575            rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
1576            rc.messageIdIndex.load(tx);
1577            
1578            // If it was a topic...
1579            if (topic) {
1580    
1581                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
1582                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
1583                rc.subscriptions.load(tx);
1584    
1585                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
1586                rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
1587                rc.subscriptionAcks.load(tx);
1588    
1589                rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
1590                rc.ackPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
1591                rc.ackPositions.load(tx);
1592    
1593                rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
1594    
1595                if (metadata.version < 3) {
1596    
1597                    // on upgrade need to fill ackLocation with available messages past last ack
1598                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
1599                        Entry<String, LastAck> entry = iterator.next();
1600                        for (Iterator<Entry<Long, MessageKeys>> orderIterator =
1601                                rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
1602                            Long sequence = orderIterator.next().getKey();
1603                            addAckLocation(tx, rc, sequence, entry.getKey());
1604                        }
1605                        // modify so it is upgraded                   
1606                        rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
1607                    }
1608                }
1609                
1610                if (rc.orderIndex.nextMessageId == 0) {
1611                    // check for existing durable sub all acked out - pull next seq from acks as messages are gone
1612                    if (!rc.subscriptionAcks.isEmpty(tx)) {
1613                        for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1614                            Entry<String, LastAck> entry = iterator.next();
1615                            rc.orderIndex.nextMessageId =
1616                                    Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
1617                        }
1618                    }
1619                } else {
1620                    // update based on ackPositions for unmatched, last entry is always the next
1621                    if (!rc.ackPositions.isEmpty(tx)) {
1622                        Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
1623                        rc.orderIndex.nextMessageId =
1624                            Math.max(rc.orderIndex.nextMessageId, last.getKey());
1625                    }
1626                }
1627    
1628            }
1629    
1630            if (metadata.version < 3) {
1631                // store again after upgrade
1632                metadata.destinations.put(tx, key, rc);
1633            }        
1634            return rc;
1635        }
1636    
1637        private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1638            HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
1639            if (hs == null) {
1640                hs = new HashSet<String>();
1641            }
1642            hs.add(subscriptionKey);
1643            // every ack location addition needs to be a btree modification to get it stored
1644            sd.ackPositions.put(tx, messageSequence, hs);
1645        }
1646    
1647        // new sub is interested in potentially all existing messages
1648        private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
1649            for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) {
1650                Entry<Long, HashSet<String>> entry = iterator.next();
1651                entry.getValue().add(subscriptionKey);
1652                sd.ackPositions.put(tx, entry.getKey(), entry.getValue());
1653            }
1654        }
1655    
1656        final HashSet nextMessageIdMarker = new HashSet<String>();
1657        // on a new message add, all existing subs are interested in this message
1658        private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
1659            HashSet hs = new HashSet<String>();
1660            for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
1661                Entry<String, LastAck> entry = iterator.next();
1662                hs.add(entry.getKey());
1663            }
1664            sd.ackPositions.put(tx, messageSequence, hs);
1665            // add empty next to keep track of nextMessage
1666            sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
1667        }
1668    
1669        private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
1670            if (!sd.ackPositions.isEmpty(tx)) {
1671                Long end = sd.ackPositions.getLast(tx).getKey();
1672                for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) {
1673                    removeAckLocation(tx, sd, subscriptionKey, sequence);
1674                }
1675            }
1676        }
1677    
1678        /**
1679         * @param tx
1680         * @param sd
1681         * @param subscriptionKey
1682         * @param sequenceId
1683         * @throws IOException
1684         */
1685        private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
1686            // Remove the sub from the previous location set..
1687            if (sequenceId != null) {
1688                HashSet<String> hs = sd.ackPositions.get(tx, sequenceId);
1689                if (hs != null) {
1690                    hs.remove(subscriptionKey);
1691                    if (hs.isEmpty()) {
1692                        HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
1693                        sd.ackPositions.remove(tx, sequenceId);
1694    
1695                        // Find all the entries that need to get deleted.
1696                        ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
1697                        sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
1698    
1699                        // Do the actual deletes.
1700                        for (Entry<Long, MessageKeys> entry : deletes) {
1701                            sd.locationIndex.remove(tx, entry.getValue().location);
1702                            sd.messageIdIndex.remove(tx, entry.getValue().messageId);
1703                            sd.orderIndex.remove(tx, entry.getKey());
1704                        }
1705                    } else {
1706                        // update
1707                        sd.ackPositions.put(tx, sequenceId, hs);
1708                    }
1709                }
1710            }
1711        }
1712    
1713        private String key(KahaDestination destination) {
1714            return destination.getType().getNumber() + ":" + destination.getName();
1715        }
1716    
1717        // /////////////////////////////////////////////////////////////////
1718        // Transaction related implementation methods.
1719        // /////////////////////////////////////////////////////////////////
1720        protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
1721        protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
1722     
1723        private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
1724            TransactionId key = key(info);
1725            List<Operation> tx;
1726            synchronized (inflightTransactions) {
1727                tx = inflightTransactions.get(key);
1728                if (tx == null) {
1729                    tx = Collections.synchronizedList(new ArrayList<Operation>());
1730                    inflightTransactions.put(key, tx);
1731                }
1732            }
1733            return tx;
1734        }
1735    
1736        private TransactionId key(KahaTransactionInfo transactionInfo) {
1737            if (transactionInfo.hasLocalTransacitonId()) {
1738                KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
1739                LocalTransactionId rc = new LocalTransactionId();
1740                rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
1741                rc.setValue(tx.getTransacitonId());
1742                return rc;
1743            } else {
1744                KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
1745                XATransactionId rc = new XATransactionId();
1746                rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
1747                rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
1748                rc.setFormatId(tx.getFormatId());
1749                return rc;
1750            }
1751        }
1752    
1753        abstract class Operation {
1754            final Location location;
1755    
1756            public Operation(Location location) {
1757                this.location = location;
1758            }
1759    
1760            public Location getLocation() {
1761                return location;
1762            }
1763    
1764            abstract public void execute(Transaction tx) throws IOException;
1765        }
1766    
1767        class AddOpperation extends Operation {
1768            final KahaAddMessageCommand command;
1769    
1770            public AddOpperation(KahaAddMessageCommand command, Location location) {
1771                super(location);
1772                this.command = command;
1773            }
1774    
1775            @Override
1776            public void execute(Transaction tx) throws IOException {
1777                upadateIndex(tx, command, location);
1778            }
1779    
1780            public KahaAddMessageCommand getCommand() {
1781                return command;
1782            }
1783        }
1784    
1785        class RemoveOpperation extends Operation {
1786            final KahaRemoveMessageCommand command;
1787    
1788            public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
1789                super(location);
1790                this.command = command;
1791            }
1792    
1793            @Override
1794            public void execute(Transaction tx) throws IOException {
1795                updateIndex(tx, command, location);
1796            }
1797    
1798            public KahaRemoveMessageCommand getCommand() {
1799                return command;
1800            }
1801        }
1802    
1803        // /////////////////////////////////////////////////////////////////
1804        // Initialization related implementation methods.
1805        // /////////////////////////////////////////////////////////////////
1806    
1807        private PageFile createPageFile() {
1808            PageFile index = new PageFile(directory, "db");
1809            index.setEnableWriteThread(isEnableIndexWriteAsync());
1810            index.setWriteBatchSize(getIndexWriteBatchSize());
1811            index.setPageCacheSize(indexCacheSize);
1812            return index;
1813        }
1814    
1815        private Journal createJournal() throws IOException {
1816            Journal manager = new Journal();
1817            manager.setDirectory(directory);
1818            manager.setMaxFileLength(getJournalMaxFileLength());
1819            manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
1820            manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
1821            manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
1822            manager.setArchiveDataLogs(isArchiveDataLogs());
1823            manager.setSizeAccumulator(storeSize);
1824            if (getDirectoryArchive() != null) {
1825                IOHelper.mkdirs(getDirectoryArchive());
1826                manager.setDirectoryArchive(getDirectoryArchive());
1827            }
1828            return manager;
1829        }
1830    
1831        public int getJournalMaxWriteBatchSize() {
1832            return journalMaxWriteBatchSize;
1833        }
1834        
1835        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
1836            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
1837        }
1838    
1839        public File getDirectory() {
1840            return directory;
1841        }
1842    
1843        public void setDirectory(File directory) {
1844            this.directory = directory;
1845        }
1846    
1847        public boolean isDeleteAllMessages() {
1848            return deleteAllMessages;
1849        }
1850    
1851        public void setDeleteAllMessages(boolean deleteAllMessages) {
1852            this.deleteAllMessages = deleteAllMessages;
1853        }
1854        
1855        public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
1856            this.setIndexWriteBatchSize = setIndexWriteBatchSize;
1857        }
1858    
1859        public int getIndexWriteBatchSize() {
1860            return setIndexWriteBatchSize;
1861        }
1862        
1863        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
1864            this.enableIndexWriteAsync = enableIndexWriteAsync;
1865        }
1866        
1867        boolean isEnableIndexWriteAsync() {
1868            return enableIndexWriteAsync;
1869        }
1870        
1871        public boolean isEnableJournalDiskSyncs() {
1872            return enableJournalDiskSyncs;
1873        }
1874    
1875        public void setEnableJournalDiskSyncs(boolean syncWrites) {
1876            this.enableJournalDiskSyncs = syncWrites;
1877        }
1878    
1879        public long getCheckpointInterval() {
1880            return checkpointInterval;
1881        }
1882    
1883        public void setCheckpointInterval(long checkpointInterval) {
1884            this.checkpointInterval = checkpointInterval;
1885        }
1886    
1887        public long getCleanupInterval() {
1888            return cleanupInterval;
1889        }
1890    
1891        public void setCleanupInterval(long cleanupInterval) {
1892            this.cleanupInterval = cleanupInterval;
1893        }
1894    
1895        public void setJournalMaxFileLength(int journalMaxFileLength) {
1896            this.journalMaxFileLength = journalMaxFileLength;
1897        }
1898        
1899        public int getJournalMaxFileLength() {
1900            return journalMaxFileLength;
1901        }
1902        
1903        public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
1904            this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
1905        }
1906        
1907        public int getMaxFailoverProducersToTrack() {
1908            return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
1909        }
1910        
1911        public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
1912            this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
1913        }
1914        
1915        public int getFailoverProducersAuditDepth() {
1916            return this.metadata.producerSequenceIdTracker.getAuditDepth();
1917        }
1918        
1919        public PageFile getPageFile() {
1920            if (pageFile == null) {
1921                pageFile = createPageFile();
1922            }
1923                    return pageFile;
1924            }
1925    
1926            public Journal getJournal() throws IOException {
1927            if (journal == null) {
1928                journal = createJournal();
1929            }
1930                    return journal;
1931            }
1932    
1933        public boolean isFailIfDatabaseIsLocked() {
1934            return failIfDatabaseIsLocked;
1935        }
1936    
1937        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
1938            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
1939        }
1940    
1941        public boolean isIgnoreMissingJournalfiles() {
1942            return ignoreMissingJournalfiles;
1943        }
1944        
1945        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
1946            this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
1947        }
1948    
1949        public int getIndexCacheSize() {
1950            return indexCacheSize;
1951        }
1952    
1953        public void setIndexCacheSize(int indexCacheSize) {
1954            this.indexCacheSize = indexCacheSize;
1955        }
1956    
1957        public boolean isCheckForCorruptJournalFiles() {
1958            return checkForCorruptJournalFiles;
1959        }
1960    
1961        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
1962            this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
1963        }
1964    
1965        public boolean isChecksumJournalFiles() {
1966            return checksumJournalFiles;
1967        }
1968    
1969        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
1970            this.checksumJournalFiles = checksumJournalFiles;
1971        }
1972    
1973            public void setBrokerService(BrokerService brokerService) {
1974                    this.brokerService = brokerService;
1975            }
1976    
1977        /**
1978         * @return the archiveDataLogs
1979         */
1980        public boolean isArchiveDataLogs() {
1981            return this.archiveDataLogs;
1982        }
1983    
1984        /**
1985         * @param archiveDataLogs the archiveDataLogs to set
1986         */
1987        public void setArchiveDataLogs(boolean archiveDataLogs) {
1988            this.archiveDataLogs = archiveDataLogs;
1989        }
1990    
1991        /**
1992         * @return the directoryArchive
1993         */
1994        public File getDirectoryArchive() {
1995            return this.directoryArchive;
1996        }
1997    
1998        /**
1999         * @param directoryArchive the directoryArchive to set
2000         */
2001        public void setDirectoryArchive(File directoryArchive) {
2002            this.directoryArchive = directoryArchive;
2003        }
2004    
2005        /**
2006         * @return the databaseLockedWaitDelay
2007         */
2008        public int getDatabaseLockedWaitDelay() {
2009            return this.databaseLockedWaitDelay;
2010        }
2011    
2012        /**
2013         * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
2014         */
2015        public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
2016            this.databaseLockedWaitDelay = databaseLockedWaitDelay;
2017        }
2018    
2019        // /////////////////////////////////////////////////////////////////
2020        // Internal conversion methods.
2021        // /////////////////////////////////////////////////////////////////
2022    
2023        KahaTransactionInfo createTransactionInfo(TransactionId txid) {
2024            if (txid == null) {
2025                return null;
2026            }
2027            KahaTransactionInfo rc = new KahaTransactionInfo();
2028    
2029            if (txid.isLocalTransaction()) {
2030                LocalTransactionId t = (LocalTransactionId) txid;
2031                KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
2032                kahaTxId.setConnectionId(t.getConnectionId().getValue());
2033                kahaTxId.setTransacitonId(t.getValue());
2034                rc.setLocalTransacitonId(kahaTxId);
2035            } else {
2036                XATransactionId t = (XATransactionId) txid;
2037                KahaXATransactionId kahaTxId = new KahaXATransactionId();
2038                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
2039                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
2040                kahaTxId.setFormatId(t.getFormatId());
2041                rc.setXaTransacitonId(kahaTxId);
2042            }
2043            return rc;
2044        }
2045    
2046        class MessageOrderCursor{
2047            long defaultCursorPosition;
2048            long lowPriorityCursorPosition;
2049            long highPriorityCursorPosition;
2050            MessageOrderCursor(){
2051            }
2052            
2053            MessageOrderCursor(long position){
2054                this.defaultCursorPosition=position;
2055                this.lowPriorityCursorPosition=position;
2056                this.highPriorityCursorPosition=position;
2057            }
2058            
2059            MessageOrderCursor(MessageOrderCursor other){
2060                this.defaultCursorPosition=other.defaultCursorPosition;
2061                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2062                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2063            }
2064            
2065            MessageOrderCursor copy() {
2066                return new MessageOrderCursor(this);
2067            }
2068            
2069            void reset() {
2070                this.defaultCursorPosition=0;
2071                this.highPriorityCursorPosition=0;
2072                this.lowPriorityCursorPosition=0;
2073            }
2074            
2075            void increment() {
2076                if (defaultCursorPosition!=0) {
2077                    defaultCursorPosition++;
2078                }
2079                if (highPriorityCursorPosition!=0) {
2080                    highPriorityCursorPosition++;
2081                }
2082                if (lowPriorityCursorPosition!=0) {
2083                    lowPriorityCursorPosition++;
2084                }
2085            }
2086    
2087            public String toString() {
2088               return "MessageOrderCursor:[def:" + defaultCursorPosition
2089                       + ", low:" + lowPriorityCursorPosition
2090                       + ", high:" +  highPriorityCursorPosition + "]";
2091            }
2092    
2093            public void sync(MessageOrderCursor other) {
2094                this.defaultCursorPosition=other.defaultCursorPosition;
2095                this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
2096                this.highPriorityCursorPosition=other.highPriorityCursorPosition;
2097            }
2098        }
2099        
2100        class MessageOrderIndex {
2101            static final byte HI = 9;
2102            static final byte LO = 0;
2103            static final byte DEF = 4;
2104    
2105            long nextMessageId;
2106            BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
2107            BTreeIndex<Long, MessageKeys> lowPriorityIndex;
2108            BTreeIndex<Long, MessageKeys> highPriorityIndex;
2109            MessageOrderCursor cursor = new MessageOrderCursor();
2110            Long lastDefaultKey;
2111            Long lastHighKey;
2112            Long lastLowKey;
2113            byte lastGetPriority;
2114    
2115            MessageKeys remove(Transaction tx, Long key) throws IOException {
2116                MessageKeys result = defaultPriorityIndex.remove(tx, key);
2117                if (result == null && highPriorityIndex!=null) {
2118                    result = highPriorityIndex.remove(tx, key);
2119                    if (result ==null && lowPriorityIndex!=null) {
2120                        result = lowPriorityIndex.remove(tx, key);
2121                    }
2122                }
2123                return result;
2124            }
2125            
2126            void load(Transaction tx) throws IOException {
2127                defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2128                defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2129                defaultPriorityIndex.load(tx);
2130                lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2131                lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2132                lowPriorityIndex.load(tx);
2133                highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2134                highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2135                highPriorityIndex.load(tx);
2136            }
2137            
2138            void allocate(Transaction tx) throws IOException {
2139                defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2140                if (metadata.version >= 2) {
2141                    lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2142                    highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2143                }
2144            }
2145            
2146            void configureLast(Transaction tx) throws IOException {
2147                // Figure out the next key using the last entry in the destination.
2148                if (highPriorityIndex != null) {
2149                    Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
2150                    if (lastEntry != null) {
2151                        nextMessageId = lastEntry.getKey() + 1;
2152                    } else {
2153                        lastEntry = defaultPriorityIndex.getLast(tx);
2154                        if (lastEntry != null) {
2155                            nextMessageId = lastEntry.getKey() + 1;
2156                        } else {
2157                            lastEntry = lowPriorityIndex.getLast(tx);
2158                            if (lastEntry != null) {
2159                                nextMessageId = lastEntry.getKey() + 1;
2160                            }
2161                        }
2162                    }
2163                } else {
2164                    Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
2165                    if (lastEntry != null) {
2166                        nextMessageId = lastEntry.getKey() + 1;
2167                    }
2168                }
2169            }
2170            
2171                   
2172            void remove(Transaction tx) throws IOException {
2173                defaultPriorityIndex.clear(tx);
2174                defaultPriorityIndex.unload(tx);
2175                tx.free(defaultPriorityIndex.getPageId());
2176                if (lowPriorityIndex != null) {
2177                    lowPriorityIndex.clear(tx);
2178                    lowPriorityIndex.unload(tx);
2179    
2180                    tx.free(lowPriorityIndex.getPageId());
2181                }
2182                if (highPriorityIndex != null) {
2183                    highPriorityIndex.clear(tx);
2184                    highPriorityIndex.unload(tx);
2185                    tx.free(highPriorityIndex.getPageId());
2186                }
2187            }
2188            
2189            void resetCursorPosition() {
2190                this.cursor.reset();
2191                lastDefaultKey = null;
2192                lastHighKey = null;
2193                lastLowKey = null;
2194            }
2195            
2196            void setBatch(Transaction tx, Long sequence) throws IOException {
2197                if (sequence != null) {
2198                    Long nextPosition = new Long(sequence.longValue() + 1);
2199                    if (defaultPriorityIndex.containsKey(tx, sequence)) {
2200                        lastDefaultKey = sequence;
2201                        cursor.defaultCursorPosition = nextPosition.longValue();
2202                    } else if (highPriorityIndex != null) {
2203                        if (highPriorityIndex.containsKey(tx, sequence)) {
2204                            lastHighKey = sequence;
2205                            cursor.highPriorityCursorPosition = nextPosition.longValue();
2206                        } else if (lowPriorityIndex.containsKey(tx, sequence)) {
2207                            lastLowKey = sequence;
2208                            cursor.lowPriorityCursorPosition = nextPosition.longValue();
2209                        }
2210                    } else {
2211                        lastDefaultKey = sequence;
2212                        cursor.defaultCursorPosition = nextPosition.longValue();
2213                    }
2214                }
2215            }
2216    
2217            void setBatch(Transaction tx, LastAck last) throws IOException {
2218                setBatch(tx, last.lastAckedSequence);
2219                if (cursor.defaultCursorPosition == 0
2220                        && cursor.highPriorityCursorPosition == 0
2221                        && cursor.lowPriorityCursorPosition == 0) {
2222                    long next = last.lastAckedSequence + 1;
2223                    switch (last.priority) {
2224                        case DEF:
2225                            cursor.defaultCursorPosition = next;
2226                            cursor.highPriorityCursorPosition = next;
2227                            break;
2228                        case HI:
2229                            cursor.highPriorityCursorPosition = next;
2230                            break;
2231                        case LO:
2232                            cursor.lowPriorityCursorPosition = next;
2233                            cursor.defaultCursorPosition = next;
2234                            cursor.highPriorityCursorPosition = next;
2235                            break;
2236                    }
2237                }
2238            }
2239            
2240            void stoppedIterating() {
2241                if (lastDefaultKey!=null) {
2242                    cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
2243                }
2244                if (lastHighKey!=null) {
2245                    cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
2246                }
2247                if (lastLowKey!=null) {
2248                    cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
2249                }
2250                lastDefaultKey = null;
2251                lastHighKey = null;
2252                lastLowKey = null;
2253            }
2254            
2255            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
2256                    throws IOException {
2257                if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
2258                    getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
2259                } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
2260                    getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
2261                } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
2262                    getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
2263                }
2264            }
2265            
2266            void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
2267                    BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
2268    
2269                Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId);
2270                deletes.add(iterator.next());
2271            }
2272            
2273            long getNextMessageId(int priority) {
2274                return nextMessageId++;
2275            }
2276            
2277            MessageKeys get(Transaction tx, Long key) throws IOException {
2278                MessageKeys result = defaultPriorityIndex.get(tx, key);
2279                if (result == null) {
2280                    result = highPriorityIndex.get(tx, key);
2281                    if (result == null) {
2282                        result = lowPriorityIndex.get(tx, key);
2283                        lastGetPriority = LO;
2284                    } else {
2285                        lastGetPriority = HI;
2286                    }
2287                } else {
2288                    lastGetPriority = DEF;
2289                }
2290                return result;
2291            }
2292            
2293            MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
2294                if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
2295                    return defaultPriorityIndex.put(tx, key, value);
2296                } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
2297                    return highPriorityIndex.put(tx, key, value);
2298                } else {
2299                    return lowPriorityIndex.put(tx, key, value);
2300                }
2301            }
2302            
2303            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
2304                return new MessageOrderIterator(tx,cursor);
2305            }
2306            
2307            Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
2308                return new MessageOrderIterator(tx,m);
2309            }
2310    
2311            public byte lastGetPriority() {
2312                return lastGetPriority;
2313            }
2314    
2315            class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
2316                Iterator<Entry<Long, MessageKeys>>currentIterator;
2317                final Iterator<Entry<Long, MessageKeys>>highIterator;
2318                final Iterator<Entry<Long, MessageKeys>>defaultIterator;
2319                final Iterator<Entry<Long, MessageKeys>>lowIterator;
2320                
2321                
2322    
2323                MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
2324                    this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
2325                    if (highPriorityIndex != null) {
2326                        this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
2327                    } else {
2328                        this.highIterator = null;
2329                    }
2330                    if (lowPriorityIndex != null) {
2331                        this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
2332                    } else {
2333                        this.lowIterator = null;
2334                    }
2335                }
2336                
2337                public boolean hasNext() {
2338                    if (currentIterator == null) {
2339                        if (highIterator != null) {
2340                            if (highIterator.hasNext()) {
2341                                currentIterator = highIterator;
2342                                return currentIterator.hasNext();
2343                            }
2344                            if (defaultIterator.hasNext()) {
2345                                currentIterator = defaultIterator;
2346                                return currentIterator.hasNext();
2347                            }
2348                            if (lowIterator.hasNext()) {
2349                                currentIterator = lowIterator;
2350                                return currentIterator.hasNext();
2351                            }
2352                            return false;
2353                        } else {
2354                            currentIterator = defaultIterator;
2355                            return currentIterator.hasNext();
2356                        }
2357                    }
2358                    if (highIterator != null) {
2359                        if (currentIterator.hasNext()) {
2360                            return true;
2361                        }
2362                        if (currentIterator == highIterator) {
2363                            if (defaultIterator.hasNext()) {
2364                                currentIterator = defaultIterator;
2365                                return currentIterator.hasNext();
2366                            }
2367                            if (lowIterator.hasNext()) {
2368                                currentIterator = lowIterator;
2369                                return currentIterator.hasNext();
2370                            }
2371                            return false;
2372                        }
2373                        if (currentIterator == defaultIterator) {
2374                            if (lowIterator.hasNext()) {
2375                                currentIterator = lowIterator;
2376                                return currentIterator.hasNext();
2377                            }
2378                            return false;
2379                        }
2380                    }
2381                    return currentIterator.hasNext();
2382                }
2383    
2384                public Entry<Long, MessageKeys> next() {
2385                    Entry<Long, MessageKeys> result = currentIterator.next();
2386                    if (result != null) {
2387                        Long key = result.getKey();
2388                        if (highIterator != null) {
2389                            if (currentIterator == defaultIterator) {
2390                                lastDefaultKey = key;
2391                            } else if (currentIterator == highIterator) {
2392                                lastHighKey = key;
2393                            } else {
2394                                lastLowKey = key;
2395                            }
2396                        } else {
2397                            lastDefaultKey = key;
2398                        }
2399                    }
2400                    return result;
2401                }
2402    
2403                public void remove() {
2404                    throw new UnsupportedOperationException();
2405                }
2406               
2407            }
2408        }
2409        
2410        private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
2411            final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
2412    
2413            public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
2414                ByteArrayOutputStream baos = new ByteArrayOutputStream();
2415                ObjectOutputStream oout = new ObjectOutputStream(baos);
2416                oout.writeObject(object);
2417                oout.flush();
2418                oout.close();
2419                byte[] data = baos.toByteArray();
2420                dataOut.writeInt(data.length);
2421                dataOut.write(data);
2422            }
2423    
2424            public HashSet<String> readPayload(DataInput dataIn) throws IOException {
2425                int dataLen = dataIn.readInt();
2426                byte[] data = new byte[dataLen];
2427                dataIn.readFully(data);
2428                ByteArrayInputStream bais = new ByteArrayInputStream(data);
2429                ObjectInputStream oin = new ObjectInputStream(bais);
2430                try {
2431                    return (HashSet<String>) oin.readObject();
2432                } catch (ClassNotFoundException cfe) {
2433                        IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
2434                        ioe.initCause(cfe);
2435                        throw ioe;
2436                    }
2437            }
2438        }
2439    }