001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq.network; 018 019 import java.util.Set; 020 import java.util.concurrent.CopyOnWriteArraySet; 021 import java.util.concurrent.atomic.AtomicBoolean; 022 import java.util.concurrent.atomic.AtomicInteger; 023 024 import org.apache.activemq.command.ConsumerId; 025 import org.apache.activemq.command.ConsumerInfo; 026 import org.slf4j.Logger; 027 import org.slf4j.LoggerFactory; 028 029 /** 030 * Represents a network bridge interface 031 * 032 * 033 */ 034 public class DemandSubscription { 035 private static final Logger LOG = LoggerFactory.getLogger(DemandSubscription.class); 036 037 private final ConsumerInfo remoteInfo; 038 private final ConsumerInfo localInfo; 039 private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>(); 040 041 private AtomicInteger dispatched = new AtomicInteger(0); 042 private AtomicBoolean activeWaiter = new AtomicBoolean(); 043 044 DemandSubscription(ConsumerInfo info) { 045 remoteInfo = info; 046 localInfo = info.copy(); 047 localInfo.setNetworkSubscription(true); 048 remoteSubsIds.add(info.getConsumerId()); 049 } 050 051 /** 052 * Increment the consumers associated with this subscription 053 * 054 * @param id 055 * @return true if added 056 */ 057 public boolean add(ConsumerId id) { 058 return remoteSubsIds.add(id); 059 } 060 061 /** 062 * Increment the consumers associated with this subscription 063 * 064 * @param id 065 * @return true if removed 066 */ 067 public boolean remove(ConsumerId id) { 068 return remoteSubsIds.remove(id); 069 } 070 071 /** 072 * @return true if there are no interested consumers 073 */ 074 public boolean isEmpty() { 075 return remoteSubsIds.isEmpty(); 076 } 077 078 /** 079 * @return Returns the localInfo. 080 */ 081 public ConsumerInfo getLocalInfo() { 082 return localInfo; 083 } 084 085 /** 086 * @return Returns the remoteInfo. 087 */ 088 public ConsumerInfo getRemoteInfo() { 089 return remoteInfo; 090 } 091 092 public void waitForCompletion() { 093 if (dispatched.get() > 0) { 094 if (LOG.isDebugEnabled()) { 095 LOG.debug("Waiting for completion for sub: " + localInfo.getConsumerId() + ", dispatched: " + this.dispatched.get()); 096 } 097 activeWaiter.set(true); 098 if (dispatched.get() > 0) { 099 synchronized (activeWaiter) { 100 try { 101 activeWaiter.wait(); 102 } catch (InterruptedException ignored) { 103 } 104 } 105 if (this.dispatched.get() > 0) { 106 LOG.warn("demand sub interrupted or timedout while waiting for outstanding responses, expect potentially " + this.dispatched.get() + " duplicate deliveried"); 107 } 108 } 109 } 110 } 111 112 public void decrementOutstandingResponses() { 113 if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) { 114 synchronized (activeWaiter) { 115 activeWaiter.notifyAll(); 116 } 117 } 118 } 119 120 public boolean incrementOutstandingResponses() { 121 dispatched.incrementAndGet(); 122 if (activeWaiter.get()) { 123 decrementOutstandingResponses(); 124 return false; 125 } 126 return true; 127 } 128 }