001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.activemq; 018 019 import java.io.IOException; 020 import java.io.OutputStream; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import javax.jms.InvalidDestinationException; 026 import javax.jms.JMSException; 027 028 import org.apache.activemq.command.ActiveMQBytesMessage; 029 import org.apache.activemq.command.ActiveMQDestination; 030 import org.apache.activemq.command.ActiveMQMessage; 031 import org.apache.activemq.command.MessageId; 032 import org.apache.activemq.command.ProducerId; 033 import org.apache.activemq.command.ProducerInfo; 034 import org.apache.activemq.util.IOExceptionSupport; 035 036 /** 037 * 038 */ 039 public class ActiveMQOutputStream extends OutputStream implements Disposable { 040 041 protected int count; 042 043 final byte buffer[]; 044 045 private final ActiveMQConnection connection; 046 private final Map<String, Object> properties; 047 private final ProducerInfo info; 048 049 private long messageSequence; 050 private boolean closed; 051 private final int deliveryMode; 052 private final int priority; 053 private final long timeToLive; 054 055 /** 056 * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb 057 */ 058 public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE"; 059 060 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority, 061 long timeToLive) throws JMSException { 062 this.connection = connection; 063 this.deliveryMode = deliveryMode; 064 this.priority = priority; 065 this.timeToLive = timeToLive; 066 this.properties = properties == null ? null : new HashMap<String, Object>(properties); 067 068 Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE); 069 if (chunkSize == null) { 070 chunkSize = 64 * 1024; 071 } else { 072 if (chunkSize < 1) { 073 throw new IllegalArgumentException("Chunk size must be greater then 0"); 074 } else { 075 chunkSize *= 1024; 076 } 077 } 078 079 buffer = new byte[chunkSize]; 080 081 if (destination == null) { 082 throw new InvalidDestinationException("Don't understand null destinations"); 083 } 084 085 this.info = new ProducerInfo(producerId); 086 this.info.setDestination(destination); 087 088 this.connection.addOutputStream(this); 089 this.connection.asyncSendPacket(info); 090 } 091 092 public void close() throws IOException { 093 if (!closed) { 094 flushBuffer(); 095 try { 096 // Send an EOS style empty message to signal EOS. 097 send(new ActiveMQMessage(), true); 098 dispose(); 099 this.connection.asyncSendPacket(info.createRemoveCommand()); 100 } catch (JMSException e) { 101 IOExceptionSupport.create(e); 102 } 103 } 104 } 105 106 public void dispose() { 107 if (!closed) { 108 this.connection.removeOutputStream(this); 109 closed = true; 110 } 111 } 112 113 public synchronized void write(int b) throws IOException { 114 buffer[count++] = (byte) b; 115 if (count == buffer.length) { 116 flushBuffer(); 117 } 118 } 119 120 public synchronized void write(byte b[], int off, int len) throws IOException { 121 while (len > 0) { 122 int max = Math.min(len, buffer.length - count); 123 System.arraycopy(b, off, buffer, count, max); 124 125 len -= max; 126 count += max; 127 off += max; 128 129 if (count == buffer.length) { 130 flushBuffer(); 131 } 132 } 133 } 134 135 public synchronized void flush() throws IOException { 136 flushBuffer(); 137 } 138 139 private void flushBuffer() throws IOException { 140 if (count != 0) { 141 try { 142 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 143 msg.writeBytes(buffer, 0, count); 144 send(msg, false); 145 } catch (JMSException e) { 146 throw IOExceptionSupport.create(e); 147 } 148 count = 0; 149 } 150 } 151 152 /** 153 * @param msg 154 * @throws JMSException 155 */ 156 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException { 157 if (properties != null) { 158 for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) { 159 String key = (String) iter.next(); 160 Object value = properties.get(key); 161 msg.setObjectProperty(key, value); 162 } 163 } 164 msg.setType("org.apache.activemq.Stream"); 165 msg.setGroupID(info.getProducerId().toString()); 166 if (eosMessage) { 167 msg.setGroupSequence(-1); 168 } else { 169 msg.setGroupSequence((int) messageSequence); 170 } 171 MessageId id = new MessageId(info.getProducerId(), messageSequence++); 172 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage); 173 } 174 175 public String toString() { 176 return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }"; 177 } 178 179 }