001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.Future;
024    import javax.transaction.xa.XAException;
025    import org.apache.activemq.broker.ConnectionContext;
026    import org.apache.activemq.command.Message;
027    import org.apache.activemq.command.MessageAck;
028    import org.apache.activemq.command.MessageId;
029    import org.apache.activemq.command.TransactionId;
030    import org.apache.activemq.command.XATransactionId;
031    import org.apache.activemq.store.AbstractMessageStore;
032    import org.apache.activemq.store.MessageStore;
033    import org.apache.activemq.store.PersistenceAdapter;
034    import org.apache.activemq.store.ProxyMessageStore;
035    import org.apache.activemq.store.ProxyTopicMessageStore;
036    import org.apache.activemq.store.TopicMessageStore;
037    import org.apache.activemq.store.TransactionRecoveryListener;
038    import org.apache.activemq.store.TransactionStore;
039    
040    /**
041     * Provides a TransactionStore implementation that can create transaction aware
042     * MessageStore objects from non transaction aware MessageStore objects.
043     * 
044     * 
045     */
046    public class MemoryTransactionStore implements TransactionStore {
047    
048        ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
049        ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
050        final PersistenceAdapter persistenceAdapter;
051    
052        private boolean doingRecover;
053    
054        public class Tx {
055            private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
056    
057            private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
058    
059            public void add(AddMessageCommand msg) {
060                messages.add(msg);
061            }
062    
063            public void add(RemoveMessageCommand ack) {
064                acks.add(ack);
065            }
066    
067            public Message[] getMessages() {
068                Message rc[] = new Message[messages.size()];
069                int count = 0;
070                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
071                    AddMessageCommand cmd = iter.next();
072                    rc[count++] = cmd.getMessage();
073                }
074                return rc;
075            }
076    
077            public MessageAck[] getAcks() {
078                MessageAck rc[] = new MessageAck[acks.size()];
079                int count = 0;
080                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
081                    RemoveMessageCommand cmd = iter.next();
082                    rc[count++] = cmd.getMessageAck();
083                }
084                return rc;
085            }
086    
087            /**
088             * @throws IOException
089             */
090            public void commit() throws IOException {
091                ConnectionContext ctx = new ConnectionContext();
092                persistenceAdapter.beginTransaction(ctx);
093                try {
094                    
095                    // Do all the message adds.
096                    for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
097                        AddMessageCommand cmd = iter.next();
098                        cmd.run(ctx);
099                    }
100                    // And removes..
101                    for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102                        RemoveMessageCommand cmd = iter.next();
103                        cmd.run(ctx);
104                    }
105                    
106                } catch ( IOException e ) {
107                    persistenceAdapter.rollbackTransaction(ctx);
108                    throw e;
109                }
110                persistenceAdapter.commitTransaction(ctx);
111            }
112        }
113        
114        public interface AddMessageCommand {
115            Message getMessage();
116    
117            void run(ConnectionContext context) throws IOException;
118        }
119    
120        public interface RemoveMessageCommand {
121            MessageAck getMessageAck();
122    
123            void run(ConnectionContext context) throws IOException;
124        }
125        
126        public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
127            this.persistenceAdapter=persistenceAdapter;
128        }
129    
130        public MessageStore proxy(MessageStore messageStore) {
131            return new ProxyMessageStore(messageStore) {
132                @Override
133                public void addMessage(ConnectionContext context, final Message send) throws IOException {
134                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
135                }
136    
137                @Override
138                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
139                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
140                    return AbstractMessageStore.FUTURE;
141                 }
142                 
143                @Override
144                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
145                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
146                }
147                 
148                @Override
149                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
150                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
151                }
152            };
153        }
154    
155        public TopicMessageStore proxy(TopicMessageStore messageStore) {
156            return new ProxyTopicMessageStore(messageStore) {
157                @Override
158                public void addMessage(ConnectionContext context, final Message send) throws IOException {
159                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
160                }
161    
162                @Override
163                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
164                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
165                    return AbstractMessageStore.FUTURE;
166                 }
167    
168                @Override
169                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
170                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
171                }
172                
173                @Override
174                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
175                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
176                }
177    
178                @Override
179                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
180                                MessageId messageId, MessageAck ack) throws IOException {
181                    MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
182                            subscriptionName, messageId, ack);
183                }
184            };
185        }
186    
187        /**
188         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
189         */
190        public void prepare(TransactionId txid) {
191            Tx tx = inflightTransactions.remove(txid);
192            if (tx == null) {
193                return;
194            }
195            preparedTransactions.put(txid, tx);
196        }
197    
198        public Tx getTx(Object txid) {
199            Tx tx = inflightTransactions.get(txid);
200            if (tx == null) {
201                tx = new Tx();
202                inflightTransactions.put(txid, tx);
203            }
204            return tx;
205        }
206    
207        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
208            if (preCommit != null) {
209                preCommit.run();
210            }
211            Tx tx;
212            if (wasPrepared) {
213                tx = preparedTransactions.remove(txid);
214            } else {
215                tx = inflightTransactions.remove(txid);
216            }
217    
218            if (tx == null) {
219                if (postCommit != null) {
220                    postCommit.run();
221                }
222                return;
223            }
224            // ensure message order w.r.t to cursor and store for setBatch()
225            synchronized (this) {
226                tx.commit();
227                if (postCommit != null) {
228                    postCommit.run();
229                }
230            }
231        }
232    
233        /**
234         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
235         */
236        public void rollback(TransactionId txid) {
237            preparedTransactions.remove(txid);
238            inflightTransactions.remove(txid);
239        }
240    
241        public void start() throws Exception {
242        }
243    
244        public void stop() throws Exception {
245        }
246    
247        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
248            // All the inflight transactions get rolled back..
249            inflightTransactions.clear();
250            this.doingRecover = true;
251            try {
252                for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
253                    Object txid = iter.next();
254                    Tx tx = preparedTransactions.get(txid);
255                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
256                }
257            } finally {
258                this.doingRecover = false;
259            }
260        }
261    
262        /**
263         * @param message
264         * @throws IOException
265         */
266        void addMessage(final MessageStore destination, final Message message) throws IOException {
267    
268            if (doingRecover) {
269                return;
270            }
271    
272            if (message.getTransactionId() != null) {
273                Tx tx = getTx(message.getTransactionId());
274                tx.add(new AddMessageCommand() {
275                    public Message getMessage() {
276                        return message;
277                    }
278    
279                    public void run(ConnectionContext ctx) throws IOException {
280                        destination.addMessage(ctx, message);
281                    }
282    
283                });
284            } else {
285                destination.addMessage(null, message);
286            }
287        }
288        
289        /**
290         * @param ack
291         * @throws IOException
292         */
293        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
294            if (doingRecover) {
295                return;
296            }
297    
298            if (ack.isInTransaction()) {
299                Tx tx = getTx(ack.getTransactionId());
300                tx.add(new RemoveMessageCommand() {
301                    public MessageAck getMessageAck() {
302                        return ack;
303                    }
304    
305                    public void run(ConnectionContext ctx) throws IOException {
306                        destination.removeMessage(ctx, ack);
307                    }
308                });
309            } else {
310                destination.removeMessage(null, ack);
311            }
312        }
313    
314        final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
315                               final MessageId messageId, final MessageAck ack) throws IOException {
316            if (doingRecover) {
317                return;
318            }
319    
320            if (ack.isInTransaction()) {
321                Tx tx = getTx(ack.getTransactionId());
322                tx.add(new RemoveMessageCommand() {
323                    public MessageAck getMessageAck() {
324                        return ack;
325                    }
326    
327                    public void run(ConnectionContext ctx) throws IOException {
328                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
329                    }
330                });
331            } else {
332                destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
333            }
334        }
335    
336    
337        public void delete() {
338            inflightTransactions.clear();
339            preparedTransactions.clear();
340            doingRecover = false;
341        }
342    
343    }