001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.SQLException;
021    import java.util.concurrent.atomic.AtomicLong;
022    
023    import org.apache.activemq.ActiveMQMessageAudit;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.command.MessageAck;
028    import org.apache.activemq.command.MessageId;
029    import org.apache.activemq.store.AbstractMessageStore;
030    import org.apache.activemq.store.MessageRecoveryListener;
031    import org.apache.activemq.util.ByteSequence;
032    import org.apache.activemq.util.ByteSequenceData;
033    import org.apache.activemq.util.IOExceptionSupport;
034    import org.apache.activemq.wireformat.WireFormat;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * 
040     */
041    public class JDBCMessageStore extends AbstractMessageStore {
042    
043        class Duration {
044            static final int LIMIT = 100;
045            final long start = System.currentTimeMillis();
046            final String name;
047    
048            Duration(String name) {
049                this.name = name;
050            }
051            void end() {
052                end(null);
053            }
054            void end(Object o) {
055                long duration = System.currentTimeMillis() - start;
056    
057                if (duration > LIMIT) {
058                    System.err.println(name + " took a long time: " + duration + "ms " + o);
059                }
060            }
061        }
062        private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
063        protected final WireFormat wireFormat;
064        protected final JDBCAdapter adapter;
065        protected final JDBCPersistenceAdapter persistenceAdapter;
066        protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
067        protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
068    
069        protected ActiveMQMessageAudit audit;
070        
071        public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
072            super(destination);
073            this.persistenceAdapter = persistenceAdapter;
074            this.adapter = adapter;
075            this.wireFormat = wireFormat;
076            this.audit = audit;
077        }
078        
079        public void addMessage(ConnectionContext context, Message message) throws IOException {
080            MessageId messageId = message.getMessageId();
081            if (audit != null && audit.isDuplicate(message)) {
082                if (LOG.isDebugEnabled()) {
083                    LOG.debug(destination.getPhysicalName()
084                        + " ignoring duplicated (add) message, already stored: "
085                        + messageId);
086                }
087                return;
088            }
089            
090            long sequenceId = persistenceAdapter.getNextSequenceId();
091            
092            // Serialize the Message..
093            byte data[];
094            try {
095                ByteSequence packet = wireFormat.marshal(message);
096                data = ByteSequenceData.toByteArray(packet);
097            } catch (IOException e) {
098                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
099            }
100    
101            // Get a connection and insert the message into the DB.
102            TransactionContext c = persistenceAdapter.getTransactionContext(context);
103            try {      
104                adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(), message.getPriority());
105            } catch (SQLException e) {
106                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
107                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
108            } finally {
109                c.close();
110            }
111            onAdd(sequenceId, message.getPriority());
112        }
113    
114        protected void onAdd(long sequenceId, byte priority) {
115        }
116    
117        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
118            // Get a connection and insert the message into the DB.
119            TransactionContext c = persistenceAdapter.getTransactionContext(context);
120            try {
121                adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef);
122            } catch (SQLException e) {
123                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
124                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
125            } finally {
126                c.close();
127            }
128        }
129    
130        public Message getMessage(MessageId messageId) throws IOException {
131            // Get a connection and pull the message out of the DB
132            TransactionContext c = persistenceAdapter.getTransactionContext();
133            try {
134                byte data[] = adapter.doGetMessage(c, messageId);
135                if (data == null) {
136                    return null;
137                }
138    
139                Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
140                return answer;
141            } catch (IOException e) {
142                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
143            } catch (SQLException e) {
144                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
145                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
146            } finally {
147                c.close();
148            }
149        }
150    
151        public String getMessageReference(MessageId messageId) throws IOException {
152            long id = messageId.getBrokerSequenceId();
153    
154            // Get a connection and pull the message out of the DB
155            TransactionContext c = persistenceAdapter.getTransactionContext();
156            try {
157                return adapter.doGetMessageReference(c, id);
158            } catch (IOException e) {
159                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
160            } catch (SQLException e) {
161                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
162                throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
163            } finally {
164                c.close();
165            }
166        }
167    
168        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
169            
170            long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
171    
172            // Get a connection and remove the message from the DB
173            TransactionContext c = persistenceAdapter.getTransactionContext(context);
174            try {
175                adapter.doRemoveMessage(c, seq);
176            } catch (SQLException e) {
177                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
178                throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
179            } finally {
180                c.close();
181            }
182        }
183    
184        public void recover(final MessageRecoveryListener listener) throws Exception {
185    
186            // Get all the Message ids out of the database.
187            TransactionContext c = persistenceAdapter.getTransactionContext();
188            try {
189                c = persistenceAdapter.getTransactionContext();
190                adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
191                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
192                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
193                        msg.getMessageId().setBrokerSequenceId(sequenceId);
194                        return listener.recoverMessage(msg);
195                    }
196    
197                    public boolean recoverMessageReference(String reference) throws Exception {
198                        return listener.recoverMessageReference(new MessageId(reference));
199                    }
200                });
201            } catch (SQLException e) {
202                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
203                throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
204            } finally {
205                c.close();
206            }
207        }
208    
209        /**
210         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
211         */
212        public void removeAllMessages(ConnectionContext context) throws IOException {
213            // Get a connection and remove the message from the DB
214            TransactionContext c = persistenceAdapter.getTransactionContext(context);
215            try {
216                adapter.doRemoveAllMessages(c, destination);
217            } catch (SQLException e) {
218                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
219                throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
220            } finally {
221                c.close();
222            }
223        }
224    
225        public int getMessageCount() throws IOException {
226            int result = 0;
227            TransactionContext c = persistenceAdapter.getTransactionContext();
228            try {
229    
230                result = adapter.doGetMessageCount(c, destination);
231    
232            } catch (SQLException e) {
233                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
234                throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
235            } finally {
236                c.close();
237            }
238            return result;
239        }
240    
241        /**
242         * @param maxReturned
243         * @param listener
244         * @throws Exception
245         * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
246         *      org.apache.activemq.store.MessageRecoveryListener)
247         */
248        public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
249            TransactionContext c = persistenceAdapter.getTransactionContext();
250            try {
251                adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
252                        maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
253    
254                    public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
255                        if (listener.hasSpace()) {
256                            Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
257                            msg.getMessageId().setBrokerSequenceId(sequenceId);
258                            listener.recoverMessage(msg);
259                            lastRecoveredSequenceId.set(sequenceId);
260                            lastRecoveredPriority.set(msg.getPriority());
261                            return true;
262                        }
263                        return false;
264                    }
265    
266                    public boolean recoverMessageReference(String reference) throws Exception {
267                        if (listener.hasSpace()) {
268                            listener.recoverMessageReference(new MessageId(reference));
269                            return true;
270                        }
271                        return false;
272                    }
273    
274                });
275            } catch (SQLException e) {
276                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
277            } finally {
278                c.close();
279            }
280    
281        }
282    
283        /**
284         * @see org.apache.activemq.store.MessageStore#resetBatching()
285         */
286        public void resetBatching() {
287            if (LOG.isTraceEnabled()) {
288                LOG.trace(destination.getPhysicalName() + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
289            }
290            lastRecoveredSequenceId.set(-1);
291            lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
292    
293        }
294    
295        @Override
296        public void setBatch(MessageId messageId) {
297            try {
298                long[] storedValues = getStoreSequenceIdForMessageId(messageId);
299                lastRecoveredSequenceId.set(storedValues[0]);
300                lastRecoveredPriority.set(storedValues[1]);
301            } catch (IOException ignoredAsAlreadyLogged) {
302                lastRecoveredSequenceId.set(-1);
303                lastRecoveredPriority.set(Byte.MAX_VALUE -1);
304            }
305            if (LOG.isTraceEnabled()) {
306                LOG.trace(destination.getPhysicalName() + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
307                        + ", priority: " + lastRecoveredPriority.get());
308            }
309        }
310    
311        private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
312            long[] result = new long[]{-1, Byte.MAX_VALUE -1};
313            TransactionContext c = persistenceAdapter.getTransactionContext();
314            try {
315                result = adapter.getStoreSequenceId(c, destination, messageId);
316            } catch (SQLException e) {
317                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
318                throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
319            } finally {
320                c.close();
321            }
322            return result;
323        }
324        
325        public void setPrioritizedMessages(boolean prioritizedMessages) {
326            super.setPrioritizedMessages(prioritizedMessages);
327        }   
328    }