/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.state;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAResource;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the state of a connection so a newly established transport can be
 * re-initialized to the state that was tracked.
 * <p>
 * Commands are fed in through {@link #track(Command)}, which dispatches to the
 * matching {@code process*} visitor method. {@link #restore(Transport)} replays
 * the recorded connections, temp destinations, sessions, producers, consumers,
 * transactions and cached commands onto a transport.
 * <p>
 * NOTE(review): {@code currentCacheSize} and {@code messageCache} are accessed
 * without synchronization in this class; presumably callers serialize access -
 * confirm against the transport that owns this tracker.
 */
public class ConnectionStateTracker extends CommandVisitorAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);

    /** Shared marker returned by visitor methods for a tracked command that needs no response callback. */
    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);

    /** Rough size estimate charged against the cache for one cached MessagePull (~4 identifiers). */
    private static final int MESSAGE_PULL_SIZE = 400;

    protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();

    private boolean trackTransactions;
    private boolean restoreSessions = true;
    private boolean restoreConsumers = true;
    private boolean restoreProducers = true;
    private boolean restoreTransaction = true;
    private boolean trackMessages = true;
    private boolean trackTransactionProducers = true;
    // Upper bound for currentCacheSize; same unit as Message.getSize() (presumably bytes - confirm).
    private int maxCacheSize = 128 * 1024;
    // Running estimate of the size of the commands held in messageCache.
    private int currentCacheSize;

    /**
     * Insertion-ordered cache of commands that {@link #restore(Transport)} re-sends.
     * When an entry is inserted while the size estimate exceeds
     * {@code maxCacheSize}, the eldest entry is evicted and its charge released.
     */
    private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
        @Override
        protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
            boolean overLimit = currentCacheSize > maxCacheSize;
            if (overLimit) {
                Command evicted = eldest.getValue();
                if (evicted instanceof Message) {
                    currentCacheSize -= ((Message)evicted).getSize();
                } else if (evicted instanceof MessagePull) {
                    // trackBack() charges MESSAGE_PULL_SIZE for pulls; give it back on
                    // eviction, otherwise the estimate only ever grows and the cache
                    // ends up evicting on every insert.
                    currentCacheSize -= MESSAGE_PULL_SIZE;
                }
            }
            return overLimit;
        }
    };

    /**
     * Response handler that drops the tracked transaction state once the
     * response to the transaction command arrives.
     */
    private class RemoveTransactionAction implements ResponseHandler {
        private final TransactionInfo info;

        public RemoveTransactionAction(TransactionInfo info) {
            this.info = info;
        }

        public void onResponse(Command response) {
            ConnectionId connectionId = info.getConnectionId();
            ConnectionState cs = connectionStates.get(connectionId);
            // The connection may have been removed before the response arrived.
            if (cs != null) {
                cs.removeTransactionState(info.getTransactionId());
            }
        }
    }

    /**
     * Response handler for a prepare: the transaction state is only dropped
     * when the response reports {@link XAResource#XA_RDONLY}.
     */
    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {

        public PrepareReadonlyTransactionAction(TransactionInfo info) {
            super(info);
        }

        public void onResponse(Command command) {
            // Any other response type (e.g. an exception response) leaves the
            // transaction state in place instead of failing on the cast.
            if (command instanceof IntegerResponse) {
                IntegerResponse response = (IntegerResponse) command;
                if (XAResource.XA_RDONLY == response.getResult()) {
                    // all done, no commit or rollback from TM
                    super.onResponse(command);
                }
            }
        }
    }

    /**
     * Dispatches the command to the matching {@code process*} method of this
     * visitor and returns the outcome.
     *
     * @param command the command to inspect
     * @return null if the command is not state tracked.
     * @throws IOException if visiting fails; any non-IOException failure is
     *         wrapped via {@link IOExceptionSupport#create(Throwable)}
     */
    public Tracked track(Command command) throws IOException {
        try {
            return (Tracked)command.visit(this);
        } catch (IOException e) {
            throw e;
        } catch (Throwable e) {
            throw IOExceptionSupport.create(e);
        }
    }

    /**
     * Charges the size of a command against the message cache estimate.
     * Non-transacted messages are charged {@code Message.getSize()} (only while
     * message tracking is enabled); message pulls are charged a fixed estimate.
     * NOTE(review): presumably invoked by the transport once the command has
     * been sent - confirm against the caller.
     *
     * @param command the command to account for; null is ignored
     */
    public void trackBack(Command command) {
        if (command != null) {
            if (trackMessages && command.isMessage()) {
                Message message = (Message) command;
                if (message.getTransactionId()==null) {
                    currentCacheSize = currentCacheSize + message.getSize();
                }
            } else if (command instanceof MessagePull) {
                // just needs to be a rough estimate of size, ~4 identifiers
                currentCacheSize += MESSAGE_PULL_SIZE;
            }
        }
    }

    /**
     * Replays every tracked connection (flagged as a failover reconnect) with
     * its temp destinations and, depending on the restore flags, its sessions
     * and transactions; afterwards re-sends all cached commands.
     *
     * @param transport the transport to replay the state onto
     * @throws IOException if sending on the transport fails
     */
    public void restore(Transport transport) throws IOException {
        // Restore the connections.
        for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
            ConnectionState connectionState = iter.next();
            connectionState.getInfo().setFailoverReconnect(true);
            if (LOG.isDebugEnabled()) {
                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
            }
            transport.oneway(connectionState.getInfo());
            restoreTempDestinations(transport, connectionState);

            if (restoreSessions) {
                restoreSessions(transport, connectionState);
            }

            if (restoreTransaction) {
                restoreTransactions(transport, connectionState);
            }
        }
        //now flush messages
        for (Command msg:messageCache.values()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("command: " + msg.getCommandId());
            }
            transport.oneway(msg);
        }
    }

    /**
     * Replays the tracked transactions of a connection. A transaction whose
     * last recorded command is a one-phase commit is not replayed; instead a
     * rollback exception response, correlated to that commit, is delivered to
     * the transport listener. For every other transaction the involved
     * producers are re-added, the recorded commands re-sent and the producers
     * removed again.
     *
     * @param transport the transport to replay onto
     * @param connectionState the connection whose transactions are replayed
     * @throws IOException if sending on the transport fails
     */
    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
        Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
        for (TransactionState transactionState : connectionState.getTransactionStates()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("tx: " + transactionState.getId());
            }

            // rollback any completed transactions - no way to know if commit got there
            // or if reply went missing
            //
            if (!transactionState.getCommands().isEmpty()) {
                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
                if (lastCommand instanceof TransactionInfo) {
                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
                        }
                        toRollback.add(transactionInfo);
                        continue;
                    }
                }
            }

            // replay short lived producers that may have been involved in the transaction
            for (ProducerState producerState : transactionState.getProducerStates().values()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("tx replay producer :" + producerState.getInfo());
                }
                transport.oneway(producerState.getInfo());
            }

            for (Command command : transactionState.getCommands()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("tx replay: " + command);
                }
                transport.oneway(command);
            }

            for (ProducerState producerState : transactionState.getProducerStates().values()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
                }
                transport.oneway(producerState.getInfo().createRemoveCommand());
            }
        }

        for (TransactionInfo command: toRollback) {
            // respond to the outstanding commit
            ExceptionResponse response = new ExceptionResponse();
            response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
            response.setCorrelationId(command.getCommandId());
            transport.getTransportListener().onCommand(response);
        }
    }

    /**
     * Re-sends every session of the connection and, depending on the restore
     * flags, each session's producers and consumers.
     *
     * @param transport the transport to replay onto
     * @param connectionState the connection whose sessions are replayed
     * @throws IOException if sending on the transport fails
     */
    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
        // Restore the connection's sessions
        for (Iterator<?> iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
            SessionState sessionState = (SessionState)iter2.next();
            if (LOG.isDebugEnabled()) {
                LOG.debug("session: " + sessionState.getInfo().getSessionId());
            }
            transport.oneway(sessionState.getInfo());

            if (restoreProducers) {
                restoreProducers(transport, sessionState);
            }

            if (restoreConsumers) {
                restoreConsumers(transport, sessionState);
            }
        }
    }

    /**
     * Re-sends the consumers of a session. While connection interrupt
     * processing is not complete, a consumer with a positive prefetch is sent
     * as a copy with prefetch 0 and its original info is remembered in the
     * connection's recovering pull consumers, so that
     * {@link #connectionInterruptProcessingComplete(Transport, ConnectionId)}
     * can restore the prefetch later.
     *
     * @param transport the transport to replay onto
     * @param sessionState the session whose consumers are replayed
     * @throws IOException if sending on the transport fails
     */
    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
        final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
        final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
        for (ConsumerState consumerState : sessionState.getConsumerStates()) {
            ConsumerInfo infoToSend = consumerState.getInfo();
            if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
                infoToSend = consumerState.getInfo().copy();
                connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
                infoToSend.setPrefetchSize(0);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("restore consumer: " + infoToSend.getConsumerId());
            }
            transport.oneway(infoToSend);
        }
    }

    /**
     * Re-sends the producers of a session.
     *
     * @param transport the transport to replay onto
     * @param sessionState the session whose producers are replayed
     * @throws IOException if sending on the transport fails
     */
    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
        // Restore the session's producers
        for (Iterator<?> iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
            ProducerState producerState = (ProducerState)iter3.next();
            if (LOG.isDebugEnabled()) {
                LOG.debug("producer: " + producerState.getInfo().getProducerId());
            }
            transport.oneway(producerState.getInfo());
        }
    }

    /**
     * Re-sends the temp destinations of a connection.
     *
     * @param transport the transport to replay onto
     * @param connectionState the connection whose temp destinations are replayed
     * @throws IOException if sending on the transport fails
     */
    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
        throws IOException {
        // Restore the connection's temp destinations.
        for (Iterator<?> iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
            transport.oneway((DestinationInfo)iter2.next());
        }
    }

    /**
     * Looks up the tracked state of a session via its parent connection.
     *
     * @return the session state, or null if the id, its connection id, the
     *         connection or the session is unknown
     */
    private SessionState findSessionState(SessionId sessionId) {
        if (sessionId == null) {
            return null;
        }
        ConnectionId connectionId = sessionId.getParentId();
        if (connectionId == null) {
            return null;
        }
        ConnectionState cs = connectionStates.get(connectionId);
        return cs == null ? null : cs.getSessionState(sessionId);
    }

    /**
     * Looks up the tracked state of the transaction a command refers to.
     *
     * @param info a non-null transaction command
     * @return the transaction state, or null if the connection id is missing
     *         or the connection or transaction is not tracked
     */
    private TransactionState findTransactionState(TransactionInfo info) {
        ConnectionId connectionId = info.getConnectionId();
        if (connectionId == null) {
            return null;
        }
        ConnectionState cs = connectionStates.get(connectionId);
        return cs == null ? null : cs.getTransactionState(info.getTransactionId());
    }

    /** Records a temporary destination on its tracked connection. */
    public Response processAddDestination(DestinationInfo info) {
        if (info != null) {
            ConnectionState cs = connectionStates.get(info.getConnectionId());
            if (cs != null && info.getDestination().isTemporary()) {
                cs.addTempDestination(info);
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a temporary destination on its tracked connection. */
    public Response processRemoveDestination(DestinationInfo info) {
        if (info != null) {
            ConnectionState cs = connectionStates.get(info.getConnectionId());
            if (cs != null && info.getDestination().isTemporary()) {
                cs.removeTempDestination(info.getDestination());
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Records a producer on its tracked session, if that session is known. */
    public Response processAddProducer(ProducerInfo info) {
        if (info != null && info.getProducerId() != null) {
            SessionState ss = findSessionState(info.getProducerId().getParentId());
            if (ss != null) {
                ss.addProducer(info);
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a producer on its tracked session, if that session is known. */
    public Response processRemoveProducer(ProducerId id) {
        if (id != null) {
            SessionState ss = findSessionState(id.getParentId());
            if (ss != null) {
                ss.removeProducer(id);
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Records a consumer on its tracked session, if that session is known. */
    public Response processAddConsumer(ConsumerInfo info) {
        // Same id guard as processAddProducer: an info without an id cannot be tracked.
        if (info != null && info.getConsumerId() != null) {
            SessionState ss = findSessionState(info.getConsumerId().getParentId());
            if (ss != null) {
                ss.addConsumer(info);
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a consumer on its tracked session, if that session is known. */
    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
        if (id != null) {
            SessionState ss = findSessionState(id.getParentId());
            if (ss != null) {
                ss.removeConsumer(id);
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Records a session on its tracked connection, if that connection is known. */
    public Response processAddSession(SessionInfo info) {
        // Same id guard as processAddProducer: an info without an id cannot be tracked.
        if (info != null && info.getSessionId() != null) {
            ConnectionId connectionId = info.getSessionId().getParentId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    cs.addSession(info);
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Forgets a session on its tracked connection, if that connection is known. */
    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
        if (id != null) {
            ConnectionId connectionId = id.getParentId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    cs.removeSession(id);
                }
            }
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Starts tracking a connection, replacing any state already held for its id. */
    public Response processAddConnection(ConnectionInfo info) {
        if (info != null) {
            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /** Stops tracking a connection. */
    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
        if (id != null) {
            connectionStates.remove(id);
        }
        return TRACKED_RESPONSE_MARKER;
    }

    /**
     * Tracks a sent message. With transaction tracking enabled, a transacted
     * message is appended to its tracked transaction (and, if enabled, its
     * producer is linked to that transaction). Otherwise, with message
     * tracking enabled, a copy of the message is put into the message cache.
     *
     * @return the tracked marker for a transacted message while transaction
     *         tracking is on, null otherwise
     */
    public Response processMessage(Message send) throws Exception {
        if (send != null) {
            if (trackTransactions && send.getTransactionId() != null) {
                ProducerId producerId = send.getProducerId();
                ConnectionId connectionId = producerId.getParentId().getParentId();
                if (connectionId != null) {
                    ConnectionState cs = connectionStates.get(connectionId);
                    if (cs != null) {
                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
                        if (transactionState != null) {
                            transactionState.addCommand(send);

                            if (trackTransactionProducers) {
                                // for jmstemplate, track the producer in case it is closed before commit
                                // and needs to be replayed
                                SessionState ss = cs.getSessionState(producerId.getParentId());
                                // The session or producer may no longer be tracked; the
                                // message itself is still recorded in the transaction.
                                ProducerState producerState = ss == null ? null : ss.getProducerState(producerId);
                                if (producerState != null) {
                                    producerState.setTransactionState(transactionState);
                                }
                            }
                        }
                    }
                }
                return TRACKED_RESPONSE_MARKER;
            }else if (trackMessages) {
                messageCache.put(send.getMessageId(), send.copy());
            }
        }
        return null;
    }

    /**
     * Starts tracking a transaction on its connection and records the begin
     * command as the transaction's first command.
     *
     * @return the tracked marker when transaction tracking is on and the
     *         command carries a transaction id, null otherwise
     */
    public Response processBeginTransaction(TransactionInfo info) {
        if (trackTransactions && info != null && info.getTransactionId() != null) {
            ConnectionId connectionId = info.getConnectionId();
            if (connectionId != null) {
                ConnectionState cs = connectionStates.get(connectionId);
                if (cs != null) {
                    cs.addTransactionState(info.getTransactionId());
                    TransactionState state = cs.getTransactionState(info.getTransactionId());
                    state.addCommand(info);
                }
            }
            return TRACKED_RESPONSE_MARKER;
        }
        return null;
    }

    /**
     * Records a prepare on its tracked transaction.
     *
     * @return a tracked response whose handler removes the transaction if the
     *         prepare turns out read-only, or null if nothing was tracked
     */
    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null) {
            TransactionState transactionState = findTransactionState(info);
            if (transactionState != null) {
                transactionState.addCommand(info);
                return new Tracked(new PrepareReadonlyTransactionAction(info));
            }
        }
        return null;
    }

    /**
     * Records a one-phase commit on its tracked transaction.
     *
     * @return a tracked response whose handler removes the transaction on
     *         response, or null if nothing was tracked
     */
    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null) {
            TransactionState transactionState = findTransactionState(info);
            if (transactionState != null) {
                transactionState.addCommand(info);
                return new Tracked(new RemoveTransactionAction(info));
            }
        }
        return null;
    }

    /**
     * Records a two-phase commit on its tracked transaction.
     *
     * @return a tracked response whose handler removes the transaction on
     *         response, or null if nothing was tracked
     */
    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null) {
            TransactionState transactionState = findTransactionState(info);
            if (transactionState != null) {
                transactionState.addCommand(info);
                return new Tracked(new RemoveTransactionAction(info));
            }
        }
        return null;
    }

    /**
     * Records a rollback on its tracked transaction.
     *
     * @return a tracked response whose handler removes the transaction on
     *         response, or null if nothing was tracked
     */
    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null) {
            TransactionState transactionState = findTransactionState(info);
            if (transactionState != null) {
                transactionState.addCommand(info);
                return new Tracked(new RemoveTransactionAction(info));
            }
        }
        return null;
    }

    /**
     * Records an end command on its tracked transaction, if there is one.
     *
     * @return the tracked marker when transaction tracking is on, null otherwise
     */
    public Response processEndTransaction(TransactionInfo info) throws Exception {
        if (trackTransactions && info != null) {
            TransactionState transactionState = findTransactionState(info);
            if (transactionState != null) {
                transactionState.addCommand(info);
            }
            return TRACKED_RESPONSE_MARKER;
        }
        return null;
    }

    /**
     * Caches a message pull so it is re-sent by {@link #restore(Transport)}.
     * Only the latest pull per destination/consumer pair is kept.
     *
     * @return always null
     */
    @Override
    public Response processMessagePull(MessagePull pull) throws Exception {
        if (pull != null) {
            // leave a single instance in the cache
            final String id = pull.getDestination() + "::" + pull.getConsumerId();
            if (messageCache.put(id.intern(), pull) != null) {
                // The replaced instance was presumably already charged by trackBack();
                // release that charge so repeated pulls for the same consumer do not
                // inflate the estimate. Clamped because the replaced instance may
                // never have been charged.
                currentCacheSize = Math.max(0, currentCacheSize - MESSAGE_PULL_SIZE);
            }
        }
        return null;
    }

    public boolean isRestoreConsumers() {
        return restoreConsumers;
    }

    public void setRestoreConsumers(boolean restoreConsumers) {
        this.restoreConsumers = restoreConsumers;
    }

    public boolean isRestoreProducers() {
        return restoreProducers;
    }

    public void setRestoreProducers(boolean restoreProducers) {
        this.restoreProducers = restoreProducers;
    }

    public boolean isRestoreSessions() {
        return restoreSessions;
    }

    public void setRestoreSessions(boolean restoreSessions) {
        this.restoreSessions = restoreSessions;
    }

    public boolean isTrackTransactions() {
        return trackTransactions;
    }

    public void setTrackTransactions(boolean trackTransactions) {
        this.trackTransactions = trackTransactions;
    }

    public boolean isTrackTransactionProducers() {
        return this.trackTransactionProducers;
    }

    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
        this.trackTransactionProducers = trackTransactionProducers;
    }

    public boolean isRestoreTransaction() {
        return restoreTransaction;
    }

    public void setRestoreTransaction(boolean restoreTransaction) {
        this.restoreTransaction = restoreTransaction;
    }

    public boolean isTrackMessages() {
        return trackMessages;
    }

    public void setTrackMessages(boolean trackMessages) {
        this.trackMessages = trackMessages;
    }

    public int getMaxCacheSize() {
        return maxCacheSize;
    }

    public void setMaxCacheSize(int maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
    }

    /**
     * Marks connection interrupt processing as complete and sends a consumer
     * control for every consumer that was restored with prefetch 0, carrying
     * its original prefetch and destination. A failure to send a control is
     * logged at debug level and does not stop the remaining consumers; the
     * set of recovering consumers is cleared afterwards either way.
     *
     * @param transport the transport to send the consumer controls on
     * @param connectionId the connection whose processing completed; unknown
     *        ids are ignored
     */
    public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
        ConnectionState connectionState = connectionStates.get(connectionId);
        if (connectionState != null) {
            connectionState.setConnectionInterruptProcessingComplete(true);
            Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
            for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
                ConsumerControl control = new ConsumerControl();
                control.setConsumerId(entry.getKey());
                control.setPrefetch(entry.getValue().getPrefetchSize());
                control.setDestination(entry.getValue().getDestination());
                try {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
                    }
                    transport.oneway(control);
                } catch (Exception ex) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
                                + " with: " + control.getPrefetch(), ex);
                    }
                }
            }
            stalledConsumers.clear();
        }
    }

    /**
     * Marks connection interrupt processing as pending for the connection, so
     * consumers restored from now on are restored with prefetch 0.
     *
     * @param connectionId the interrupted connection; unknown ids are ignored
     */
    public void transportInterrupted(ConnectionId connectionId) {
        ConnectionState connectionState = connectionStates.get(connectionId);
        if (connectionState != null) {
            connectionState.setConnectionInterruptProcessingComplete(false);
        }
    }
}