001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.kahadb.plist; 018 019 import java.io.DataInput; 020 import java.io.DataOutput; 021 import java.io.File; 022 import java.io.IOException; 023 import java.util.ArrayList; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.Iterator; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Set; 030 import java.util.Map.Entry; 031 import org.apache.activemq.util.IOHelper; 032 import org.apache.activemq.util.ServiceStopper; 033 import org.apache.activemq.util.ServiceSupport; 034 import org.slf4j.Logger; 035 import org.slf4j.LoggerFactory; 036 import org.apache.kahadb.index.BTreeIndex; 037 import org.apache.kahadb.journal.Journal; 038 import org.apache.kahadb.journal.Location; 039 import org.apache.kahadb.page.Page; 040 import org.apache.kahadb.page.PageFile; 041 import org.apache.kahadb.page.Transaction; 042 import org.apache.kahadb.util.ByteSequence; 043 import org.apache.kahadb.util.IntegerMarshaller; 044 import org.apache.kahadb.util.LockFile; 045 import org.apache.kahadb.util.StringMarshaller; 046 import org.apache.kahadb.util.VariableMarshaller; 047 048 /** 049 * @org.apache.xbean.XBean 050 */ 051 public class PListStore extends ServiceSupport { 052 static final Logger LOG = LoggerFactory.getLogger(PListStore.class); 053 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 054 055 static final int CLOSED_STATE = 1; 056 static final int OPEN_STATE = 2; 057 058 private File directory; 059 PageFile pageFile; 060 private Journal journal; 061 private LockFile lockFile; 062 private boolean failIfDatabaseIsLocked; 063 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 064 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 065 private boolean enableIndexWriteAsync = false; 066 private boolean initialized = false; 067 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 068 MetaData metaData = new MetaData(this); 069 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); 070 Map<String, PList> persistentLists = new HashMap<String, PList>(); 071 final Object indexLock = new Object(); 072 073 public Object getIndexLock() { 074 return indexLock; 075 } 076 077 protected class MetaData { 078 protected MetaData(PListStore store) { 079 this.store = store; 080 } 081 082 private final PListStore store; 083 Page<MetaData> page; 084 BTreeIndex<Integer, Integer> journalRC; 085 BTreeIndex<String, PList> storedSchedulers; 086 087 void createIndexes(Transaction tx) throws IOException { 088 this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId()); 089 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId()); 090 } 091 092 void load(Transaction tx) throws IOException { 093 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 094 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 095 this.storedSchedulers.load(tx); 096 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 097 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 098 this.journalRC.load(tx); 099 } 100 101 void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException { 102 for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) { 103 Entry<String, PList> entry = i.next(); 104 entry.getValue().load(tx); 105 schedulers.put(entry.getKey(), entry.getValue()); 106 } 107 } 108 109 public void read(DataInput is) throws IOException { 110 this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong()); 111 this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE); 112 this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store)); 113 this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong()); 114 this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE); 115 this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE); 116 } 117 118 public void write(DataOutput os) throws IOException { 119 os.writeLong(this.storedSchedulers.getPageId()); 120 os.writeLong(this.journalRC.getPageId()); 121 122 } 123 } 124 125 class MetaDataMarshaller extends VariableMarshaller<MetaData> { 126 private final PListStore store; 127 128 MetaDataMarshaller(PListStore store) { 129 this.store = store; 130 } 131 public MetaData readPayload(DataInput dataIn) throws IOException { 132 MetaData rc = new MetaData(this.store); 133 rc.read(dataIn); 134 return rc; 135 } 136 137 public void writePayload(MetaData object, DataOutput dataOut) throws IOException { 138 object.write(dataOut); 139 } 140 } 141 142 class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> { 143 public List<EntryLocation> readPayload(DataInput dataIn) throws IOException { 144 List<EntryLocation> result = new ArrayList<EntryLocation>(); 145 int size = dataIn.readInt(); 146 for (int i = 0; i < size; i++) { 147 EntryLocation jobLocation = new EntryLocation(); 148 jobLocation.readExternal(dataIn); 149 result.add(jobLocation); 150 } 151 return result; 152 } 153 154 public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException { 155 dataOut.writeInt(value.size()); 156 for (EntryLocation jobLocation : value) { 157 jobLocation.writeExternal(dataOut); 158 } 159 } 160 } 161 162 class JobSchedulerMarshaller extends VariableMarshaller<PList> { 163 private final PListStore store; 164 JobSchedulerMarshaller(PListStore store) { 165 this.store = store; 166 } 167 public PList readPayload(DataInput dataIn) throws IOException { 168 PList result = new PList(this.store); 169 result.read(dataIn); 170 return result; 171 } 172 173 public void writePayload(PList js, DataOutput dataOut) throws IOException { 174 js.write(dataOut); 175 } 176 } 177 178 public File getDirectory() { 179 return directory; 180 } 181 182 public void setDirectory(File directory) { 183 this.directory = directory; 184 } 185 186 public long size() { 187 synchronized (this) { 188 if (!initialized) { 189 return 0; 190 } 191 } 192 try { 193 return journal.getDiskSize() + pageFile.getDiskSize(); 194 } catch (IOException e) { 195 throw new RuntimeException(e); 196 } 197 } 198 199 synchronized public PList getPList(final String name) throws Exception { 200 if (!isStarted()) { 201 throw new IllegalStateException("Not started"); 202 } 203 intialize(); 204 PList result = this.persistentLists.get(name); 205 if (result == null) { 206 final PList pl = new PList(this); 207 pl.setName(name); 208 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 209 public void execute(Transaction tx) throws IOException { 210 pl.setRootId(tx.allocate().getPageId()); 211 pl.load(tx); 212 metaData.storedSchedulers.put(tx, name, pl); 213 } 214 }); 215 result = pl; 216 this.persistentLists.put(name, pl); 217 } 218 final PList load = result; 219 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 220 public void execute(Transaction tx) throws IOException { 221 load.load(tx); 222 } 223 }); 224 225 return result; 226 } 227 228 synchronized public boolean removePList(final String name) throws Exception { 229 boolean result = false; 230 final PList pl = this.persistentLists.remove(name); 231 result = pl != null; 232 if (result) { 233 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 234 public void execute(Transaction tx) throws IOException { 235 metaData.storedSchedulers.remove(tx, name); 236 pl.destroy(tx); 237 } 238 }); 239 } 240 return result; 241 } 242 243 protected synchronized void intialize() throws Exception { 244 if (isStarted()) { 245 if (this.initialized == false) { 246 this.initialized = true; 247 if (this.directory == null) { 248 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); 249 } 250 IOHelper.mkdirs(this.directory); 251 lock(); 252 this.journal = new Journal(); 253 this.journal.setDirectory(directory); 254 this.journal.setMaxFileLength(getJournalMaxFileLength()); 255 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); 256 this.journal.start(); 257 this.pageFile = new PageFile(directory, "tmpDB"); 258 this.pageFile.load(); 259 260 this.pageFile.tx().execute(new Transaction.Closure<IOException>() { 261 public void execute(Transaction tx) throws IOException { 262 if (pageFile.getPageCount() == 0) { 263 Page<MetaData> page = tx.allocate(); 264 assert page.getPageId() == 0; 265 page.set(metaData); 266 metaData.page = page; 267 metaData.createIndexes(tx); 268 tx.store(metaData.page, metaDataMarshaller, true); 269 270 } else { 271 Page<MetaData> page = tx.load(0, metaDataMarshaller); 272 metaData = page.get(); 273 metaData.page = page; 274 } 275 metaData.load(tx); 276 metaData.loadLists(tx, persistentLists); 277 } 278 }); 279 280 this.pageFile.flush(); 281 LOG.info(this + " initialized"); 282 } 283 } 284 } 285 286 @Override 287 protected synchronized void doStart() throws Exception { 288 LOG.info(this + " started"); 289 } 290 291 @Override 292 protected synchronized void doStop(ServiceStopper stopper) throws Exception { 293 for (PList pl : this.persistentLists.values()) { 294 pl.unload(); 295 } 296 if (this.pageFile != null) { 297 this.pageFile.unload(); 298 } 299 if (this.journal != null) { 300 journal.close(); 301 } 302 if (this.lockFile != null) { 303 this.lockFile.unlock(); 304 } 305 this.lockFile = null; 306 this.initialized = false; 307 LOG.info(this + " stopped"); 308 309 } 310 311 synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException { 312 int logId = location.getDataFileId(); 313 Integer val = this.metaData.journalRC.get(tx, logId); 314 int refCount = val != null ? val.intValue() + 1 : 1; 315 this.metaData.journalRC.put(tx, logId, refCount); 316 317 } 318 319 synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException { 320 int logId = location.getDataFileId(); 321 if (logId != Location.NOT_SET) { 322 int refCount = this.metaData.journalRC.get(tx, logId); 323 refCount--; 324 if (refCount <= 0) { 325 this.metaData.journalRC.remove(tx, logId); 326 Set<Integer> set = new HashSet<Integer>(); 327 set.add(logId); 328 this.journal.removeDataFiles(set); 329 } else { 330 this.metaData.journalRC.put(tx, logId, refCount); 331 } 332 } 333 } 334 335 synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 336 ByteSequence result = null; 337 result = this.journal.read(location); 338 return result; 339 } 340 341 synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { 342 return this.journal.write(payload, sync); 343 } 344 345 private void lock() throws IOException { 346 if (lockFile == null) { 347 File lockFileName = new File(directory, "lock"); 348 lockFile = new LockFile(lockFileName, true); 349 if (failIfDatabaseIsLocked) { 350 lockFile.lock(); 351 } else { 352 while (true) { 353 try { 354 lockFile.lock(); 355 break; 356 } catch (IOException e) { 357 LOG.info("Database " + lockFileName + " is locked... waiting " 358 + (DATABASE_LOCKED_WAIT_DELAY / 1000) 359 + " seconds for the database to be unlocked. Reason: " + e); 360 try { 361 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 362 } catch (InterruptedException e1) { 363 } 364 } 365 } 366 } 367 } 368 } 369 370 PageFile getPageFile() { 371 this.pageFile.isLoaded(); 372 return this.pageFile; 373 } 374 375 public boolean isFailIfDatabaseIsLocked() { 376 return failIfDatabaseIsLocked; 377 } 378 379 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 380 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 381 } 382 383 public int getJournalMaxFileLength() { 384 return journalMaxFileLength; 385 } 386 387 public void setJournalMaxFileLength(int journalMaxFileLength) { 388 this.journalMaxFileLength = journalMaxFileLength; 389 } 390 391 public int getJournalMaxWriteBatchSize() { 392 return journalMaxWriteBatchSize; 393 } 394 395 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 396 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 397 } 398 399 public boolean isEnableIndexWriteAsync() { 400 return enableIndexWriteAsync; 401 } 402 403 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 404 this.enableIndexWriteAsync = enableIndexWriteAsync; 405 } 406 407 @Override 408 public String toString() { 409 return "PListStore:" + this.directory; 410 } 411 412 }