001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.state;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.Map;
023    import java.util.Vector;
024    import java.util.Map.Entry;
025    import java.util.concurrent.ConcurrentHashMap;
026    
027    import javax.jms.TransactionRolledBackException;
028    import javax.transaction.xa.XAResource;
029    
030    import org.apache.activemq.command.Command;
031    import org.apache.activemq.command.ConnectionId;
032    import org.apache.activemq.command.ConnectionInfo;
033    import org.apache.activemq.command.ConsumerControl;
034    import org.apache.activemq.command.ConsumerId;
035    import org.apache.activemq.command.ConsumerInfo;
036    import org.apache.activemq.command.DestinationInfo;
037    import org.apache.activemq.command.ExceptionResponse;
038    import org.apache.activemq.command.IntegerResponse;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessageId;
041    import org.apache.activemq.command.MessagePull;
042    import org.apache.activemq.command.ProducerId;
043    import org.apache.activemq.command.ProducerInfo;
044    import org.apache.activemq.command.Response;
045    import org.apache.activemq.command.SessionId;
046    import org.apache.activemq.command.SessionInfo;
047    import org.apache.activemq.command.TransactionInfo;
048    import org.apache.activemq.transport.Transport;
049    import org.apache.activemq.util.IOExceptionSupport;
050    import org.slf4j.Logger;
051    import org.slf4j.LoggerFactory;
052    
053    /**
054     * Tracks the state of a connection so a newly established transport can be
055     * re-initialized to the state that was tracked.
056     * 
057     * 
058     */
059    public class ConnectionStateTracker extends CommandVisitorAdapter {
060        private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
061    
062        private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
063    
064        protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 
065    
066        private boolean trackTransactions;
067        private boolean restoreSessions = true;
068        private boolean restoreConsumers = true;
069        private boolean restoreProducers = true;
070        private boolean restoreTransaction = true;
071        private boolean trackMessages = true;
072        private boolean trackTransactionProducers = true;
073        private int maxCacheSize = 128 * 1024;
074        private int currentCacheSize;
075        private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
076            protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
077                boolean result = currentCacheSize > maxCacheSize;
078                if (result) {
079                    if (eldest.getValue() instanceof Message) {
080                        currentCacheSize -= ((Message)eldest.getValue()).getSize();
081                    }
082                }
083                return result;
084            }
085        };
086        
087        private class RemoveTransactionAction implements ResponseHandler {
088            private final TransactionInfo info;
089    
090            public RemoveTransactionAction(TransactionInfo info) {
091                this.info = info;
092            }
093    
094            public void onResponse(Command response) {
095                ConnectionId connectionId = info.getConnectionId();
096                ConnectionState cs = connectionStates.get(connectionId);
097                cs.removeTransactionState(info.getTransactionId());
098            }
099        }
100        
101        private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
102    
103            public PrepareReadonlyTransactionAction(TransactionInfo info) {
104                super(info);
105            }
106    
107            public void onResponse(Command command) {
108                IntegerResponse response = (IntegerResponse) command;
109                if (XAResource.XA_RDONLY == response.getResult()) {
110                    // all done, no commit or rollback from TM
111                    super.onResponse(command);
112                }
113            }
114        }
115    
116        /**
117         * 
118         * 
119         * @param command
120         * @return null if the command is not state tracked.
121         * @throws IOException
122         */
123        public Tracked track(Command command) throws IOException {
124            try {
125                return (Tracked)command.visit(this);
126            } catch (IOException e) {
127                throw e;
128            } catch (Throwable e) {
129                throw IOExceptionSupport.create(e);
130            }
131        }
132        
133        public void trackBack(Command command) {
134            if (command != null) {
135                if (trackMessages && command.isMessage()) {
136                    Message message = (Message) command;
137                    if (message.getTransactionId()==null) {
138                        currentCacheSize = currentCacheSize +  message.getSize();
139                    }
140                } else if (command instanceof MessagePull) {
141                    // just needs to be a rough estimate of size, ~4 identifiers
142                    currentCacheSize += 400;
143                }
144            }
145        }
146    
147        public void restore(Transport transport) throws IOException {
148            // Restore the connections.
149            for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
150                ConnectionState connectionState = iter.next();
151                connectionState.getInfo().setFailoverReconnect(true);
152                if (LOG.isDebugEnabled()) {
153                    LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
154                }
155                transport.oneway(connectionState.getInfo());
156                restoreTempDestinations(transport, connectionState);
157    
158                if (restoreSessions) {
159                    restoreSessions(transport, connectionState);
160                }
161    
162                if (restoreTransaction) {
163                    restoreTransactions(transport, connectionState);
164                }
165            }
166            //now flush messages
167            for (Command msg:messageCache.values()) {
168                if (LOG.isDebugEnabled()) {
169                    LOG.debug("command: " + msg.getCommandId());
170                }
171                transport.oneway(msg);
172            }
173        }
174    
175        private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
176            Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
177            for (TransactionState transactionState : connectionState.getTransactionStates()) {
178                if (LOG.isDebugEnabled()) {
179                    LOG.debug("tx: " + transactionState.getId());
180                }
181                
182                // rollback any completed transactions - no way to know if commit got there
183                // or if reply went missing
184                //
185                if (!transactionState.getCommands().isEmpty()) {
186                    Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
187                    if (lastCommand instanceof TransactionInfo) {
188                        TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
189                        if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
190                            if (LOG.isDebugEnabled()) {
191                                LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
192                            }
193                            toRollback.add(transactionInfo);
194                            continue;
195                        }
196                    }
197                }
198                
199                // replay short lived producers that may have been involved in the transaction
200                for (ProducerState producerState : transactionState.getProducerStates().values()) {
201                    if (LOG.isDebugEnabled()) {
202                        LOG.debug("tx replay producer :" + producerState.getInfo());
203                    }
204                    transport.oneway(producerState.getInfo());
205                }
206                
207                for (Command command : transactionState.getCommands()) {
208                    if (LOG.isDebugEnabled()) {
209                        LOG.debug("tx replay: " + command);
210                    }
211                    transport.oneway(command);
212                }
213                
214                for (ProducerState producerState : transactionState.getProducerStates().values()) {
215                    if (LOG.isDebugEnabled()) {
216                        LOG.debug("tx remove replayed producer :" + producerState.getInfo());
217                    }
218                    transport.oneway(producerState.getInfo().createRemoveCommand());
219                }
220            }
221            
222            for (TransactionInfo command: toRollback) {
223                // respond to the outstanding commit
224                ExceptionResponse response = new ExceptionResponse();
225                response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
226                response.setCorrelationId(command.getCommandId());
227                transport.getTransportListener().onCommand(response);
228            }
229        }
230    
231        /**
232         * @param transport
233         * @param connectionState
234         * @throws IOException
235         */
236        protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
237            // Restore the connection's sessions
238            for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
239                SessionState sessionState = (SessionState)iter2.next();
240                if (LOG.isDebugEnabled()) {
241                    LOG.debug("session: " + sessionState.getInfo().getSessionId());
242                }
243                transport.oneway(sessionState.getInfo());
244    
245                if (restoreProducers) {
246                    restoreProducers(transport, sessionState);
247                }
248    
249                if (restoreConsumers) {
250                    restoreConsumers(transport, sessionState);
251                }
252            }
253        }
254    
255        /**
256         * @param transport
257         * @param sessionState
258         * @throws IOException
259         */
260        protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
261            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
262            final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
263            final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
264            for (ConsumerState consumerState : sessionState.getConsumerStates()) {   
265                ConsumerInfo infoToSend = consumerState.getInfo();
266                if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
267                    infoToSend = consumerState.getInfo().copy();
268                    connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
269                    infoToSend.setPrefetchSize(0);
270                    if (LOG.isDebugEnabled()) {
271                        LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
272                    }
273                }
274                if (LOG.isDebugEnabled()) {
275                    LOG.debug("restore consumer: " + infoToSend.getConsumerId());
276                }
277                transport.oneway(infoToSend);
278            }
279        }
280    
281        /**
282         * @param transport
283         * @param sessionState
284         * @throws IOException
285         */
286        protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
287            // Restore the session's producers
288            for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
289                ProducerState producerState = (ProducerState)iter3.next();
290                if (LOG.isDebugEnabled()) {
291                    LOG.debug("producer: " + producerState.getInfo().getProducerId());
292                }
293                transport.oneway(producerState.getInfo());
294            }
295        }
296    
297        /**
298         * @param transport
299         * @param connectionState
300         * @throws IOException
301         */
302        protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
303            throws IOException {
304            // Restore the connection's temp destinations.
305            for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
306                transport.oneway((DestinationInfo)iter2.next());
307            }
308        }
309    
310        public Response processAddDestination(DestinationInfo info) {
311            if (info != null) {
312                ConnectionState cs = connectionStates.get(info.getConnectionId());
313                if (cs != null && info.getDestination().isTemporary()) {
314                    cs.addTempDestination(info);
315                }
316            }
317            return TRACKED_RESPONSE_MARKER;
318        }
319    
320        public Response processRemoveDestination(DestinationInfo info) {
321            if (info != null) {
322                ConnectionState cs = connectionStates.get(info.getConnectionId());
323                if (cs != null && info.getDestination().isTemporary()) {
324                    cs.removeTempDestination(info.getDestination());
325                }
326            }
327            return TRACKED_RESPONSE_MARKER;
328        }
329    
330        public Response processAddProducer(ProducerInfo info) {
331            if (info != null && info.getProducerId() != null) {
332                SessionId sessionId = info.getProducerId().getParentId();
333                if (sessionId != null) {
334                    ConnectionId connectionId = sessionId.getParentId();
335                    if (connectionId != null) {
336                        ConnectionState cs = connectionStates.get(connectionId);
337                        if (cs != null) {
338                            SessionState ss = cs.getSessionState(sessionId);
339                            if (ss != null) {
340                                ss.addProducer(info);
341                            }
342                        }
343                    }
344                }
345            }
346            return TRACKED_RESPONSE_MARKER;
347        }
348    
349        public Response processRemoveProducer(ProducerId id) {
350            if (id != null) {
351                SessionId sessionId = id.getParentId();
352                if (sessionId != null) {
353                    ConnectionId connectionId = sessionId.getParentId();
354                    if (connectionId != null) {
355                        ConnectionState cs = connectionStates.get(connectionId);
356                        if (cs != null) {
357                            SessionState ss = cs.getSessionState(sessionId);
358                            if (ss != null) {
359                                ss.removeProducer(id);
360                            }
361                        }
362                    }
363                }
364            }
365            return TRACKED_RESPONSE_MARKER;
366        }
367    
368        public Response processAddConsumer(ConsumerInfo info) {
369            if (info != null) {
370                SessionId sessionId = info.getConsumerId().getParentId();
371                if (sessionId != null) {
372                    ConnectionId connectionId = sessionId.getParentId();
373                    if (connectionId != null) {
374                        ConnectionState cs = connectionStates.get(connectionId);
375                        if (cs != null) {
376                            SessionState ss = cs.getSessionState(sessionId);
377                            if (ss != null) {
378                                ss.addConsumer(info);
379                            }
380                        }
381                    }
382                }
383            }
384            return TRACKED_RESPONSE_MARKER;
385        }
386    
387        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
388            if (id != null) {
389                SessionId sessionId = id.getParentId();
390                if (sessionId != null) {
391                    ConnectionId connectionId = sessionId.getParentId();
392                    if (connectionId != null) {
393                        ConnectionState cs = connectionStates.get(connectionId);
394                        if (cs != null) {
395                            SessionState ss = cs.getSessionState(sessionId);
396                            if (ss != null) {
397                                ss.removeConsumer(id);
398                            }
399                        }
400                    }
401                }
402            }
403            return TRACKED_RESPONSE_MARKER;
404        }
405    
406        public Response processAddSession(SessionInfo info) {
407            if (info != null) {
408                ConnectionId connectionId = info.getSessionId().getParentId();
409                if (connectionId != null) {
410                    ConnectionState cs = connectionStates.get(connectionId);
411                    if (cs != null) {
412                        cs.addSession(info);
413                    }
414                }
415            }
416            return TRACKED_RESPONSE_MARKER;
417        }
418    
419        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
420            if (id != null) {
421                ConnectionId connectionId = id.getParentId();
422                if (connectionId != null) {
423                    ConnectionState cs = connectionStates.get(connectionId);
424                    if (cs != null) {
425                        cs.removeSession(id);
426                    }
427                }
428            }
429            return TRACKED_RESPONSE_MARKER;
430        }
431    
432        public Response processAddConnection(ConnectionInfo info) {
433            if (info != null) {
434                connectionStates.put(info.getConnectionId(), new ConnectionState(info));
435            }
436            return TRACKED_RESPONSE_MARKER;
437        }
438    
439        public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
440            if (id != null) {
441                connectionStates.remove(id);
442            }
443            return TRACKED_RESPONSE_MARKER;
444        }
445    
446        public Response processMessage(Message send) throws Exception {
447            if (send != null) {
448                if (trackTransactions && send.getTransactionId() != null) {
449                    ProducerId producerId = send.getProducerId();
450                    ConnectionId connectionId = producerId.getParentId().getParentId();
451                    if (connectionId != null) {
452                        ConnectionState cs = connectionStates.get(connectionId);
453                        if (cs != null) {
454                            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
455                            if (transactionState != null) {
456                                transactionState.addCommand(send);
457                                
458                                if (trackTransactionProducers) {
459                                    // for jmstemplate, track the producer in case it is closed before commit
460                                    // and needs to be replayed
461                                    SessionState ss = cs.getSessionState(producerId.getParentId());
462                                    ProducerState producerState = ss.getProducerState(producerId);
463                                    producerState.setTransactionState(transactionState);            
464                                }
465                            }
466                        }
467                    }
468                    return TRACKED_RESPONSE_MARKER;
469                }else if (trackMessages) {
470                    messageCache.put(send.getMessageId(), send.copy());
471                }
472            }
473            return null;
474        }
475    
476        public Response processBeginTransaction(TransactionInfo info) {
477            if (trackTransactions && info != null && info.getTransactionId() != null) {
478                ConnectionId connectionId = info.getConnectionId();
479                if (connectionId != null) {
480                    ConnectionState cs = connectionStates.get(connectionId);
481                    if (cs != null) {
482                        cs.addTransactionState(info.getTransactionId());
483                        TransactionState state = cs.getTransactionState(info.getTransactionId());
484                        state.addCommand(info);
485                    }
486                }
487                return TRACKED_RESPONSE_MARKER;
488            }
489            return null;
490        }
491    
492        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
493            if (trackTransactions && info != null) {
494                ConnectionId connectionId = info.getConnectionId();
495                if (connectionId != null) {
496                    ConnectionState cs = connectionStates.get(connectionId);
497                    if (cs != null) {
498                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
499                        if (transactionState != null) {
500                            transactionState.addCommand(info);
501                            return new Tracked(new PrepareReadonlyTransactionAction(info));
502                        }
503                    }
504                }
505            }
506            return null;
507        }
508    
509        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
510            if (trackTransactions && info != null) {
511                ConnectionId connectionId = info.getConnectionId();
512                if (connectionId != null) {
513                    ConnectionState cs = connectionStates.get(connectionId);
514                    if (cs != null) {
515                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
516                        if (transactionState != null) {
517                            transactionState.addCommand(info);
518                            return new Tracked(new RemoveTransactionAction(info));
519                        }
520                    }
521                }
522            }
523            return null;
524        }
525    
526        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
527            if (trackTransactions && info != null) {
528                ConnectionId connectionId = info.getConnectionId();
529                if (connectionId != null) {
530                    ConnectionState cs = connectionStates.get(connectionId);
531                    if (cs != null) {
532                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
533                        if (transactionState != null) {
534                            transactionState.addCommand(info);
535                            return new Tracked(new RemoveTransactionAction(info));
536                        }
537                    }
538                }
539            }
540            return null;
541        }
542    
543        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
544            if (trackTransactions && info != null) {
545                ConnectionId connectionId = info.getConnectionId();
546                if (connectionId != null) {
547                    ConnectionState cs = connectionStates.get(connectionId);
548                    if (cs != null) {
549                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
550                        if (transactionState != null) {
551                            transactionState.addCommand(info);
552                            return new Tracked(new RemoveTransactionAction(info));
553                        }
554                    }
555                }
556            }
557            return null;
558        }
559    
560        public Response processEndTransaction(TransactionInfo info) throws Exception {
561            if (trackTransactions && info != null) {
562                ConnectionId connectionId = info.getConnectionId();
563                if (connectionId != null) {
564                    ConnectionState cs = connectionStates.get(connectionId);
565                    if (cs != null) {
566                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
567                        if (transactionState != null) {
568                            transactionState.addCommand(info);
569                        }
570                    }
571                }
572                return TRACKED_RESPONSE_MARKER;
573            }
574            return null;
575        }
576    
577        @Override
578        public Response processMessagePull(MessagePull pull) throws Exception {
579            if (pull != null) {
580                // leave a single instance in the cache
581                final String id = pull.getDestination() + "::" + pull.getConsumerId();
582                messageCache.put(id.intern(), pull);
583            }
584            return null;
585        }
586    
587        public boolean isRestoreConsumers() {
588            return restoreConsumers;
589        }
590    
591        public void setRestoreConsumers(boolean restoreConsumers) {
592            this.restoreConsumers = restoreConsumers;
593        }
594    
595        public boolean isRestoreProducers() {
596            return restoreProducers;
597        }
598    
599        public void setRestoreProducers(boolean restoreProducers) {
600            this.restoreProducers = restoreProducers;
601        }
602    
603        public boolean isRestoreSessions() {
604            return restoreSessions;
605        }
606    
607        public void setRestoreSessions(boolean restoreSessions) {
608            this.restoreSessions = restoreSessions;
609        }
610    
611        public boolean isTrackTransactions() {
612            return trackTransactions;
613        }
614    
615        public void setTrackTransactions(boolean trackTransactions) {
616            this.trackTransactions = trackTransactions;
617        }
618        
619        public boolean isTrackTransactionProducers() {
620            return this.trackTransactionProducers;
621        }
622    
623        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
624            this.trackTransactionProducers = trackTransactionProducers;
625        }
626        
627        public boolean isRestoreTransaction() {
628            return restoreTransaction;
629        }
630    
631        public void setRestoreTransaction(boolean restoreTransaction) {
632            this.restoreTransaction = restoreTransaction;
633        }
634    
635        public boolean isTrackMessages() {
636            return trackMessages;
637        }
638    
639        public void setTrackMessages(boolean trackMessages) {
640            this.trackMessages = trackMessages;
641        }
642    
643        public int getMaxCacheSize() {
644            return maxCacheSize;
645        }
646    
647        public void setMaxCacheSize(int maxCacheSize) {
648            this.maxCacheSize = maxCacheSize;
649        }
650    
651        public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
652            ConnectionState connectionState = connectionStates.get(connectionId);
653            if (connectionState != null) {
654                connectionState.setConnectionInterruptProcessingComplete(true);
655                Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
656                for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
657                    ConsumerControl control = new ConsumerControl();
658                    control.setConsumerId(entry.getKey());
659                    control.setPrefetch(entry.getValue().getPrefetchSize());
660                    control.setDestination(entry.getValue().getDestination());
661                    try {
662                        if (LOG.isDebugEnabled()) {
663                            LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
664                        }
665                        transport.oneway(control);  
666                    } catch (Exception ex) {
667                        if (LOG.isDebugEnabled()) {
668                            LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
669                                    + " with: " + control.getPrefetch(), ex);
670                        }
671                    }
672                }
673                stalledConsumers.clear();
674            }
675        }
676    
677        public void transportInterrupted(ConnectionId connectionId) {
678            ConnectionState connectionState = connectionStates.get(connectionId);
679            if (connectionState != null) {
680                connectionState.setConnectionInterruptProcessingComplete(false);
681            }
682        }
683    }