/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;

/**
 * A PageFile provides random access to fixed size disk pages. This object is not thread safe and therefore
 * access to it should be externally synchronized.
 *
 * The file has 3 parts:
 * Metadata Space: 4k: Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000: A redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
 *
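 * A minimal usage sketch (assuming the {@link Transaction} API in this package; the
 * allocate/commit calls shown are illustrative and error handling is omitted):
 *
 * <pre>
 * PageFile pageFile = new PageFile(new File("data"), "example");
 * pageFile.load();                      // open or create example.data
 * Transaction tx = pageFile.tx();       // transactions front all page access
 * Page page = tx.allocate();            // reserve a free page
 * tx.commit();                          // push the update into the write batch
 * pageFile.flush();                     // force buffered writes to disk
 * pageFile.unload();                    // release OS resources
 * </pre>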
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
    private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
    private static final int PAGE_FILE_HEADER_SIZE=1024*4;

    // Each record in the recovery buffer is prefixed with a (long) page id.
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RandomAccessFile readFile;
    // File handle used for writing pages..
    private RandomAccessFile writeFile;
    // File handle used for writing to the recovery buffer..
    private RandomAccessFile recoveryFile;

    // The size of pages.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to.. it may temporarily exceed this max,
    // but the file will be resized back down to it as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is the read page cache enabled?
    private boolean enablePageCaching=true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = 100;

    // Should we first log page writes to the recovery buffer? Avoids partial
    // page writes corrupting the page file..
    private boolean enableRecoveryFile=true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs=true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread=false;

    // These are used if enabledWriteThread==true
    private AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    /**
     * Used to keep track of updated pages which have not yet been committed.
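     *
     * Two byte buffers are tracked per page: {@code current} holds the latest data handed
     * to the page file's write methods, while {@code diskBound} holds the snapshot being
     * flushed by the writer.  {@link #begin()} moves current into diskBound, so callers can
     * keep updating a page while its previous version is still on the way to disk.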
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;

        public PageWrite(Page page, byte[] data) {
            this.page=page;
            current=data;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page=page;
            current=data;
        }

        @Override
        public String toString() {
            return "[PageWrite:"+page.getPageId()+"]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        void begin() {
            diskBound = current;
            current = null;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBound=null;
            return current == null;
        }

        boolean isDone() {
            return diskBound == null && current == null;
        }

    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId=-1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }
        public void setFileType(String fileType) {
            this.fileType = fileType;
        }
        public String getFileTypeVersion() {
            return fileTypeVersion;
        }
        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }
        public long getMetaDataTxId() {
            return metaDataTxId;
        }
        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }
        public int getPageSize() {
            return pageSize;
        }
        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }
        public boolean isCleanShutdown() {
            return cleanShutdown;
        }
        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }
        public long getLastTxId() {
            return lastTxId;
        }
        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }
        public long getFreePages() {
            return freePages;
        }
        public void setFreePages(long value) {
            this.freePages = value;
        }
    }
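    /**
     * Starts a new transaction against this page file.  A sketch of typical use follows;
     * it assumes the {@link Transaction} allocate/store/commit API in this package, and
     * {@code stringMarshaller} stands in for a marshaller suited to the payload type:
     *
     * <pre>
     * Transaction tx = pageFile.tx();
     * Page&lt;String&gt; page = tx.allocate();       // reserve a page
     * page.set("hello");                        // set the page payload
     * tx.store(page, stringMarshaller, false);  // marshal it into the page
     * tx.commit();                              // queue the write batch
     * </pre>
     */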
    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     *
     * @param directory the directory that will hold the page file's data files
     * @param name the base name for the data files
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException
     *         if the files cannot be deleted.
     * @throws IllegalStateException
     *         if this PageFile is loaded
     */
    public void delete() throws IOException {
        if( loaded.get() ) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    /**
     * @param file the file to delete if it exists
     * @throws IOException if the file exists but cannot be deleted
     */
    private void delete(File file) throws IOException {
        if( file.exists() ) {
            if( !file.delete() ) {
                throw new IOException("Could not delete: "+file.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException
     *         If the page file cannot be loaded. This could be caused by the existing page file being corrupt,
     *         being a bad version, or by a disk error.
     * @throws IllegalStateException
     *         If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if( enablePageCaching ) {
                pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RandomAccessFile(file, "rw");
            readFile = new RandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because it can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because it can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if( enableRecoveryFile ) {
                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
            }

            if( metaData.isCleanShutdown() ) {
                nextTxid.set(metaData.getLastTxId()+1);
                if( metaData.getFreePages()>0 ) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());

                // Scan all pages to rebuild the free list.
                freeList = new SequenceSet();
                for (Iterator i = tx().iterator(true); i.hasNext();) {
                    Page page = (Page)i.next();
                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();

            if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
            startWriter();

        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException
     *         if a disk error occurred while closing down the page file.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if( freeList.isEmpty() ) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId( nextTxid.get()-1 );
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile=null;
                if( enableRecoveryFile ) {
                    recoveryFile.close();
                    recoveryFile=null;
                }
                freeList.clear();
                if( pageCache!=null ) {
                    pageCache=null;
                }
                synchronized(writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException
     *         If a disk error occurred.
     */
    public void flush() throws IOException {

        if( enabledWriteThread && stopWriter.get() ) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Setup a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized( writes ) {
            if( writes.isEmpty()) {
                return;
            }
            if( enabledWriteThread ) {
                if( this.checkpointLatch == null ) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            throw new InterruptedIOException();
        }
    }


    @Override
    public String toString() {
        return "Page File: "+getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
    }

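    // Maps a logical page id to its byte offset in the main data file: pages are packed
    // back-to-back after the 4k metadata header.  For example, with the default 4k page
    // size, page 2 starts at 4096 + 2*4096 = 12288.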
    private long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
    }

    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
            readFile.seek(PAGE_FILE_HEADER_SIZE/2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if( v1==null && v2==null ) {
            throw new IOException("Could not load page file meta data");
        }

        if( v1 == null || v1.metaDataTxId<0 ) {
            metaData = v2;
        } else if( v2==null || v2.metaDataTxId<0 ) {
            metaData = v1;
        } else if( v1.metaDataTxId==v2.metaDataTxId ) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format.
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if( os.size() > PAGE_FILE_HEADER_SIZE/2) {
            throw new IOException("Configuration is larger than: "+PAGE_FILE_HEADER_SIZE/2);
        }
        // Fill the rest with space...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE/2)-os.size()];
        Arrays.fill(filler, (byte)' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it.. write two copies, each synced; loadMetaData() can then
        // fall back to the other copy if one write was torn by a crash.
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.getFD().sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
        writeFile.write(d);
        writeFile.getFD().sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return is the recovery buffer enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     * @param syncWrites
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize-Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException
     *         once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        assertNotLoaded();
        this.writeBatchSize = writeBatchSize;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if( !loaded.get() ) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if( loaded.get() ) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException
     *         If a disk error occurred.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
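     *
     * For example, a sketch of grabbing a 3 page contiguous block through this
     * package-private API (applications would normally allocate through a
     * {@link Transaction} instead):
     *
     * <pre>
     * Page first = pageFile.allocate(3);
     * // pages first.getPageId(), +1 and +2 are now reserved for the caller
     * </pre>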
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;
            while (c > 0) {
                Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
                page.makeFree(getNextWriteTransactionId());

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
                c--;
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        if( enablePageCaching ) {
            pageCache.remove(pageId);
        }
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>(){
            public Long getKey() {
                return write.getPage().getPageId();
            }
            public PageWrite getValue() {
                return write;
            }
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                while( writes.size() >= writeBatchSize && !stopWriter.get() ) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if( write==null ) {
                    writes.put(key, value);
                } else {
                    write.setCurrent(value.page, value.current);
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            if( canStartWriteBatch() ) {
                if( enabledWriteThread ) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100)/writeBatchSize);
        if( enabledWriteThread ) {
            // The constant 10 here controls how soon write batches start going to disk..
            // it would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch!=null;
        } else {
            return capacityUsed >= 80 || checkpointLatch!=null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized(writes) {
            PageWrite pageWrite = writes.get(pageId);
            if( pageWrite != null ) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(Page page) {
        if (enablePageCaching) {
            pageCache.remove(page.getPageId());
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    /**
     * The async writer thread loop: waits for page writes to accumulate and flushes them
     * to disk in batches until the page file is stopped.
     */
    private void pollWrites() {
        try {
            while( !stopWriter.get() ) {
                // Wait for a notification...
                synchronized( writes ) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while( writes.isEmpty() && checkpointLatch==null && !stopWriter.get() ) {
                        writes.wait(100);
                    }

                    if( writes.isEmpty() ) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.error("Page writer thread failed: ", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

    /**
     * Writes the currently buffered page updates to disk, first recording them in the
     * recovery buffer when it is enabled.
     *
     * @throws IOException if a disk error occurred.
     */
    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized( writes ) {
            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch=null;
        }
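
        // On-disk layout of the recovery buffer written below (derived from the writes that
        // follow; the header region is padded out to RECOVERY_FILE_HEADER_SIZE):
        //
        //   offset 0 : long - next transaction id
        //   offset 8 : long - Adler-32 checksum over the page data of the whole batch
        //   offset 16: int  - number of page records in the batch
        //   offset RECOVERY_FILE_HEADER_SIZE: repeated (long pageId, byte[pageSize]) records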
        try {
            if (enableRecoveryFile) {

                // Using Adler-32 instead of CRC-32 because it's much faster, and its weakness
                // for short messages of a few hundred bytes is not a factor here since our
                // write batches are going to be much larger.
                Checksum checksum = new Adler32();
                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.diskBound, 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create(
                                "Cannot create recovery file. Reason: " + t, t);
                    }
                }

                // Can we shrink the recovery buffer??
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we know
                // whether we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow.
                recoveryFile.writeInt(batch.size());

                // Write the pages.
                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.diskBound, 0, pageSize);
                }

                if (enableDiskSyncs) {
                    // Sync to make sure recovery buffer writes land on disk..
                    recoveryFile.getFD().sync();
                }

                recoveryPageCount = batch.size();
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.diskBound, 0, pageSize);
                w.done();
            }

            // Sync again.
            if (enableDiskSyncs) {
                writeFile.getFD().sync();
            }

        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                    }
                }
            }

            if( checkpointLatch!=null ) {
                checkpointLatch.countDown();
            }
        }
    }

    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
    }

    private void releaseCheckpointWaiter() {
        if( checkpointLatch!=null ) {
            checkpointLatch.countDown();
            checkpointLatch=null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     * @throws IOException
     */
    private long redoRecoveryUpdates() throws IOException {
        if( !enableRecoveryFile ) {
            return 0;
        }
        recoveryPageCount=0;

        // Are we initializing the recovery file?
        if( recoveryFile.length() == 0 ) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long offset = recoveryFile.readLong();
                byte []data = new byte[pageSize];
                if( recoveryFile.read(data, 0, pageSize) != pageSize ) {
                    // Invalid recovery record: could not fully read the data.  Probably due
                    // to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(offset, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out
            // correctly.. so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if( checksum.getValue() != expectedChecksum ) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.getFD().sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized( writes ) {
            if( enabledWriteThread ) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if( enabledWriteThread ) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

}