001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.IOException;
020    import java.io.OutputStream;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.jms.InvalidDestinationException;
026    import javax.jms.JMSException;
027    
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.MessageId;
032    import org.apache.activemq.command.ProducerId;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.util.IOExceptionSupport;
035    
036    /**
037     * 
038     */
039    public class ActiveMQOutputStream extends OutputStream implements Disposable {
040    
041        protected int count;
042    
043        final byte buffer[];
044    
045        private final ActiveMQConnection connection;
046        private final Map<String, Object> properties;
047        private final ProducerInfo info;
048    
049        private long messageSequence;
050        private boolean closed;
051        private final int deliveryMode;
052        private final int priority;
053        private final long timeToLive;
054    
055        /**
056         * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
057         */
058        public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
059    
060        public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
061                                    long timeToLive) throws JMSException {
062            this.connection = connection;
063            this.deliveryMode = deliveryMode;
064            this.priority = priority;
065            this.timeToLive = timeToLive;
066            this.properties = properties == null ? null : new HashMap<String, Object>(properties);
067    
068            Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
069            if (chunkSize == null) {
070                chunkSize = 64 * 1024;
071            } else {
072                if (chunkSize < 1) {
073                    throw new IllegalArgumentException("Chunk size must be greater then 0");
074                } else {
075                    chunkSize *= 1024;
076                }
077            }
078    
079            buffer = new byte[chunkSize];
080    
081            if (destination == null) {
082                throw new InvalidDestinationException("Don't understand null destinations");
083            }
084    
085            this.info = new ProducerInfo(producerId);
086            this.info.setDestination(destination);
087    
088            this.connection.addOutputStream(this);
089            this.connection.asyncSendPacket(info);
090        }
091    
092        public void close() throws IOException {
093            if (!closed) {
094                flushBuffer();
095                try {
096                    // Send an EOS style empty message to signal EOS.
097                    send(new ActiveMQMessage(), true);
098                    dispose();
099                    this.connection.asyncSendPacket(info.createRemoveCommand());
100                } catch (JMSException e) {
101                    IOExceptionSupport.create(e);
102                }
103            }
104        }
105    
106        public void dispose() {
107            if (!closed) {
108                this.connection.removeOutputStream(this);
109                closed = true;
110            }
111        }
112    
113        public synchronized void write(int b) throws IOException {
114            buffer[count++] = (byte) b;
115            if (count == buffer.length) {
116                flushBuffer();
117            }
118        }
119    
120        public synchronized void write(byte b[], int off, int len) throws IOException {
121            while (len > 0) {
122                int max = Math.min(len, buffer.length - count);
123                System.arraycopy(b, off, buffer, count, max);
124    
125                len -= max;
126                count += max;
127                off += max;
128    
129                if (count == buffer.length) {
130                    flushBuffer();
131                }
132            }
133        }
134    
135        public synchronized void flush() throws IOException {
136            flushBuffer();
137        }
138    
139        private void flushBuffer() throws IOException {
140            if (count != 0) {
141                try {
142                    ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
143                    msg.writeBytes(buffer, 0, count);
144                    send(msg, false);
145                } catch (JMSException e) {
146                    throw IOExceptionSupport.create(e);
147                }
148                count = 0;
149            }
150        }
151    
152        /**
153         * @param msg
154         * @throws JMSException
155         */
156        private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
157            if (properties != null) {
158                for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
159                    String key = (String) iter.next();
160                    Object value = properties.get(key);
161                    msg.setObjectProperty(key, value);
162                }
163            }
164            msg.setType("org.apache.activemq.Stream");
165            msg.setGroupID(info.getProducerId().toString());
166            if (eosMessage) {
167                msg.setGroupSequence(-1);
168            } else {
169                msg.setGroupSequence((int) messageSequence);
170            }
171            MessageId id = new MessageId(info.getProducerId(), messageSequence++);
172            connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage);
173        }
174    
175        public String toString() {
176            return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
177        }
178    
179    }