001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.store.journal; 018 019 import java.io.File; 020 import java.io.IOException; 021 022 import org.apache.activeio.journal.Journal; 023 import org.apache.activeio.journal.active.JournalImpl; 024 import org.apache.activeio.journal.active.JournalLockedException; 025 import org.apache.activemq.store.PersistenceAdapter; 026 import org.apache.activemq.store.PersistenceAdapterFactory; 027 import org.apache.activemq.store.jdbc.DataSourceSupport; 028 import org.apache.activemq.store.jdbc.JDBCAdapter; 029 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 030 import org.apache.activemq.store.jdbc.Statements; 031 import org.apache.activemq.thread.TaskRunnerFactory; 032 import org.slf4j.Logger; 033 import org.slf4j.LoggerFactory; 034 035 /** 036 * Factory class that can create PersistenceAdapter objects. 037 * 038 * @org.apache.xbean.XBean 039 * 040 */ 041 public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory { 042 043 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; 044 045 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class); 046 047 private int journalLogFileSize = 1024 * 1024 * 20; 048 private int journalLogFiles = 2; 049 private TaskRunnerFactory taskRunnerFactory; 050 private Journal journal; 051 private boolean useJournal = true; 052 private boolean useQuickJournal; 053 private File journalArchiveDirectory; 054 private boolean failIfJournalIsLocked; 055 private int journalThreadPriority = Thread.MAX_PRIORITY; 056 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); 057 private boolean useDedicatedTaskRunner; 058 059 public PersistenceAdapter createPersistenceAdapter() throws IOException { 060 jdbcPersistenceAdapter.setDataSource(getDataSource()); 061 062 if (!useJournal) { 063 return jdbcPersistenceAdapter; 064 } 065 return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); 066 067 } 068 069 public int getJournalLogFiles() { 070 return journalLogFiles; 071 } 072 073 /** 074 * Sets the number of journal log files to use 075 */ 076 public void setJournalLogFiles(int journalLogFiles) { 077 this.journalLogFiles = journalLogFiles; 078 } 079 080 public int getJournalLogFileSize() { 081 return journalLogFileSize; 082 } 083 084 /** 085 * Sets the size of the journal log files 086 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 087 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 088 */ 089 public void setJournalLogFileSize(int journalLogFileSize) { 090 this.journalLogFileSize = journalLogFileSize; 091 } 092 093 public JDBCPersistenceAdapter getJdbcAdapter() { 094 return jdbcPersistenceAdapter; 095 } 096 097 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) { 098 this.jdbcPersistenceAdapter = jdbcAdapter; 099 } 100 101 public boolean isUseJournal() { 102 return useJournal; 103 } 104 105 /** 106 * Enables or disables the use of the journal. The default is to use the 107 * journal 108 * 109 * @param useJournal 110 */ 111 public void setUseJournal(boolean useJournal) { 112 this.useJournal = useJournal; 113 } 114 115 public boolean isUseDedicatedTaskRunner() { 116 return useDedicatedTaskRunner; 117 } 118 119 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 120 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 121 } 122 123 public TaskRunnerFactory getTaskRunnerFactory() { 124 if (taskRunnerFactory == null) { 125 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, 126 true, 1000, isUseDedicatedTaskRunner()); 127 } 128 return taskRunnerFactory; 129 } 130 131 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 132 this.taskRunnerFactory = taskRunnerFactory; 133 } 134 135 public Journal getJournal() throws IOException { 136 if (journal == null) { 137 createJournal(); 138 } 139 return journal; 140 } 141 142 public void setJournal(Journal journal) { 143 this.journal = journal; 144 } 145 146 public File getJournalArchiveDirectory() { 147 if (journalArchiveDirectory == null && useQuickJournal) { 148 journalArchiveDirectory = new File(getDataDirectoryFile(), "journal"); 149 } 150 return journalArchiveDirectory; 151 } 152 153 public void setJournalArchiveDirectory(File journalArchiveDirectory) { 154 this.journalArchiveDirectory = journalArchiveDirectory; 155 } 156 157 public boolean isUseQuickJournal() { 158 return useQuickJournal; 159 } 160 161 /** 162 * Enables or disables the use of quick journal, which keeps messages in the 163 * journal and just stores a reference to the messages in JDBC. Defaults to 164 * false so that messages actually reside long term in the JDBC database. 165 */ 166 public void setUseQuickJournal(boolean useQuickJournal) { 167 this.useQuickJournal = useQuickJournal; 168 } 169 170 public JDBCAdapter getAdapter() throws IOException { 171 return jdbcPersistenceAdapter.getAdapter(); 172 } 173 174 public void setAdapter(JDBCAdapter adapter) { 175 jdbcPersistenceAdapter.setAdapter(adapter); 176 } 177 178 public Statements getStatements() { 179 return jdbcPersistenceAdapter.getStatements(); 180 } 181 182 public void setStatements(Statements statements) { 183 jdbcPersistenceAdapter.setStatements(statements); 184 } 185 186 public boolean isUseDatabaseLock() { 187 return jdbcPersistenceAdapter.isUseDatabaseLock(); 188 } 189 190 /** 191 * Sets whether or not an exclusive database lock should be used to enable 192 * JDBC Master/Slave. Enabled by default. 193 */ 194 public void setUseDatabaseLock(boolean useDatabaseLock) { 195 jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock); 196 } 197 198 public boolean isCreateTablesOnStartup() { 199 return jdbcPersistenceAdapter.isCreateTablesOnStartup(); 200 } 201 202 /** 203 * Sets whether or not tables are created on startup 204 */ 205 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 206 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); 207 } 208 209 public int getJournalThreadPriority() { 210 return journalThreadPriority; 211 } 212 213 /** 214 * Sets the thread priority of the journal thread 215 */ 216 public void setJournalThreadPriority(int journalThreadPriority) { 217 this.journalThreadPriority = journalThreadPriority; 218 } 219 220 /** 221 * @throws IOException 222 */ 223 protected void createJournal() throws IOException { 224 File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile(); 225 if (failIfJournalIsLocked) { 226 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 227 getJournalArchiveDirectory()); 228 } else { 229 while (true) { 230 try { 231 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 232 getJournalArchiveDirectory()); 233 break; 234 } catch (JournalLockedException e) { 235 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) 236 + " seconds for the journal to be unlocked."); 237 try { 238 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 239 } catch (InterruptedException e1) { 240 } 241 } 242 } 243 } 244 } 245 246 }