001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.plist;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import java.util.concurrent.atomic.AtomicReference;
024    import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
025    import org.apache.kahadb.journal.Location;
026    import org.apache.kahadb.page.Page;
027    import org.apache.kahadb.page.Transaction;
028    import org.apache.kahadb.util.ByteSequence;
029    
030    public class PList {
031        final PListStore store;
032        private String name;
033        private long rootId = EntryLocation.NOT_SET;
034        private long lastId = EntryLocation.NOT_SET;
035        private final AtomicBoolean loaded = new AtomicBoolean();
036        private int size = 0;
037        Object indexLock;
038    
039        PList(PListStore store) {
040            this.store = store;
041            this.indexLock = store.getIndexLock();
042        }
043    
044        public void setName(String name) {
045            this.name = name;
046        }
047    
048        /*
049         * (non-Javadoc)
050         * @see org.apache.activemq.beanstalk.JobScheduler#getName()
051         */
052        public String getName() {
053            return this.name;
054        }
055    
056        public synchronized int size() {
057            return this.size;
058        }
059    
060        public synchronized boolean isEmpty() {
061            return size == 0;
062        }
063    
064        /**
065         * @return the rootId
066         */
067        public long getRootId() {
068            return this.rootId;
069        }
070    
071        /**
072         * @param rootId
073         *            the rootId to set
074         */
075        public void setRootId(long rootId) {
076            this.rootId = rootId;
077        }
078    
079        /**
080         * @return the lastId
081         */
082        public long getLastId() {
083            return this.lastId;
084        }
085    
086        /**
087         * @param lastId
088         *            the lastId to set
089         */
090        public void setLastId(long lastId) {
091            this.lastId = lastId;
092        }
093    
094        /**
095         * @return the loaded
096         */
097        public boolean isLoaded() {
098            return this.loaded.get();
099        }
100    
101        void read(DataInput in) throws IOException {
102            this.rootId = in.readLong();
103            this.name = in.readUTF();
104        }
105    
106        public void write(DataOutput out) throws IOException {
107            out.writeLong(this.rootId);
108            out.writeUTF(name);
109        }
110    
111        public synchronized void destroy() throws IOException {
112            synchronized (indexLock) {
113                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
114                    public void execute(Transaction tx) throws IOException {
115                        destroy(tx);
116                    }
117                });
118            }
119        }
120    
121        void destroy(Transaction tx) throws IOException {
122            // start from the first
123            EntryLocation entry = getFirst(tx);
124            while (entry != null) {
125                EntryLocation toRemove = entry.copy();
126                entry = getNext(tx, entry.getNext());
127                doRemove(tx, toRemove);
128            }
129        }
130    
131        synchronized void load(Transaction tx) throws IOException {
132            if (loaded.compareAndSet(false, true)) {
133                final Page<EntryLocation> p = tx.load(this.rootId, null);
134                if (p.getType() == Page.PAGE_FREE_TYPE) {
135                    // Need to initialize it..
136                    EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
137    
138                    storeEntry(tx, root);
139                    this.lastId = root.getPage().getPageId();
140                } else {
141                    // find last id
142                    long nextId = this.rootId;
143                    while (nextId != EntryLocation.NOT_SET) {
144                        EntryLocation next = getNext(tx, nextId);
145                        if (next != null) {
146                            this.lastId = next.getPage().getPageId();
147                            nextId = next.getNext();
148                            this.size++;
149                        }
150                    }
151                }
152            }
153        }
154    
155        synchronized public void unload() {
156            if (loaded.compareAndSet(true, false)) {
157                this.rootId = EntryLocation.NOT_SET;
158                this.lastId = EntryLocation.NOT_SET;
159                this.size=0;
160            }
161        }
162    
163        synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
164            final Location location = this.store.write(bs, false);
165            synchronized (indexLock) {
166                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
167                    public void execute(Transaction tx) throws IOException {
168                        addLast(tx, id, bs, location);
169                    }
170                });
171            }
172        }
173    
174        private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
175            EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
176            entry.setLocation(location);
177            storeEntry(tx, entry);
178            this.store.incrementJournalCount(tx, location);
179    
180            EntryLocation last = loadEntry(tx, this.lastId);
181            last.setNext(entry.getPage().getPageId());
182            storeEntry(tx, last);
183            this.lastId = entry.getPage().getPageId();
184            this.size++;
185        }
186    
187        synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
188            final Location location = this.store.write(bs, false);
189            synchronized (indexLock) {
190                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
191                    public void execute(Transaction tx) throws IOException {
192                        addFirst(tx, id, bs, location);
193                    }
194                });
195            }
196        }
197    
198        private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
199            EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
200            entry.setLocation(location);
201            EntryLocation oldFirst = getFirst(tx);
202            if (oldFirst != null) {
203                oldFirst.setPrev(entry.getPage().getPageId());
204                storeEntry(tx, oldFirst);
205                entry.setNext(oldFirst.getPage().getPageId());
206    
207            }
208            EntryLocation root = getRoot(tx);
209            root.setNext(entry.getPage().getPageId());
210            storeEntry(tx, root);
211            storeEntry(tx, entry);
212    
213            this.store.incrementJournalCount(tx, location);
214            this.size++;
215        }
216    
217        synchronized public boolean remove(final String id) throws IOException {
218            final AtomicBoolean result = new AtomicBoolean();
219            synchronized (indexLock) {
220                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
221                    public void execute(Transaction tx) throws IOException {
222                        result.set(remove(tx, id));
223                    }
224                });
225            }
226            return result.get();
227        }
228    
229        synchronized public boolean remove(final int position) throws IOException {
230            final AtomicBoolean result = new AtomicBoolean();
231            synchronized (indexLock) {
232                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
233                    public void execute(Transaction tx) throws IOException {
234                        result.set(remove(tx, position));
235                    }
236                });
237            }
238            return result.get();
239        }
240    
241        synchronized public boolean remove(final PListEntry entry) throws IOException {
242            final AtomicBoolean result = new AtomicBoolean();
243            synchronized (indexLock) {
244                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
245                    public void execute(Transaction tx) throws IOException {
246                        result.set(doRemove(tx, entry.getEntry()));
247                    }
248                });
249            }
250            return result.get();
251        }
252    
253        synchronized public PListEntry get(final int position) throws IOException {
254            PListEntry result = null;
255            final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
256            synchronized (indexLock) {
257                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
258                    public void execute(Transaction tx) throws IOException {
259                        ref.set(get(tx, position));
260                    }
261                });
262            }
263            if (ref.get() != null) {
264                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
265                result = new PListEntry(ref.get(), bs);
266            }
267            return result;
268        }
269    
270        synchronized public PListEntry getFirst() throws IOException {
271            PListEntry result = null;
272            final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
273            synchronized (indexLock) {
274                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
275                    public void execute(Transaction tx) throws IOException {
276                        ref.set(getFirst(tx));
277                    }
278                });
279                if (ref.get() != null) {
280                    ByteSequence bs = this.store.getPayload(ref.get().getLocation());
281                    result = new PListEntry(ref.get(), bs);
282                }
283            }
284            return result;
285        }
286    
287        synchronized public PListEntry getLast() throws IOException {
288            PListEntry result = null;
289            final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
290            synchronized (indexLock) {
291                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
292                    public void execute(Transaction tx) throws IOException {
293                        ref.set(getLast(tx));
294                    }
295                });
296                if (ref.get() != null) {
297                    ByteSequence bs = this.store.getPayload(ref.get().getLocation());
298                    result = new PListEntry(ref.get(), bs);
299                }
300            }
301            return result;
302        }
303    
304        synchronized public PListEntry getNext(PListEntry entry) throws IOException {
305            PListEntry result = null;
306            final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
307            if (nextId != EntryLocation.NOT_SET) {
308                final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
309                synchronized (indexLock) {
310                    this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
311                        public void execute(Transaction tx) throws IOException {
312                            ref.set(getNext(tx, nextId));
313                        }
314                    });
315                    if (ref.get() != null) {
316                        ByteSequence bs = this.store.getPayload(ref.get().getLocation());
317                        result = new PListEntry(ref.get(), bs);
318                    }
319                }
320            }
321            return result;
322        }
323    
324        synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
325            PListEntry result = null;
326            final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
327            synchronized (indexLock) {
328                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
329                    public void execute(Transaction tx) throws IOException {
330                        ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
331                    }
332                });
333                if (ref.get() != null) {
334                    result = new PListEntry(ref.get(), entry.getByteSequence());
335                }
336            }
337            return result;
338        }
339    
340        boolean remove(Transaction tx, String id) throws IOException {
341            boolean result = false;
342            long nextId = this.rootId;
343            while (nextId != EntryLocation.NOT_SET) {
344                EntryLocation entry = getNext(tx, nextId);
345                if (entry != null) {
346                    if (entry.getId().equals(id)) {
347                        result = doRemove(tx, entry);
348                        break;
349                    }
350                    nextId = entry.getNext();
351                } else {
352                    // not found
353                    break;
354                }
355            }
356            return result;
357        }
358    
359        boolean remove(Transaction tx, int position) throws IOException {
360            boolean result = false;
361            long nextId = this.rootId;
362            int count = 0;
363            while (nextId != EntryLocation.NOT_SET) {
364                EntryLocation entry = getNext(tx, nextId);
365                if (entry != null) {
366                    if (count == position) {
367                        result = doRemove(tx, entry);
368                        break;
369                    }
370                    nextId = entry.getNext();
371                } else {
372                    // not found
373                    break;
374                }
375                count++;
376            }
377            return result;
378        }
379    
380        EntryLocation get(Transaction tx, int position) throws IOException {
381            EntryLocation result = null;
382            long nextId = this.rootId;
383            int count = -1;
384            while (nextId != EntryLocation.NOT_SET) {
385                EntryLocation entry = getNext(tx, nextId);
386                if (entry != null) {
387                    if (count == position) {
388                        result = entry;
389                        break;
390                    }
391                    nextId = entry.getNext();
392                } else {
393                    break;
394                }
395                count++;
396            }
397            return result;
398        }
399    
400        EntryLocation getFirst(Transaction tx) throws IOException {
401            long offset = getRoot(tx).getNext();
402            if (offset != EntryLocation.NOT_SET) {
403                return loadEntry(tx, offset);
404            }
405            return null;
406        }
407    
408        EntryLocation getLast(Transaction tx) throws IOException {
409            if (this.lastId != EntryLocation.NOT_SET) {
410                return loadEntry(tx, this.lastId);
411            }
412            return null;
413        }
414    
415        private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
416            boolean result = false;
417            if (entry != null) {
418    
419                EntryLocation prev = getPrevious(tx, entry.getPrev());
420                EntryLocation next = getNext(tx, entry.getNext());
421                long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
422                long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
423    
424                if (next != null) {
425                    next.setPrev(prevId);
426                    storeEntry(tx, next);
427                } else {
428                    // we are deleting the last one in the list
429                    this.lastId = prevId;
430                }
431                if (prev != null) {
432                    prev.setNext(nextId);
433                    storeEntry(tx, prev);
434                }
435    
436                this.store.decrementJournalCount(tx, entry.getLocation());
437                entry.reset();
438                storeEntry(tx, entry);
439                tx.free(entry.getPage().getPageId());
440                result = true;
441                this.size--;
442            }
443            return result;
444        }
445    
446        private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
447            Page<EntryLocation> p = tx.allocate();
448            EntryLocation result = new EntryLocation();
449            result.setPage(p);
450            p.set(result);
451            result.setId(id);
452            result.setPrev(previous);
453            result.setNext(next);
454            return result;
455        }
456    
457        private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
458            EntryLocation result = new EntryLocation();
459            result.setPage(p);
460            p.set(result);
461            result.setId(id);
462            result.setPrev(previous);
463            result.setNext(next);
464            return result;
465        }
466    
467        EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
468            Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
469            EntryLocation entry = page.get();
470            if (entry != null) {
471                entry.setPage(page);
472            }
473            return entry;
474        }
475        
476        private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
477            tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
478        }
479    
480        EntryLocation getNext(Transaction tx, long next) throws IOException {
481            EntryLocation result = null;
482            if (next != EntryLocation.NOT_SET) {
483                result = loadEntry(tx, next);
484            }
485            return result;
486        }
487    
488        private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
489            EntryLocation result = null;
490            if (previous != EntryLocation.NOT_SET) {
491                result = loadEntry(tx, previous);
492            }
493            return result;
494        }
495    
496        private EntryLocation getRoot(Transaction tx) throws IOException {
497            EntryLocation result = loadEntry(tx, this.rootId);
498            return result;
499        }
500    
501        ByteSequence getPayload(EntryLocation entry) throws IOException {
502            return this.store.getPayload(entry.getLocation());
503        }
504    }