001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.Serializable; 020 021 import javax.jms.JMSException; 022 import javax.jms.Message; 023 024 import org.apache.activemq.broker.region.MessageReference; 025 import org.apache.activemq.command.MessageId; 026 import org.apache.activemq.command.ProducerId; 027 import org.apache.activemq.util.BitArrayBin; 028 import org.apache.activemq.util.IdGenerator; 029 import org.apache.activemq.util.LRUCache; 030 031 /** 032 * Provides basic audit functions for Messages without sync 033 * 034 * 035 */ 036 public class ActiveMQMessageAuditNoSync implements Serializable { 037 038 private static final long serialVersionUID = 1L; 039 040 public static final int DEFAULT_WINDOW_SIZE = 2048; 041 public static final int MAXIMUM_PRODUCER_COUNT = 64; 042 private int auditDepth; 043 private int maximumNumberOfProducersToTrack; 044 private LRUCache<Object, BitArrayBin> map; 045 046 /** 047 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 048 * 64 049 */ 050 public ActiveMQMessageAuditNoSync() { 051 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); 052 } 053 054 /** 055 * Construct a MessageAudit 056 * 057 * @param auditDepth range of ids to track 058 * @param maximumNumberOfProducersToTrack number of producers expected in 059 * the system 060 */ 061 public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) { 062 this.auditDepth = auditDepth; 063 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; 064 this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true); 065 } 066 067 /** 068 * @return the auditDepth 069 */ 070 public int getAuditDepth() { 071 return auditDepth; 072 } 073 074 /** 075 * @param auditDepth the auditDepth to set 076 */ 077 public void setAuditDepth(int auditDepth) { 078 this.auditDepth = auditDepth; 079 } 080 081 /** 082 * @return the maximumNumberOfProducersToTrack 083 */ 084 public int getMaximumNumberOfProducersToTrack() { 085 return maximumNumberOfProducersToTrack; 086 } 087 088 /** 089 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set 090 */ 091 public void setMaximumNumberOfProducersToTrack( 092 int maximumNumberOfProducersToTrack) { 093 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; 094 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); 095 } 096 097 /** 098 * Checks if this message has been seen before 099 * 100 * @param message 101 * @return true if the message is a duplicate 102 * @throws JMSException 103 */ 104 public boolean isDuplicate(Message message) throws JMSException { 105 return isDuplicate(message.getJMSMessageID()); 106 } 107 108 /** 109 * checks whether this messageId has been seen before and adds this 110 * messageId to the list 111 * 112 * @param id 113 * @return true if the message is a duplicate 114 */ 115 public boolean isDuplicate(String id) { 116 boolean answer = false; 117 String seed = IdGenerator.getSeedFromId(id); 118 if (seed != null) { 119 BitArrayBin bab = map.get(seed); 120 if (bab == null) { 121 bab = new BitArrayBin(auditDepth); 122 map.put(seed, bab); 123 } 124 long index = IdGenerator.getSequenceFromId(id); 125 if (index >= 0) { 126 answer = bab.setBit(index, true); 127 } 128 } 129 return answer; 130 } 131 132 /** 133 * Checks if this message has been seen before 134 * 135 * @param message 136 * @return true if the message is a duplicate 137 */ 138 public boolean isDuplicate(final MessageReference message) { 139 MessageId id = message.getMessageId(); 140 return isDuplicate(id); 141 } 142 143 /** 144 * Checks if this messageId has been seen before 145 * 146 * @param id 147 * @return true if the message is a duplicate 148 */ 149 public boolean isDuplicate(final MessageId id) { 150 boolean answer = false; 151 152 if (id != null) { 153 ProducerId pid = id.getProducerId(); 154 if (pid != null) { 155 BitArrayBin bab = map.get(pid); 156 if (bab == null) { 157 bab = new BitArrayBin(auditDepth); 158 map.put(pid, bab); 159 } 160 answer = bab.setBit(id.getProducerSequenceId(), true); 161 } 162 } 163 return answer; 164 } 165 166 /** 167 * mark this message as being received 168 * 169 * @param message 170 */ 171 public void rollback(final MessageReference message) { 172 MessageId id = message.getMessageId(); 173 rollback(id); 174 } 175 176 /** 177 * mark this message as being received 178 * 179 * @param id 180 */ 181 public void rollback(final MessageId id) { 182 if (id != null) { 183 ProducerId pid = id.getProducerId(); 184 if (pid != null) { 185 BitArrayBin bab = map.get(pid); 186 if (bab != null) { 187 bab.setBit(id.getProducerSequenceId(), false); 188 } 189 } 190 } 191 } 192 193 /** 194 * Check the message is in order 195 * @param msg 196 * @return 197 * @throws JMSException 198 */ 199 public boolean isInOrder(Message msg) throws JMSException { 200 return isInOrder(msg.getJMSMessageID()); 201 } 202 203 /** 204 * Check the message id is in order 205 * @param id 206 * @return 207 */ 208 public boolean isInOrder(final String id) { 209 boolean answer = true; 210 211 if (id != null) { 212 String seed = IdGenerator.getSeedFromId(id); 213 if (seed != null) { 214 BitArrayBin bab = map.get(seed); 215 if (bab != null) { 216 long index = IdGenerator.getSequenceFromId(id); 217 answer = bab.isInOrder(index); 218 } 219 220 } 221 } 222 return answer; 223 } 224 225 /** 226 * Check the MessageId is in order 227 * @param message 228 * @return 229 */ 230 public boolean isInOrder(final MessageReference message) { 231 return isInOrder(message.getMessageId()); 232 } 233 234 /** 235 * Check the MessageId is in order 236 * @param id 237 * @return 238 */ 239 public boolean isInOrder(final MessageId id) { 240 boolean answer = false; 241 242 if (id != null) { 243 ProducerId pid = id.getProducerId(); 244 if (pid != null) { 245 BitArrayBin bab = map.get(pid); 246 if (bab == null) { 247 bab = new BitArrayBin(auditDepth); 248 map.put(pid, bab); 249 } 250 answer = bab.isInOrder(id.getProducerSequenceId()); 251 252 } 253 } 254 return answer; 255 } 256 257 public long getLastSeqId(ProducerId id) { 258 long result = -1; 259 BitArrayBin bab = map.get(id.toString() + ":"); 260 if (bab != null) { 261 result = bab.getLastSetIndex(); 262 } 263 return result; 264 } 265 266 public void clear() { 267 map.clear(); 268 } 269 }