001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.command; 018 019 import java.io.DataInputStream; 020 import java.io.DataOutputStream; 021 import java.io.IOException; 022 import java.util.Collections; 023 import java.util.HashMap; 024 import java.util.Map; 025 import javax.jms.JMSException; 026 import org.apache.activemq.ActiveMQConnection; 027 import org.apache.activemq.advisory.AdvisorySupport; 028 import org.apache.activemq.broker.region.Destination; 029 import org.apache.activemq.broker.region.MessageReference; 030 import org.apache.activemq.broker.region.RegionBroker; 031 import org.apache.activemq.usage.MemoryUsage; 032 import org.apache.activemq.util.ByteArrayInputStream; 033 import org.apache.activemq.util.ByteArrayOutputStream; 034 import org.apache.activemq.util.ByteSequence; 035 import org.apache.activemq.util.MarshallingSupport; 036 import org.apache.activemq.wireformat.WireFormat; 037 038 /** 039 * Represents an ActiveMQ message 040 * 041 * @openwire:marshaller 042 * 043 */ 044 public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { 045 046 /** 047 * The default minimum amount of memory a message is assumed to use 048 */ 049 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; 050 051 protected MessageId messageId; 052 protected ActiveMQDestination originalDestination; 053 protected TransactionId originalTransactionId; 054 055 protected ProducerId producerId; 056 protected ActiveMQDestination destination; 057 protected TransactionId transactionId; 058 059 protected long expiration; 060 protected long timestamp; 061 protected long arrival; 062 protected long brokerInTime; 063 protected long brokerOutTime; 064 protected String correlationId; 065 protected ActiveMQDestination replyTo; 066 protected boolean persistent; 067 protected String type; 068 protected byte priority; 069 protected String groupID; 070 protected int groupSequence; 071 protected ConsumerId targetConsumerId; 072 protected boolean compressed; 073 protected String userID; 074 075 protected ByteSequence content; 076 protected ByteSequence marshalledProperties; 077 protected DataStructure dataStructure; 078 protected int redeliveryCounter; 079 080 protected int size; 081 protected Map<String, Object> properties; 082 protected boolean readOnlyProperties; 083 protected boolean readOnlyBody; 084 protected transient boolean recievedByDFBridge; 085 protected boolean droppable; 086 087 private transient short referenceCount; 088 private transient ActiveMQConnection connection; 089 private transient org.apache.activemq.broker.region.Destination regionDestination; 090 private transient MemoryUsage memoryUsage; 091 092 private BrokerId[] brokerPath; 093 private BrokerId[] cluster; 094 095 public abstract Message copy(); 096 public abstract void clearBody() throws JMSException; 097 098 // useful to reduce the memory footprint of a persisted message 099 public void clearMarshalledState() throws JMSException { 100 properties = null; 101 } 102 103 protected void copy(Message copy) { 104 super.copy(copy); 105 copy.producerId = producerId; 106 copy.transactionId = transactionId; 107 copy.destination = destination; 108 copy.messageId = messageId != null ? messageId.copy() : null; 109 copy.originalDestination = originalDestination; 110 copy.originalTransactionId = originalTransactionId; 111 copy.expiration = expiration; 112 copy.timestamp = timestamp; 113 copy.correlationId = correlationId; 114 copy.replyTo = replyTo; 115 copy.persistent = persistent; 116 copy.redeliveryCounter = redeliveryCounter; 117 copy.type = type; 118 copy.priority = priority; 119 copy.size = size; 120 copy.groupID = groupID; 121 copy.userID = userID; 122 copy.groupSequence = groupSequence; 123 124 if (properties != null) { 125 copy.properties = new HashMap<String, Object>(properties); 126 127 // The new message hasn't expired, so remove this feild. 128 copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION); 129 } else { 130 copy.properties = properties; 131 } 132 133 copy.content = content; 134 copy.marshalledProperties = marshalledProperties; 135 copy.dataStructure = dataStructure; 136 copy.readOnlyProperties = readOnlyProperties; 137 copy.readOnlyBody = readOnlyBody; 138 copy.compressed = compressed; 139 copy.recievedByDFBridge = recievedByDFBridge; 140 141 copy.arrival = arrival; 142 copy.connection = connection; 143 copy.regionDestination = regionDestination; 144 copy.brokerInTime = brokerInTime; 145 copy.brokerOutTime = brokerOutTime; 146 copy.memoryUsage=this.memoryUsage; 147 copy.brokerPath = brokerPath; 148 149 // lets not copy the following fields 150 // copy.targetConsumerId = targetConsumerId; 151 // copy.referenceCount = referenceCount; 152 } 153 154 public Object getProperty(String name) throws IOException { 155 if (properties == null) { 156 if (marshalledProperties == null) { 157 return null; 158 } 159 properties = unmarsallProperties(marshalledProperties); 160 } 161 return properties.get(name); 162 } 163 164 @SuppressWarnings("unchecked") 165 public Map<String, Object> getProperties() throws IOException { 166 if (properties == null) { 167 if (marshalledProperties == null) { 168 return Collections.EMPTY_MAP; 169 } 170 properties = unmarsallProperties(marshalledProperties); 171 } 172 return Collections.unmodifiableMap(properties); 173 } 174 175 public void clearProperties() { 176 marshalledProperties = null; 177 properties = null; 178 } 179 180 public void setProperty(String name, Object value) throws IOException { 181 lazyCreateProperties(); 182 properties.put(name, value); 183 } 184 185 public void removeProperty(String name) throws IOException { 186 lazyCreateProperties(); 187 properties.remove(name); 188 } 189 190 protected void lazyCreateProperties() throws IOException { 191 if (properties == null) { 192 if (marshalledProperties == null) { 193 properties = new HashMap<String, Object>(); 194 } else { 195 properties = unmarsallProperties(marshalledProperties); 196 marshalledProperties = null; 197 } 198 } 199 } 200 201 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 202 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties))); 203 } 204 205 public void beforeMarshall(WireFormat wireFormat) throws IOException { 206 // Need to marshal the properties. 207 if (marshalledProperties == null && properties != null) { 208 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 209 DataOutputStream os = new DataOutputStream(baos); 210 MarshallingSupport.marshalPrimitiveMap(properties, os); 211 os.close(); 212 marshalledProperties = baos.toByteSequence(); 213 } 214 } 215 216 public void afterMarshall(WireFormat wireFormat) throws IOException { 217 } 218 219 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 220 } 221 222 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 223 } 224 225 // ///////////////////////////////////////////////////////////////// 226 // 227 // Simple Field accessors 228 // 229 // ///////////////////////////////////////////////////////////////// 230 231 /** 232 * @openwire:property version=1 cache=true 233 */ 234 public ProducerId getProducerId() { 235 return producerId; 236 } 237 238 public void setProducerId(ProducerId producerId) { 239 this.producerId = producerId; 240 } 241 242 /** 243 * @openwire:property version=1 cache=true 244 */ 245 public ActiveMQDestination getDestination() { 246 return destination; 247 } 248 249 public void setDestination(ActiveMQDestination destination) { 250 this.destination = destination; 251 } 252 253 /** 254 * @openwire:property version=1 cache=true 255 */ 256 public TransactionId getTransactionId() { 257 return transactionId; 258 } 259 260 public void setTransactionId(TransactionId transactionId) { 261 this.transactionId = transactionId; 262 } 263 264 public boolean isInTransaction() { 265 return transactionId != null; 266 } 267 268 /** 269 * @openwire:property version=1 cache=true 270 */ 271 public ActiveMQDestination getOriginalDestination() { 272 return originalDestination; 273 } 274 275 public void setOriginalDestination(ActiveMQDestination destination) { 276 this.originalDestination = destination; 277 } 278 279 /** 280 * @openwire:property version=1 281 */ 282 public MessageId getMessageId() { 283 return messageId; 284 } 285 286 public void setMessageId(MessageId messageId) { 287 this.messageId = messageId; 288 } 289 290 /** 291 * @openwire:property version=1 cache=true 292 */ 293 public TransactionId getOriginalTransactionId() { 294 return originalTransactionId; 295 } 296 297 public void setOriginalTransactionId(TransactionId transactionId) { 298 this.originalTransactionId = transactionId; 299 } 300 301 /** 302 * @openwire:property version=1 303 */ 304 public String getGroupID() { 305 return groupID; 306 } 307 308 public void setGroupID(String groupID) { 309 this.groupID = groupID; 310 } 311 312 /** 313 * @openwire:property version=1 314 */ 315 public int getGroupSequence() { 316 return groupSequence; 317 } 318 319 public void setGroupSequence(int groupSequence) { 320 this.groupSequence = groupSequence; 321 } 322 323 /** 324 * @openwire:property version=1 325 */ 326 public String getCorrelationId() { 327 return correlationId; 328 } 329 330 public void setCorrelationId(String correlationId) { 331 this.correlationId = correlationId; 332 } 333 334 /** 335 * @openwire:property version=1 336 */ 337 public boolean isPersistent() { 338 return persistent; 339 } 340 341 public void setPersistent(boolean deliveryMode) { 342 this.persistent = deliveryMode; 343 } 344 345 /** 346 * @openwire:property version=1 347 */ 348 public long getExpiration() { 349 return expiration; 350 } 351 352 public void setExpiration(long expiration) { 353 this.expiration = expiration; 354 } 355 356 /** 357 * @openwire:property version=1 358 */ 359 public byte getPriority() { 360 return priority; 361 } 362 363 public void setPriority(byte priority) { 364 if (priority < 0) { 365 this.priority = 0; 366 } else if (priority > 9) { 367 this.priority = 9; 368 } else { 369 this.priority = priority; 370 } 371 } 372 373 /** 374 * @openwire:property version=1 375 */ 376 public ActiveMQDestination getReplyTo() { 377 return replyTo; 378 } 379 380 public void setReplyTo(ActiveMQDestination replyTo) { 381 this.replyTo = replyTo; 382 } 383 384 /** 385 * @openwire:property version=1 386 */ 387 public long getTimestamp() { 388 return timestamp; 389 } 390 391 public void setTimestamp(long timestamp) { 392 this.timestamp = timestamp; 393 } 394 395 /** 396 * @openwire:property version=1 397 */ 398 public String getType() { 399 return type; 400 } 401 402 public void setType(String type) { 403 this.type = type; 404 } 405 406 /** 407 * @openwire:property version=1 408 */ 409 public ByteSequence getContent() { 410 return content; 411 } 412 413 public void setContent(ByteSequence content) { 414 this.content = content; 415 } 416 417 /** 418 * @openwire:property version=1 419 */ 420 public ByteSequence getMarshalledProperties() { 421 return marshalledProperties; 422 } 423 424 public void setMarshalledProperties(ByteSequence marshalledProperties) { 425 this.marshalledProperties = marshalledProperties; 426 } 427 428 /** 429 * @openwire:property version=1 430 */ 431 public DataStructure getDataStructure() { 432 return dataStructure; 433 } 434 435 public void setDataStructure(DataStructure data) { 436 this.dataStructure = data; 437 } 438 439 /** 440 * Can be used to route the message to a specific consumer. Should be null 441 * to allow the broker use normal JMS routing semantics. If the target 442 * consumer id is an active consumer on the broker, the message is dropped. 443 * Used by the AdvisoryBroker to replay advisory messages to a specific 444 * consumer. 445 * 446 * @openwire:property version=1 cache=true 447 */ 448 public ConsumerId getTargetConsumerId() { 449 return targetConsumerId; 450 } 451 452 public void setTargetConsumerId(ConsumerId targetConsumerId) { 453 this.targetConsumerId = targetConsumerId; 454 } 455 456 public boolean isExpired() { 457 long expireTime = getExpiration(); 458 return expireTime > 0 && System.currentTimeMillis() > expireTime; 459 } 460 461 public boolean isAdvisory() { 462 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 463 } 464 465 /** 466 * @openwire:property version=1 467 */ 468 public boolean isCompressed() { 469 return compressed; 470 } 471 472 public void setCompressed(boolean compressed) { 473 this.compressed = compressed; 474 } 475 476 public boolean isRedelivered() { 477 return redeliveryCounter > 0; 478 } 479 480 public void setRedelivered(boolean redelivered) { 481 if (redelivered) { 482 if (!isRedelivered()) { 483 setRedeliveryCounter(1); 484 } 485 } else { 486 if (isRedelivered()) { 487 setRedeliveryCounter(0); 488 } 489 } 490 } 491 492 public void incrementRedeliveryCounter() { 493 redeliveryCounter++; 494 } 495 496 /** 497 * @openwire:property version=1 498 */ 499 public int getRedeliveryCounter() { 500 return redeliveryCounter; 501 } 502 503 public void setRedeliveryCounter(int deliveryCounter) { 504 this.redeliveryCounter = deliveryCounter; 505 } 506 507 /** 508 * The route of brokers the command has moved through. 509 * 510 * @openwire:property version=1 cache=true 511 */ 512 public BrokerId[] getBrokerPath() { 513 return brokerPath; 514 } 515 516 public void setBrokerPath(BrokerId[] brokerPath) { 517 this.brokerPath = brokerPath; 518 } 519 520 public boolean isReadOnlyProperties() { 521 return readOnlyProperties; 522 } 523 524 public void setReadOnlyProperties(boolean readOnlyProperties) { 525 this.readOnlyProperties = readOnlyProperties; 526 } 527 528 public boolean isReadOnlyBody() { 529 return readOnlyBody; 530 } 531 532 public void setReadOnlyBody(boolean readOnlyBody) { 533 this.readOnlyBody = readOnlyBody; 534 } 535 536 public ActiveMQConnection getConnection() { 537 return this.connection; 538 } 539 540 public void setConnection(ActiveMQConnection connection) { 541 this.connection = connection; 542 } 543 544 /** 545 * Used to schedule the arrival time of a message to a broker. The broker 546 * will not dispatch a message to a consumer until it's arrival time has 547 * elapsed. 548 * 549 * @openwire:property version=1 550 */ 551 public long getArrival() { 552 return arrival; 553 } 554 555 public void setArrival(long arrival) { 556 this.arrival = arrival; 557 } 558 559 /** 560 * Only set by the broker and defines the userID of the producer connection 561 * who sent this message. This is an optional field, it needs to be enabled 562 * on the broker to have this field populated. 563 * 564 * @openwire:property version=1 565 */ 566 public String getUserID() { 567 return userID; 568 } 569 570 public void setUserID(String jmsxUserID) { 571 this.userID = jmsxUserID; 572 } 573 574 public int getReferenceCount() { 575 return referenceCount; 576 } 577 578 public Message getMessageHardRef() { 579 return this; 580 } 581 582 public Message getMessage() { 583 return this; 584 } 585 586 public org.apache.activemq.broker.region.Destination getRegionDestination() { 587 return regionDestination; 588 } 589 590 public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) { 591 this.regionDestination = destination; 592 if(this.memoryUsage==null) { 593 this.memoryUsage=regionDestination.getMemoryUsage(); 594 } 595 } 596 597 public MemoryUsage getMemoryUsage() { 598 return this.memoryUsage; 599 } 600 601 public void setMemoryUsage(MemoryUsage usage) { 602 this.memoryUsage=usage; 603 } 604 605 @Override 606 public boolean isMarshallAware() { 607 return true; 608 } 609 610 public int incrementReferenceCount() { 611 int rc; 612 int size; 613 synchronized (this) { 614 rc = ++referenceCount; 615 size = getSize(); 616 } 617 618 if (rc == 1 && getMemoryUsage() != null) { 619 getMemoryUsage().increaseUsage(size); 620 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 621 622 } 623 624 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 625 return rc; 626 } 627 628 public int decrementReferenceCount() { 629 int rc; 630 int size; 631 synchronized (this) { 632 rc = --referenceCount; 633 size = getSize(); 634 } 635 636 if (rc == 0 && getMemoryUsage() != null) { 637 getMemoryUsage().decreaseUsage(size); 638 //Thread.dumpStack(); 639 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 640 } 641 642 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 643 644 return rc; 645 } 646 647 public int getSize() { 648 int minimumMessageSize = getMinimumMessageSize(); 649 if (size < minimumMessageSize || size == 0) { 650 size = minimumMessageSize; 651 if (marshalledProperties != null) { 652 size += marshalledProperties.getLength(); 653 } 654 if (content != null) { 655 size += content.getLength(); 656 } 657 } 658 return size; 659 } 660 661 protected int getMinimumMessageSize() { 662 int result = DEFAULT_MINIMUM_MESSAGE_SIZE; 663 //let destination override 664 Destination dest = regionDestination; 665 if (dest != null) { 666 result=dest.getMinimumMessageSize(); 667 } 668 return result; 669 } 670 671 /** 672 * @openwire:property version=1 673 * @return Returns the recievedByDFBridge. 674 */ 675 public boolean isRecievedByDFBridge() { 676 return recievedByDFBridge; 677 } 678 679 /** 680 * @param recievedByDFBridge The recievedByDFBridge to set. 681 */ 682 public void setRecievedByDFBridge(boolean recievedByDFBridge) { 683 this.recievedByDFBridge = recievedByDFBridge; 684 } 685 686 public void onMessageRolledBack() { 687 incrementRedeliveryCounter(); 688 } 689 690 /** 691 * @openwire:property version=2 cache=true 692 */ 693 public boolean isDroppable() { 694 return droppable; 695 } 696 697 public void setDroppable(boolean droppable) { 698 this.droppable = droppable; 699 } 700 701 /** 702 * If a message is stored in multiple nodes on a cluster, all the cluster 703 * members will be listed here. Otherwise, it will be null. 704 * 705 * @openwire:property version=3 cache=true 706 */ 707 public BrokerId[] getCluster() { 708 return cluster; 709 } 710 711 public void setCluster(BrokerId[] cluster) { 712 this.cluster = cluster; 713 } 714 715 @Override 716 public boolean isMessage() { 717 return true; 718 } 719 720 /** 721 * @openwire:property version=3 722 */ 723 public long getBrokerInTime() { 724 return this.brokerInTime; 725 } 726 727 public void setBrokerInTime(long brokerInTime) { 728 this.brokerInTime = brokerInTime; 729 } 730 731 /** 732 * @openwire:property version=3 733 */ 734 public long getBrokerOutTime() { 735 return this.brokerOutTime; 736 } 737 738 public void setBrokerOutTime(long brokerOutTime) { 739 this.brokerOutTime = brokerOutTime; 740 } 741 742 public boolean isDropped() { 743 return false; 744 } 745 746 @Override 747 public String toString() { 748 return toString(null); 749 } 750 751 @Override 752 public String toString(Map<String, Object>overrideFields) { 753 try { 754 getProperties(); 755 } catch (IOException e) { 756 } 757 return super.toString(overrideFields); 758 } 759 }