/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.journal;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LinkedNodeList;
import org.apache.kahadb.util.SchedulerTimerTask;
import org.apache.kahadb.util.Sequence;

/**
 * Manages the data files that make up the journal: locating and recovering
 * existing files on startup, appending and reading records, rotating to new
 * files, and removing or archiving files that are no longer needed.
 */
public class Journal {

    private static final int MAX_BATCH_SIZE = 32 * 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();

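    /**
     * Builds the fixed byte prefix (record size, record type and magic bytes) that
     * every batch control record starts with. Recovery scans for this prefix to
     * locate batch boundaries in a data file.
     */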
    private static byte[] createBatchControlRecordHeader() {
        try {
            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int PREFERED_DIFF = 1024 * 512;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected DataFileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected Runnable cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    private Timer timer;

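    /**
     * Starts the journal: scans the journal directory for existing data files,
     * links them together in id order, optionally runs a corruption check over
     * each file, and schedules the periodic cleanup task.
     *
     * <p>A minimal usage sketch (illustrative only; the directory name is an
     * assumption, not part of this class):
     *
     * <pre>{@code
     * Journal journal = new Journal();
     * journal.setDirectory(new File("journal-data"));
     * journal.start();
     * try {
     *     // append and read records...
     * } finally {
     *     journal.close();
     * }
     * }</pre>
     */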
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;
        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);

        appender = new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (int i = 0; i < files.length; i++) {
                try {
                    File file = files[i];
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the naming pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        getCurrentWriteFile();

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        cleanupTask = new Runnable() {
            public void run() {
                cleanup();
            }
        };
        this.timer = new Timer("KahaDB Scheduler", true);
        TimerTask task = new SchedulerTimerTask(cleanupTask);
        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL);
        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

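    /**
     * Walks a data file from offset 0, one write batch at a time, to find the end of
     * the valid data. Ranges that fail the batch check are recorded as corrupted
     * blocks, and a corrupted range at the tail of the file causes the file length
     * to be trimmed back to the last good batch.
     *
     * @return the location just past the last valid batch in the file
     */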
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0) {
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the file to find the
                    // next valid batch record. We may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.info("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // Reading past the end of the file (or any other read failure) ends the
            // scan; the current offset marks the end of the recoverable data.
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

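    /**
     * Scans forward from the given offset, reading the file in 4 KB chunks and
     * searching each chunk for the batch control record header. Successive reads
     * overlap by the header length so a header that straddles a chunk boundary is
     * still found. Returns the offset of the next header, or -1 at end of file.
     */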
    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte data[] = new byte[1024 * 4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }

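    /**
     * Validates the batch control record at the given offset: the header bytes must
     * match, the recorded batch size must be sane, and, when checksums are enabled,
     * the Adler-32 checksum of the batch payload must match the stored value.
     * Returns the batch payload size, or -1 if the record is not a valid batch
     * control record.
     */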
    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);

        reader.readFully(offset, controlRecord);

        // Assert that it's a batch control record.
        for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
            if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                return -1;
            }
        }

        int size = controlIs.readInt();
        // A corrupted size (negative or larger than any batch we would write) means
        // this is not a valid batch control record.
        if (size < 0 || size > MAX_BATCH_SIZE) {
            return -1;
        }

        if (isChecksum()) {

            long expectedChecksum = controlIs.readLong();
            if (expectedChecksum == 0) {
                // Checksumming was not enabled when the record was stored,
                // so the record cannot be validated.
                return size;
            }

            byte data[] = new byte[size];
            reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

            Checksum checksum = new Adler32();
            checksum.update(data, 0, data.length);

            if (expectedChecksum != checksum.getValue()) {
                return -1;
            }
        }
        return size;
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

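    /**
     * Returns the data file currently being appended to, creating the first file if
     * the journal is empty. {@link #rotateWriteFile()} creates the next file in the
     * sequence and registers it as the new tail of the data file list.
     */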
    synchronized DataFile getCurrentWriteFile() throws IOException {
        if (dataFiles.isEmpty()) {
            rotateWriteFile();
        }
        return dataFiles.getTail();
    }

    synchronized DataFile rotateWriteFile() {
        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
        // register the new data file as the tail of the journal
        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
        fileByFileMap.put(file, nextWriteFile);
        dataFiles.addLast(nextWriteFile);
        return nextWriteFile;
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }


    synchronized DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    synchronized File getFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile.getFile();
    }

    private DataFile getNextDataFile(DataFile dataFile) {
        return dataFile.getNext();
    }

    public synchronized void close() throws IOException {
        if (!started) {
            return;
        }
        if (this.timer != null) {
            this.timer.cancel();
        }
        accessorPool.close();
        appender.close();
        fileMap.clear();
        fileByFileMap.clear();
        dataFiles.clear();
        lastAppendLocation.set(null);
        started = false;
    }

    synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            totalLength.addAndGet(-dataFile.getLength());
            result &= dataFile.delete();
        }
        fileMap.clear();
        fileByFileMap.clear();
        lastAppendLocation.set(null);
        dataFiles = new LinkedNodeList<DataFile>();

        // Reopen the file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

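    /**
     * Removes the given data files from the journal, either deleting them or moving
     * them to the archive directory depending on {@link #isArchiveDataLogs()}. The
     * file currently being written to (and any later file) is never removed.
     */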
    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = fileMap.get(key);
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        fileByFileMap.remove(dataFile.getFile());
        fileMap.remove(dataFile.getDataFileId());
        totalLength.addAndGet(-dataFile.getLength());
        dataFile.unlink();
        if (archiveDataLogs) {
            dataFile.move(getDirectoryArchive());
            LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
        } else {
            if (dataFile.delete()) {
                LOG.debug("Discarded data file " + dataFile);
            } else {
                LOG.warn("Failed to discard data file " + dataFile.getFile());
            }
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

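    /**
     * Accounts for data that was appended to the journal files by an external writer
     * rather than this journal's own appender. The append must land either in the
     * current tail file or in the next file in the sequence; anything else is
     * rejected as an invalid external append.
     */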
    public synchronized void appendedExternally(Location loc, int length) throws IOException {
        DataFile dataFile = null;
        if (dataFiles.getTail().getDataFileId() == loc.getDataFileId()) {
            // It's an update to the current log file.
            dataFile = dataFiles.getTail();
            dataFile.incrementLength(length);
        } else if (dataFiles.getTail().getDataFileId() + 1 == loc.getDataFileId()) {
            // It's an update to the next log file.
            int nextNum = loc.getDataFileId();
            File file = getFile(nextNum);
            dataFile = new DataFile(file, nextNum, preferedFileLength);
            // register the new data file as the tail of the journal
            fileMap.put(dataFile.getDataFileId(), dataFile);
            fileByFileMap.put(file, dataFile);
            dataFiles.addLast(dataFile);
        } else {
            throw new IOException("Invalid external append.");
        }
    }

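    /**
     * Returns the location of the next user record after the given location, or null
     * when the end of the journal is reached. Passing null starts iteration at the
     * first record of the first data file. Batch control records are skipped, and
     * iteration follows the linked data files across file boundaries.
     */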
    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = dataFiles.getHead();
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it roll over into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                dataFile = getNextDataFile(dataFile);
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in the location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
        DataFile df = fileByFileMap.get(file);
        return getNextLocation(df, lastLocation, thisFileOnly);
    }

    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (lastLocation == null) {
                    DataFile head = dataFile.getHeadNode();
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(lastLocation);
                    cur.setOffset(cur.getOffset() + cur.getSize());
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            // Did it roll over into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                if (thisFileOnly) {
                    return null;
                } else {
                    dataFile = getNextDataFile(dataFile);
                    if (dataFile == null) {
                        return null;
                    } else {
                        cur.setDataFileId(dataFile.getDataFileId().intValue());
                        cur.setOffset(0);
                    }
                }
            }

            // Load in the location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() > 0) {
                // Return any non-empty record; unlike the single-argument variant,
                // this includes batch control records.
                return cur;
            }
        }
    }

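    /**
     * Reads the record stored at the given location and returns its payload as a
     * ByteSequence.
     */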
    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

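    /**
     * Appends a user record to the journal and returns the location it was stored
     * at. When {@code sync} is true the call completes the write before returning;
     * otherwise the write may be batched with other appends.
     *
     * <p>A minimal sketch of a write followed by a read-back (the payload bytes are
     * illustrative only):
     *
     * <pre>{@code
     * ByteSequence payload = new ByteSequence("hello".getBytes());
     * Location loc = journal.write(payload, true);
     * ByteSequence readBack = journal.read(loc);
     * }</pre>
     */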
    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public synchronized Integer getCurrentDataFileId() {
        if (dataFiles.isEmpty()) {
            return null;
        }
        return dataFiles.getTail().getDataFileId();
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        return fileByFileMap.keySet();
    }

    public synchronized Map<Integer, DataFile> getFileMap() {
        return new TreeMap<Integer, DataFile>(fileMap);
    }

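    /**
     * Returns an estimate of the disk space used by the journal. The current write
     * file is counted as at least preferedFileLength bytes, since that is the size
     * it is expected to grow to.
     */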
    public long getDiskSize() {
        long tailLength = 0;
        synchronized (this) {
            if (!dataFiles.isEmpty()) {
                tailLength = dataFiles.getTail().getLength();
            }
        }

        long rc = totalLength.get();

        // The last file is counted as at least preferedFileLength bytes.
        if (tailLength < preferedFileLength) {
            rc -= tailLength;
            rc += preferedFileLength;
        }
        return rc;
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }
}