001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import org.apache.activemq.broker.region.Destination;
020    import org.apache.activemq.broker.region.Region;
021    import org.apache.activemq.command.Message;
022    import org.apache.activemq.state.ProducerState;
023    import org.slf4j.Logger;
024    import org.slf4j.LoggerFactory;
025    
026    /**
027     * Holds internal state in the broker for a MessageProducer
028     * 
029     * 
030     */
031    public class ProducerBrokerExchange {
032    
033        private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
034        private ConnectionContext connectionContext;
035        private Destination regionDestination;
036        private Region region;
037        private ProducerState producerState;
038        private boolean mutable = true;
039        private long lastSendSequenceNumber = -1;
040        
041        public ProducerBrokerExchange() {
042        }
043    
044        public ProducerBrokerExchange copy() {
045            ProducerBrokerExchange rc = new ProducerBrokerExchange();
046            rc.connectionContext = connectionContext.copy();
047            rc.regionDestination = regionDestination;
048            rc.region = region;
049            rc.producerState = producerState;
050            rc.mutable = mutable;
051            return rc;
052        }
053    
054        
055        /**
056         * @return the connectionContext
057         */
058        public ConnectionContext getConnectionContext() {
059            return this.connectionContext;
060        }
061    
062        /**
063         * @param connectionContext the connectionContext to set
064         */
065        public void setConnectionContext(ConnectionContext connectionContext) {
066            this.connectionContext = connectionContext;
067        }
068    
069        /**
070         * @return the mutable
071         */
072        public boolean isMutable() {
073            return this.mutable;
074        }
075    
076        /**
077         * @param mutable the mutable to set
078         */
079        public void setMutable(boolean mutable) {
080            this.mutable = mutable;
081        }
082    
083        /**
084         * @return the regionDestination
085         */
086        public Destination getRegionDestination() {
087            return this.regionDestination;
088        }
089    
090        /**
091         * @param regionDestination the regionDestination to set
092         */
093        public void setRegionDestination(Destination regionDestination) {
094            this.regionDestination = regionDestination;
095        }
096    
097        /**
098         * @return the region
099         */
100        public Region getRegion() {
101            return this.region;
102        }
103    
104        /**
105         * @param region the region to set
106         */
107        public void setRegion(Region region) {
108            this.region = region;
109        }
110    
111        /**
112         * @return the producerState
113         */
114        public ProducerState getProducerState() {
115            return this.producerState;
116        }
117    
118        /**
119         * @param producerState the producerState to set
120         */
121        public void setProducerState(ProducerState producerState) {
122            this.producerState = producerState;
123        }
124    
125        /**
126         * Enforce duplicate suppression using info from persistence adapter
127         * @param messageSend
128         * @return false if message should be ignored as a duplicate
129         */
130        public boolean canDispatch(Message messageSend) {
131            boolean canDispatch = true;
132            if (lastSendSequenceNumber > 0) {
133                if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) {
134                    canDispatch = false;
135                    LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" 
136                            + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: "  + lastSendSequenceNumber);
137                }
138            }
139            return canDispatch;
140        }
141    
142        public void setLastStoredSequenceId(long l) {
143            lastSendSequenceNumber = l;
144            LOG.debug("last stored sequence id set: " + l);
145        }
146    }