001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import java.util.Set;
020    import javax.annotation.PostConstruct;
021    import org.apache.activemq.broker.BrokerPluginSupport;
022    import org.apache.activemq.broker.Connection;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.broker.ConsumerBrokerExchange;
025    import org.apache.activemq.broker.ProducerBrokerExchange;
026    import org.apache.activemq.broker.region.Destination;
027    import org.apache.activemq.broker.region.MessageReference;
028    import org.apache.activemq.broker.region.Subscription;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.BrokerInfo;
031    import org.apache.activemq.command.ConnectionInfo;
032    import org.apache.activemq.command.ConsumerInfo;
033    import org.apache.activemq.command.DestinationInfo;
034    import org.apache.activemq.command.Message;
035    import org.apache.activemq.command.MessageAck;
036    import org.apache.activemq.command.MessageDispatch;
037    import org.apache.activemq.command.MessageDispatchNotification;
038    import org.apache.activemq.command.MessagePull;
039    import org.apache.activemq.command.ProducerInfo;
040    import org.apache.activemq.command.RemoveSubscriptionInfo;
041    import org.apache.activemq.command.Response;
042    import org.apache.activemq.command.SessionInfo;
043    import org.apache.activemq.command.TransactionId;
044    import org.apache.activemq.usage.Usage;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A simple Broker intercepter which allows you to enable/disable logging.
050     * 
051     * @org.apache.xbean.XBean
052     */
053    
054    public class LoggingBrokerPlugin extends BrokerPluginSupport {
055    
056        private static final Logger LOG = LoggerFactory.getLogger(LoggingBrokerPlugin.class);
057    
058        private boolean logAll = false;
059        private boolean logMessageEvents = false;
060        private boolean logConnectionEvents = true;
061        private boolean logTransactionEvents = false;
062        private boolean logConsumerEvents = false;
063        private boolean logProducerEvents = false;
064        private boolean logInternalEvents = false;
065    
066        /**
067         * 
068         * @throws Exception
069         * @org.apache.xbean.InitMethod
070         */
071        @PostConstruct
072        public void afterPropertiesSet() throws Exception {
073            LOG.info("Created LoggingBrokerPlugin: " + this.toString());
074        }
075    
076        public boolean isLogAll() {
077            return logAll;
078        }
079    
080        /**
081         * Logger all Events that go through the Plugin
082         */
083        public void setLogAll(boolean logAll) {
084            this.logAll = logAll;
085        }
086    
087        public boolean isLogMessageEvents() {
088            return logMessageEvents;
089        }
090    
091        /**
092         * Logger Events that are related to message processing
093         */
094        public void setLogMessageEvents(boolean logMessageEvents) {
095            this.logMessageEvents = logMessageEvents;
096        }
097    
098        public boolean isLogConnectionEvents() {
099            return logConnectionEvents;
100        }
101    
102        /**
103         * Logger Events that are related to connections and sessions
104         */
105        public void setLogConnectionEvents(boolean logConnectionEvents) {
106            this.logConnectionEvents = logConnectionEvents;
107        }
108    
109        public boolean isLogTransactionEvents() {
110            return logTransactionEvents;
111        }
112    
113        /**
114         * Logger Events that are related to transaction processing
115         */
116        public void setLogTransactionEvents(boolean logTransactionEvents) {
117            this.logTransactionEvents = logTransactionEvents;
118        }
119    
120        public boolean isLogConsumerEvents() {
121            return logConsumerEvents;
122        }
123    
124        /**
125         * Logger Events that are related to Consumers
126         */
127        public void setLogConsumerEvents(boolean logConsumerEvents) {
128            this.logConsumerEvents = logConsumerEvents;
129        }
130    
131        public boolean isLogProducerEvents() {
132            return logProducerEvents;
133        }
134    
135        /**
136         * Logger Events that are related to Producers
137         */
138        public void setLogProducerEvents(boolean logProducerEvents) {
139            this.logProducerEvents = logProducerEvents;
140        }
141    
142        public boolean isLogInternalEvents() {
143            return logInternalEvents;
144        }
145    
146        /**
147         * Logger Events that are normally internal to the broker
148         */
149        public void setLogInternalEvents(boolean logInternalEvents) {
150            this.logInternalEvents = logInternalEvents;
151        }
152    
153        @Override
154        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
155            if (isLogAll() || isLogConsumerEvents()) {
156                LOG.info("Acknowledging message for client ID : " + consumerExchange.getConnectionContext().getClientId()
157                        + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
158                if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
159                    LOG.trace("Message count: " + ack.getMessageCount() + ", First Message Id: " + ack.getFirstMessageId()
160                            + ", Last Message Id: " + ack.getLastMessageId());
161                }
162            }
163            super.acknowledge(consumerExchange, ack);
164        }
165    
166        @Override
167        public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
168            if (isLogAll() || isLogConsumerEvents()) {
169                LOG.info("Message Pull from : " + context.getClientId() + " on " + pull.getDestination().getPhysicalName());
170            }
171            return super.messagePull(context, pull);
172        }
173    
174        @Override
175        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
176            if (isLogAll() || isLogConnectionEvents()) {
177                LOG.info("Adding Connection : " + info);
178            }
179            super.addConnection(context, info);
180        }
181    
182        @Override
183        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
184            if (isLogAll() || isLogConsumerEvents()) {
185                LOG.info("Adding Consumer : " + info);
186            }
187            return super.addConsumer(context, info);
188        }
189    
190        @Override
191        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
192            if (isLogAll() || isLogProducerEvents()) {
193                LOG.info("Adding Producer :" + info);
194            }
195            super.addProducer(context, info);
196        }
197    
198        @Override
199        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
200            if (isLogAll() || isLogTransactionEvents()) {
201                LOG.info("Commiting transaction : " + xid.getTransactionKey());
202            }
203            super.commitTransaction(context, xid, onePhase);
204        }
205    
206        @Override
207        public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
208            if (isLogAll() || isLogConsumerEvents()) {
209                LOG.info("Removing subscription : " + info);
210            }
211            super.removeSubscription(context, info);
212        }
213    
214        @Override
215        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
216    
217            TransactionId[] result = super.getPreparedTransactions(context);
218            if ((isLogAll() || isLogTransactionEvents()) && result != null) {
219                StringBuffer tids = new StringBuffer();
220                for (TransactionId tid : result) {
221                    if (tids.length() > 0) {
222                        tids.append(", ");
223                    }
224                    tids.append(tid.getTransactionKey());
225                }
226                LOG.info("Prepared transactions : " + tids);
227            }
228            return result;
229        }
230    
231        @Override
232        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
233            if (isLogAll() || isLogTransactionEvents()) {
234                LOG.info("Preparing transaction : " + xid.getTransactionKey());
235            }
236            return super.prepareTransaction(context, xid);
237        }
238    
239        @Override
240        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
241            if (isLogAll() || isLogConnectionEvents()) {
242                LOG.info("Removing Connection : " + info);
243            }
244            super.removeConnection(context, info, error);
245        }
246    
247        @Override
248        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
249            if (isLogAll() || isLogConsumerEvents()) {
250                LOG.info("Removing Consumer : " + info);
251            }
252            super.removeConsumer(context, info);
253        }
254    
255        @Override
256        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
257            if (isLogAll() || isLogProducerEvents()) {
258                LOG.info("Removing Producer : " + info);
259            }
260            super.removeProducer(context, info);
261        }
262    
263        @Override
264        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
265            if (isLogAll() || isLogTransactionEvents()) {
266                LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
267            }
268            super.rollbackTransaction(context, xid);
269        }
270    
271        @Override
272        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
273            if (isLogAll() || isLogProducerEvents()) {
274                LOG.info("Sending message : " + messageSend.copy());
275            }
276            super.send(producerExchange, messageSend);
277        }
278    
279        @Override
280        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
281            if (isLogAll() || isLogTransactionEvents()) {
282                LOG.info("Beginning transaction : " + xid.getTransactionKey());
283            }
284            super.beginTransaction(context, xid);
285        }
286    
287        @Override
288        public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
289            if (isLogAll() || isLogTransactionEvents()) {
290                LOG.info("Forgetting transaction : " + transactionId.getTransactionKey());
291            }
292            super.forgetTransaction(context, transactionId);
293        }
294    
295        @Override
296        public Connection[] getClients() throws Exception {
297            Connection[] result = super.getClients();
298    
299            if (isLogAll() || isLogInternalEvents()) {
300                if (result == null) {
301                    LOG.info("Get Clients returned empty list.");
302                } else {
303                    StringBuffer cids = new StringBuffer();
304                    for (Connection c : result) {
305                        cids.append(cids.length() > 0 ? ", " : "");
306                        cids.append(c.getConnectionId());
307                    }
308                    LOG.info("Connected clients : " + cids);
309                }
310            }
311            return super.getClients();
312        }
313    
314        @Override
315        public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context,
316                ActiveMQDestination destination, boolean create) throws Exception {
317            if (isLogAll() || isLogInternalEvents()) {
318                LOG.info("Adding destination : " + destination.getDestinationTypeAsString() + ":"
319                        + destination.getPhysicalName());
320            }
321            return super.addDestination(context, destination, create);
322        }
323    
324        @Override
325        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
326                throws Exception {
327            if (isLogAll() || isLogInternalEvents()) {
328                LOG.info("Removing destination : " + destination.getDestinationTypeAsString() + ":"
329                        + destination.getPhysicalName());
330            }
331            super.removeDestination(context, destination, timeout);
332        }
333    
334        @Override
335        public ActiveMQDestination[] getDestinations() throws Exception {
336            ActiveMQDestination[] result = super.getDestinations();
337            if (isLogAll() || isLogInternalEvents()) {
338                if (result == null) {
339                    LOG.info("Get Destinations returned empty list.");
340                } else {
341                    StringBuffer destinations = new StringBuffer();
342                    for (ActiveMQDestination dest : result) {
343                        destinations.append(destinations.length() > 0 ? ", " : "");
344                        destinations.append(dest.getPhysicalName());
345                    }
346                    LOG.info("Get Destinations : " + destinations);
347                }
348            }
349            return result;
350        }
351    
352        @Override
353        public void start() throws Exception {
354            if (isLogAll() || isLogInternalEvents()) {
355                LOG.info("Starting " + getBrokerName());
356            }
357            super.start();
358        }
359    
360        @Override
361        public void stop() throws Exception {
362            if (isLogAll() || isLogInternalEvents()) {
363                LOG.info("Stopping " + getBrokerName());
364            }
365            super.stop();
366        }
367    
368        @Override
369        public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
370            if (isLogAll() || isLogConnectionEvents()) {
371                LOG.info("Adding Session : " + info);
372            }
373            super.addSession(context, info);
374        }
375    
376        @Override
377        public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
378            if (isLogAll() || isLogConnectionEvents()) {
379                LOG.info("Removing Session : " + info);
380            }
381            super.removeSession(context, info);
382        }
383    
384        @Override
385        public void addBroker(Connection connection, BrokerInfo info) {
386            if (isLogAll() || isLogInternalEvents()) {
387                LOG.info("Adding Broker " + info.getBrokerName());
388            }
389            super.addBroker(connection, info);
390        }
391    
392        @Override
393        public void removeBroker(Connection connection, BrokerInfo info) {
394            if (isLogAll() || isLogInternalEvents()) {
395                LOG.info("Removing Broker " + info.getBrokerName());
396            }
397            super.removeBroker(connection, info);
398        }
399    
400        @Override
401        public BrokerInfo[] getPeerBrokerInfos() {
402            BrokerInfo[] result = super.getPeerBrokerInfos();
403            if (isLogAll() || isLogInternalEvents()) {
404                if (result == null) {
405                    LOG.info("Get Peer Broker Infos returned empty list.");
406                } else {
407                    StringBuffer peers = new StringBuffer();
408                    for (BrokerInfo bi : result) {
409                        peers.append(peers.length() > 0 ? ", " : "");
410                        peers.append(bi.getBrokerName());
411                    }
412                    LOG.info("Get Peer Broker Infos : " + peers);
413                }
414            }
415            return result;
416        }
417    
418        @Override
419        public void preProcessDispatch(MessageDispatch messageDispatch) {
420            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
421                LOG.info("preProcessDispatch :" + messageDispatch);
422            }
423            super.preProcessDispatch(messageDispatch);
424        }
425    
426        @Override
427        public void postProcessDispatch(MessageDispatch messageDispatch) {
428            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
429                LOG.info("postProcessDispatch :" + messageDispatch);
430            }
431            super.postProcessDispatch(messageDispatch);
432        }
433    
434        @Override
435        public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
436            if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
437                LOG.info("ProcessDispatchNotification :" + messageDispatchNotification);
438            }
439            super.processDispatchNotification(messageDispatchNotification);
440        }
441    
442        @Override
443        public Set<ActiveMQDestination> getDurableDestinations() {
444            Set<ActiveMQDestination> result = super.getDurableDestinations();
445            if (isLogAll() || isLogInternalEvents()) {
446                if (result == null) {
447                    LOG.info("Get Durable Destinations returned empty list.");
448                } else {
449                    StringBuffer destinations = new StringBuffer();
450                    for (ActiveMQDestination dest : result) {
451                        destinations.append(destinations.length() > 0 ? ", " : "");
452                        destinations.append(dest.getPhysicalName());
453                    }
454                    LOG.info("Get Durable Destinations : " + destinations);
455                }
456            }
457            return result;
458        }
459    
460        @Override
461        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
462            if (isLogAll() || isLogInternalEvents()) {
463                LOG.info("Adding destination info : " + info);
464            }
465            super.addDestinationInfo(context, info);
466        }
467    
468        @Override
469        public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
470            if (isLogAll() || isLogInternalEvents()) {
471                LOG.info("Removing destination info : " + info);
472            }
473            super.removeDestinationInfo(context, info);
474        }
475    
476        @Override
477        public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
478            if (isLogAll() || isLogInternalEvents()) {
479                String msg = "Unable to display message.";
480    
481                msg = message.getMessage().toString();
482    
483                LOG.info("Message has expired : " + msg);
484            }
485            super.messageExpired(context, message, subscription);
486        }
487    
488        @Override
489        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
490                                          Subscription subscription) {
491            if (isLogAll() || isLogInternalEvents()) {
492                String msg = "Unable to display message.";
493    
494                msg = messageReference.getMessage().toString();
495    
496                LOG.info("Sending to DLQ : " + msg);
497            }
498            super.sendToDeadLetterQueue(context, messageReference, subscription);
499        }
500    
501        @Override
502        public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
503            if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
504                LOG.info("Fast Producer : " + producerInfo);
505            }
506            super.fastProducer(context, producerInfo);
507        }
508    
509        @Override
510        public void isFull(ConnectionContext context, Destination destination, Usage usage) {
511            if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
512                LOG.info("Destination is full : " + destination.getName());
513            }
514            super.isFull(context, destination, usage);
515        }
516    
517        @Override
518        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
519            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
520                String msg = "Unable to display message.";
521    
522                msg = messageReference.getMessage().toString();
523    
524                LOG.info("Message consumed : " + msg);
525            }
526            super.messageConsumed(context, messageReference);
527        }
528    
529        @Override
530        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
531            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
532                String msg = "Unable to display message.";
533    
534                msg = messageReference.getMessage().toString();
535    
536                LOG.info("Message delivered : " + msg);
537            }
538            super.messageDelivered(context, messageReference);
539        }
540    
541        @Override
542        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
543            if (isLogAll() || isLogInternalEvents()) {
544                String msg = "Unable to display message.";
545    
546                msg = messageReference.getMessage().toString();
547    
548                LOG.info("Message discarded : " + msg);
549            }
550            super.messageDiscarded(context, sub, messageReference);
551        }
552    
553        @Override
554        public void slowConsumer(ConnectionContext context, Destination destination, Subscription subs) {
555            if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
556                LOG.info("Detected slow consumer on " + destination.getName());
557                StringBuffer buf = new StringBuffer("Connection(");
558                buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
559                buf.append(") Session(");
560                buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
561                buf.append(")");
562                LOG.info(buf.toString());
563            }
564            super.slowConsumer(context, destination, subs);
565        }
566    
567        @Override
568        public void nowMasterBroker() {
569            if (isLogAll() || isLogInternalEvents()) {
570                LOG.info("Is now the master broker : " + getBrokerName());
571            }
572            super.nowMasterBroker();
573        }
574    
575        @Override
576        public String toString() {
577            StringBuffer buf = new StringBuffer();
578            buf.append("LoggingBrokerPlugin(");
579            buf.append("logAll=");
580            buf.append(isLogAll());
581            buf.append(", logConnectionEvents=");
582            buf.append(isLogConnectionEvents());
583            buf.append(", logConsumerEvents=");
584            buf.append(isLogConsumerEvents());
585            buf.append(", logProducerEvents=");
586            buf.append(isLogProducerEvents());
587            buf.append(", logMessageEvents=");
588            buf.append(isLogMessageEvents());
589            buf.append(", logTransactionEvents=");
590            buf.append(isLogTransactionEvents());
591            buf.append(", logInternalEvents=");
592            buf.append(isLogInternalEvents());
593            buf.append(")");
594            return buf.toString();
595        }
596    }