001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.kahadb.page;
018    
019    import java.io.DataInputStream;
020    import java.io.EOFException;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.OutputStream;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.NoSuchElementException;
027    
028    import org.apache.kahadb.page.PageFile.PageWrite;
029    import org.apache.kahadb.util.ByteSequence;
030    import org.apache.kahadb.util.DataByteArrayInputStream;
031    import org.apache.kahadb.util.DataByteArrayOutputStream;
032    import org.apache.kahadb.util.Marshaller;
033    import org.apache.kahadb.util.Sequence;
034    import org.apache.kahadb.util.SequenceSet;
035    
036    /**
037     * The class used to read/update a PageFile object.  Using a transaction allows you to
038     * do multiple update operations in a single unit of work.
039     */
040    public class Transaction implements Iterable<Page> {
041        
/**
 * The PageOverflowIOException occurs when a page write is requested
 * and its data is larger than what would fit into a single page.
 */
public class PageOverflowIOException extends IOException {
    // IOException is Serializable, so pin the stream version explicitly instead of
    // relying on the compiler-dependent computed default.
    private static final long serialVersionUID = 1L;

    /**
     * @param message a description of the overflow condition
     */
    public PageOverflowIOException(String message) {
        super(message);
    }
}
051        
/**
 * The InvalidPageIOException is thrown if you try to load/store a page
 * with an invalid page id.
 */
public class InvalidPageIOException extends IOException {
    // IOException is Serializable, so pin the stream version explicitly instead of
    // relying on the compiler-dependent computed default.
    private static final long serialVersionUID = 1L;

    // The offending page id.
    private final long page;

    /**
     * @param message a description of the problem
     * @param page the page id that was rejected
     */
    public InvalidPageIOException(String message, long page) {
        super(message);
        this.page = page;
    }

    /**
     * @return the page id that was rejected
     */
    public long getPage() {
        return page;
    }
}
068        
069        /**
070         * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
071         * 
072         * @param <T> The type of exceptions that operation will throw.
073         */
074        public interface Closure <T extends Throwable> {
075            public void execute(Transaction tx) throws T;
076        }
077    
078        /**
079         * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
080         * 
081         * @param <R> The type of result that the closure produces.
082         * @param <T> The type of exceptions that operation will throw.
083         */
084        public interface CallableClosure<R, T extends Throwable> {
085            public R execute(Transaction tx) throws T;
086        }
087        
088    
// The page file that this Transaction operates against.
private final PageFile pageFile;

// Id of the write transaction, or -1 while this transaction has not updated anything.
// It is assigned lazily by getWriteTransactionId().
private long writeTransactionId = -1;

// Pending page writes of this transaction, keyed by page id.
private final HashMap<Long, PageWrite> writes = new HashMap<Long, PageWrite>();

// Pages allocated in this transaction.
private final SequenceSet allocateList = new SequenceSet();

// Pages freed in this transaction.
private final SequenceSet freeList = new SequenceSet();

/**
 * Creates a transaction bound to the given page file.
 *
 * @param pageFile the page file this transaction reads from and updates
 */
Transaction(PageFile pageFile) {
    this.pageFile = pageFile;
}
103    
104        /**
105         * @return the page file that created this Transaction
106         */
107        public PageFile getPageFile() {
108            return this.pageFile;
109        }
110    
111        /** 
112         * Allocates a free page that you can write data to.
113         * 
114         * @return a newly allocated page.  
115         * @throws IOException
116         *         If an disk error occurred.
117         * @throws IllegalStateException
118         *         if the PageFile is not loaded
119         */
120        public <T> Page<T> allocate() throws IOException {
121            return allocate(1);
122        }
123    
124        /** 
125         * Allocates a block of free pages that you can write data to.
126         * 
127         * @param count the number of sequential pages to allocate
128         * @return the first page of the sequential set. 
129         * @throws IOException
130         *         If an disk error occurred.
131         * @throws IllegalStateException
132         *         if the PageFile is not loaded
133         */
134        public <T> Page<T> allocate(int count) throws IOException {
135            // TODO: we need to track allocated pages so that they can be returned if the 
136            // transaction gets rolled back.
137            Page<T> rc = pageFile.allocate(count);
138            allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
139            return rc;
140        }
141    
142        /**
143         * Frees up a previously allocated page so that it can be re-allocated again.
144         * 
145         * @param page the page to free up
146         * @throws IOException
147         *         If an disk error occurred.
148         * @throws IllegalStateException
149         *         if the PageFile is not loaded
150         */
151        public void free(long pageId) throws IOException {
152            free(load(pageId, null));
153        }
154    
155        /**
156         * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
157         * 
158         * @param page the initial page of the sequence that will be getting freed
159         * @param count the number of pages in the sequence
160         * 
161         * @throws IOException
162         *         If an disk error occurred.
163         * @throws IllegalStateException
164         *         if the PageFile is not loaded
165         */
166        public void free(long pageId, int count) throws IOException {
167            free(load(pageId, null), count);
168        }
169    
170        /**
171         * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
172         * 
173         * @param page the initial page of the sequence that will be getting freed
174         * @param count the number of pages in the sequence
175         * 
176         * @throws IOException
177         *         If an disk error occurred.
178         * @throws IllegalStateException
179         *         if the PageFile is not loaded
180         */
181        public <T> void free(Page<T> page, int count) throws IOException {
182            pageFile.assertLoaded();
183            long offsetPage = page.getPageId();
184            for (int i = 0; i < count; i++) {
185                if (page == null) {
186                    page = load(offsetPage + i, null);
187                }
188                free(page);
189                page = null;
190            }
191        }
192        
193        /**
194         * Frees up a previously allocated page so that it can be re-allocated again.
195         * 
196         * @param page the page to free up
197         * @throws IOException
198         *         If an disk error occurred.
199         * @throws IllegalStateException
200         *         if the PageFile is not loaded
201         */
202        public <T> void free(Page<T> page) throws IOException {
203            pageFile.assertLoaded();
204    
205            // We may need loop to free up a page chain.
206            while (page != null) {
207    
208                // Is it already free??
209                if (page.getType() == Page.PAGE_FREE_TYPE) {
210                    return;
211                }
212    
213                Page<T> next = null;
214                if (page.getType() == Page.PAGE_PART_TYPE) {
215                    next = load(page.getNext(), null);
216                }
217    
218                page.makeFree(getWriteTransactionId());
219    
220                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
221                page.write(out);
222                write(page, out.getData());
223    
224                freeList.add(page.getPageId());
225                page = next;
226            }
227        }
228    
229        /**
230         * 
231         * @param page
232         *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
233         * @param marshaller
234         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
235         * @param overflow
236         *        If true, then if the page data marshalls to a bigger size than can fit in one page, then additional 
237         *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
238         *        and the overflow condition would occur, then the PageOverflowIOException is thrown. 
239         * @throws IOException
240         *         If an disk error occurred.
241         * @throws PageOverflowIOException
242         *         If the page data marshalls to size larger than maximum page size and overflow was false.
243         * @throws IllegalStateException
244         *         if the PageFile is not loaded
245         */
246        public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
247            DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
248            if (marshaller != null) {
249                marshaller.writePayload(page.get(), out);
250            }
251            out.close();
252        }
253    
254        /**
255         * @throws IOException
256         */
257        public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
258            pageFile.assertLoaded();
259    
260            // Copy to protect against the end user changing
261            // the page instance while we are doing a write.
262            final Page copy = page.copy();
263            pageFile.addToCache(copy);
264    
265            //
266            // To support writing VERY large data, we override the output stream so
267            // that we
268            // we do the page writes incrementally while the data is being
269            // marshalled.
270            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
271                Page current = copy;
272    
273                @SuppressWarnings("unchecked")
274                @Override
275                protected void onWrite() throws IOException {
276    
277                    // Are we at an overflow condition?
278                    final int pageSize = pageFile.getPageSize();
279                    if (pos >= pageSize) {
280                        // If overflow is allowed
281                        if (overflow) {
282    
283                            Page next;
284                            if (current.getType() == Page.PAGE_PART_TYPE) {
285                                next = load(current.getNext(), null);
286                            } else {
287                                next = allocate();
288                            }
289    
290                            next.txId = current.txId;
291    
292                            // Write the page header
293                            int oldPos = pos;
294                            pos = 0;
295    
296                            current.makePagePart(next.getPageId(), getWriteTransactionId());
297                            current.write(this);
298    
299                            // Do the page write..
300                            byte[] data = new byte[pageSize];
301                            System.arraycopy(buf, 0, data, 0, pageSize);
302                            Transaction.this.write(current, data);
303    
304                            // Reset for the next page chunk
305                            pos = 0;
306                            // The page header marshalled after the data is written.
307                            skip(Page.PAGE_HEADER_SIZE);
308                            // Move the overflow data after the header.
309                            System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
310                            pos += oldPos - pageSize;
311                            current = next;
312    
313                        } else {
314                            throw new PageOverflowIOException("Page overflow.");
315                        }
316                    }
317    
318                }
319    
320                @SuppressWarnings("unchecked")
321                @Override
322                public void close() throws IOException {
323                    super.close();
324    
325                    // We need to free up the rest of the page chain..
326                    if (current.getType() == Page.PAGE_PART_TYPE) {
327                        free(current.getNext());
328                    }
329    
330                    current.makePageEnd(pos, getWriteTransactionId());
331    
332                    // Write the header..
333                    pos = 0;
334                    current.write(this);
335    
336                    Transaction.this.write(current, buf);
337                }
338            };
339    
340            // The page header marshaled after the data is written.
341            out.skip(Page.PAGE_HEADER_SIZE);
342            return out;
343        }
344    
345        /**
346         * Loads a page from disk.
347         * 
348         * @param pageId 
349         *        the id of the page to load
350         * @param marshaller
351         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
352         * @return The page with the given id
353         * @throws IOException
354         *         If an disk error occurred.
355         * @throws IllegalStateException
356         *         if the PageFile is not loaded
357         */
358        public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
359            pageFile.assertLoaded();
360            Page<T> page = new Page<T>(pageId);
361            load(page, marshaller);
362            return page;
363        }
364    
365        /**
366         * Loads a page from disk.
367         * 
368         * @param page - The pageId field must be properly set 
369         * @param marshaller
370         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
371         * @throws IOException
372         *         If an disk error occurred.
373         * @throws InvalidPageIOException
374         *         If the page is is not valid.      
375         * @throws IllegalStateException
376         *         if the PageFile is not loaded
377         */
378        @SuppressWarnings("unchecked")
379        public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
380            pageFile.assertLoaded();
381    
382            // Can't load invalid offsets...
383            long pageId = page.getPageId();
384            if (pageId < 0) {
385                throw new InvalidPageIOException("Page id is not valid", pageId);
386            }
387    
388            // It might be a page this transaction has modified...
389            PageWrite update = writes.get(pageId);
390            if (update != null) {
391                page.copy(update.getPage());
392                return;
393            }
394    
395            // We may be able to get it from the cache...
396            Page<T> t = pageFile.getFromCache(pageId);
397            if (t != null) {
398                page.copy(t);
399                return;
400            }
401    
402            if (marshaller != null) {
403                // Full page read..
404                InputStream is = openInputStream(page);
405                DataInputStream dataIn = new DataInputStream(is);
406                page.set(marshaller.readPayload(dataIn));
407                is.close();
408            } else {
409                // Page header read.
410                DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
411                pageFile.readPage(pageId, in.getRawData());
412                page.read(in);
413                page.set(null);
414            }
415    
416            // Cache it.
417            if (marshaller != null) {
418                pageFile.addToCache(page);
419            }
420        }
421    
422        /**
423         * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
424         *      org.apache.kahadb.util.Marshaller)
425         */
426        public InputStream openInputStream(final Page p) throws IOException {
427    
428            return new InputStream() {
429    
430                private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
431                private Page page = readPage(p);
432                private int pageCount = 1;
433    
434                private Page markPage;
435                private ByteSequence markChunk;
436    
437                private Page readPage(Page page) throws IOException {
438                    // Read the page data
439                    
440                    pageFile.readPage(page.getPageId(), chunk.getData());
441                    
442                    chunk.setOffset(0);
443                    chunk.setLength(pageFile.getPageSize());
444    
445                    DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
446                    page.read(in);
447    
448                    chunk.setOffset(Page.PAGE_HEADER_SIZE);
449                    if (page.getType() == Page.PAGE_END_TYPE) {
450                        chunk.setLength((int)(page.getNext()));
451                    }
452    
453                    if (page.getType() == Page.PAGE_FREE_TYPE) {
454                        throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
455                    }
456    
457                    return page;
458                }
459    
460                public int read() throws IOException {
461                    if (!atEOF()) {
462                        return chunk.data[chunk.offset++] & 0xff;
463                    } else {
464                        return -1;
465                    }
466                }
467    
468                private boolean atEOF() throws IOException {
469                    if (chunk.offset < chunk.length) {
470                        return false;
471                    }
472                    if (page.getType() == Page.PAGE_END_TYPE) {
473                        return true;
474                    }
475                    fill();
476                    return chunk.offset >= chunk.length;
477                }
478    
479                private void fill() throws IOException {
480                    page = readPage(new Page(page.getNext()));
481                    pageCount++;
482                }
483    
484                public int read(byte[] b) throws IOException {
485                    return read(b, 0, b.length);
486                }
487    
488                public int read(byte b[], int off, int len) throws IOException {
489                    if (!atEOF()) {
490                        int rc = 0;
491                        while (!atEOF() && rc < len) {
492                            len = Math.min(len, chunk.length - chunk.offset);
493                            if (len > 0) {
494                                System.arraycopy(chunk.data, chunk.offset, b, off, len);
495                                chunk.offset += len;
496                            }
497                            rc += len;
498                        }
499                        return rc;
500                    } else {
501                        return -1;
502                    }
503                }
504    
505                public long skip(long len) throws IOException {
506                    if (atEOF()) {
507                        int rc = 0;
508                        while (!atEOF() && rc < len) {
509                            len = Math.min(len, chunk.length - chunk.offset);
510                            if (len > 0) {
511                                chunk.offset += len;
512                            }
513                            rc += len;
514                        }
515                        return rc;
516                    } else {
517                        return -1;
518                    }
519                }
520    
521                public int available() {
522                    return chunk.length - chunk.offset;
523                }
524    
525                public boolean markSupported() {
526                    return true;
527                }
528    
529                public void mark(int markpos) {
530                    markPage = page;
531                    byte data[] = new byte[pageFile.getPageSize()];
532                    System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
533                    markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
534                }
535    
536                public void reset() {
537                    page = markPage;
538                    chunk = markChunk;
539                }
540    
541            };
542        }
543    
544        /**
545         * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are 
546         * not included in this iteration. 
547         * 
548         * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
549         * 
550         * @throws IllegalStateException
551         *         if the PageFile is not loaded
552         */
553        @SuppressWarnings("unchecked")
554        public Iterator<Page> iterator() {
555            return (Iterator<Page>)iterator(false);
556        }
557    
558        /**
559         * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
560         * iterated.
561         * 
562         * @param includeFreePages - if true, free pages are included in the iteration
563         * @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction.
564         * @throws IllegalStateException
565         *         if the PageFile is not loaded
566         */
567        public Iterator<Page> iterator(final boolean includeFreePages) {
568    
569            pageFile.assertLoaded();
570    
571            return new Iterator<Page>() {
572                long nextId;
573                Page nextPage;
574                Page lastPage;
575    
576                private void findNextPage() {
577                    if (!pageFile.isLoaded()) {
578                        throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
579                    }
580    
581                    if (nextPage != null) {
582                        return;
583                    }
584    
585                    try {
586                        while (nextId < pageFile.getPageCount()) {
587    
588                            Page page = load(nextId, null);
589    
590                            if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
591                                nextPage = page;
592                                return;
593                            } else {
594                                nextId++;
595                            }
596                        }
597                    } catch (IOException e) {
598                    }
599                }
600    
601                public boolean hasNext() {
602                    findNextPage();
603                    return nextPage != null;
604                }
605    
606                public Page next() {
607                    findNextPage();
608                    if (nextPage != null) {
609                        lastPage = nextPage;
610                        nextPage = null;
611                        nextId++;
612                        return lastPage;
613                    } else {
614                        throw new NoSuchElementException();
615                    }
616                }
617    
618                @SuppressWarnings("unchecked")
619                public void remove() {
620                    if (lastPage == null) {
621                        throw new IllegalStateException();
622                    }
623                    try {
624                        free(lastPage);
625                        lastPage = null;
626                    } catch (IOException e) {
627                        new RuntimeException(e);
628                    }
629                }
630            };
631        }
632    
633        ///////////////////////////////////////////////////////////////////
634        // Commit / Rollback related methods..
635        ///////////////////////////////////////////////////////////////////
636        
637        /**
638         * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
639         * with the transaction are written to disk or none will.
640         */
641        public void commit() throws IOException {
642            if( writeTransactionId!=-1 ) {
643                // Actually do the page writes...
644                pageFile.write(writes.entrySet());
645                // Release the pages that were freed up in the transaction..
646                freePages(freeList);
647                
648                freeList.clear();
649                allocateList.clear();
650                writes.clear();
651                writeTransactionId = -1;
652            }
653        }
654    
655        /**
656         * Rolls back the transaction.
657         */
658        public void rollback() throws IOException {
659            if( writeTransactionId!=-1 ) {
660                // Release the pages that were allocated in the transaction...
661                freePages(allocateList);
662    
663                freeList.clear();
664                allocateList.clear();
665                writes.clear();
666                writeTransactionId = -1;
667            }
668        }
669    
670        private long getWriteTransactionId() {
671            if( writeTransactionId==-1 ) {
672                writeTransactionId = pageFile.getNextWriteTransactionId();
673            }
674            return writeTransactionId;
675        }
676    
677        /**
678         * Queues up a page write that should get done when commit() gets called.
679         */
680        @SuppressWarnings("unchecked")
681        private void write(final Page page, byte[] data) throws IOException {
682            Long key = page.getPageId();
683            // TODO: if a large update transaction is in progress, we may want to move
684            // all the current updates to a temp file so that we don't keep using 
685            // up memory.
686            writes.put(key, new PageWrite(page, data));        
687        }   
688    
689        /**
690         * @param list
691         * @throws RuntimeException
692         */
693        private void freePages(SequenceSet list) throws RuntimeException {
694            Sequence seq = list.getHead();
695            while( seq!=null ) {
696                seq.each(new Sequence.Closure<RuntimeException>(){
697                    public void execute(long value) {
698                        pageFile.freePage(value);
699                    }
700                });
701                seq = seq.getNext();
702            }
703        }
704        
705        /**
706         * @return true if there are no uncommitted page file updates associated with this transaction.
707         */
708        public boolean isReadOnly() {
709            return writeTransactionId==-1;
710        }
711        
712        ///////////////////////////////////////////////////////////////////
713        // Transaction closure helpers...
714        ///////////////////////////////////////////////////////////////////
715        
716        /**
717         * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
718         * If the closure throws an Exception, then the transaction is rolled back.
719         * 
720         * @param <T>
721         * @param closure - the work to get exectued.
722         * @throws T if the closure throws it
723         * @throws IOException If the commit fails.
724         */
725        public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
726            boolean success = false;
727            try {
728                closure.execute(this);
729                success = true;
730            } finally {
731                if (success) {
732                    commit();
733                } else {
734                    rollback();
735                }
736            }
737        }
738    
739        /**
740         * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
741         * If the closure throws an Exception, then the transaction is rolled back.
742         * 
743         * @param <T>
744         * @param closure - the work to get exectued.
745         * @throws T if the closure throws it
746         * @throws IOException If the commit fails.
747         */
748        public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
749            boolean success = false;
750            try {
751                R rc = closure.execute(this);
752                success = true;
753                return rc;
754            } finally {
755                if (success) {
756                    commit();
757                } else {
758                    rollback();
759                }
760            }
761        }
762    
763    }