001/* FASTAFileReader class to read multiple sequences in each map function.
002
003/*
004 * Copyright (c) 2010-2013 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027package org.kepler.bio.hadoop.input;
028
029import java.io.IOException;
030import java.io.InputStream;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.io.Text;
034import org.kepler.ddp.Utilities;
035
036//////////////////////////////////////////////////////////////////////////
037////FASTAFileReader
038
039/**
040 * A class that provides a multiple sequence reader from an input stream. The sequence number for each reader is configurable 
041 * through KEPLER::parameter::SequenceNumberPerExecution parameter which can be set in DDPDataSource actor.
042 * The class is modified based on class org.apache.hadoop.util.LineReader.
043 * 
044 * @author Jianwu Wang (jianwu@sdsc.edu)
045 * @version $Id: FASTAFileReader.java 33064 2014-11-12 23:17:08Z crawl $
046*/
047
048public class FASTAFileReader {
049        private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
050        private static final String SEQUENCE_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::SequenceNumberPerExecution";
051        private int bufferSize = DEFAULT_BUFFER_SIZE;
052        private InputStream in;
053        private byte[] buffer;
054        // the number of bytes of real data in the buffer
055        private int bufferLength = 0;
056        // the current position in the buffer
057        private int bufferPosn = 0;
058
059//      private static final byte CR = '\r';
060//      private static final byte LF = '\n';
061        private static final byte SEQ_DELIMITER = '>';
062
063        private int seqNumber = 1;
064
065        /**
066         * Create a line reader that reads from the given stream using the default
067         * buffer-size (64k).
068         * 
069         * @param in
070         *            The input stream
071         * @throws IOException
072         */
073        public FASTAFileReader(InputStream in) {
074                this(in, DEFAULT_BUFFER_SIZE);
075        }
076
077        /**
078         * Create a line reader that reads from the given stream using the given
079         * buffer-size.
080         * 
081         * @param in
082         *            The input stream
083         * @param bufferSize
084         *            Size of the read buffer
085         * @throws IOException
086         */
087        public FASTAFileReader(InputStream in, int bufferSize) {
088                this.in = in;
089                this.bufferSize = bufferSize;
090                this.buffer = new byte[this.bufferSize];
091//              System.out.println("bufferSize in FASTAFileReader: "+ this.bufferSize);
092                Configuration conf = new Configuration();
093                this.seqNumber = conf.getInt(SEQUENCE_NUMBER, 300);
094                System.out.println("SEQUENCE_NUM:"+this.seqNumber);
095                if (this.seqNumber == -1)
096                        this.seqNumber = bufferSize;
097                System.out.println("parameter SequenceNumberPerExecution in FASTAFileReader: "+ this.seqNumber);
098        }
099
100        /**
101         * Create a line reader that reads from the given stream using the
102         * <code>io.file.buffer.size</code> specified in the given
103         * <code>Configuration</code>.
104         * 
105         * @param in
106         *            input stream
107         * @param conf
108         *            configuration
109         * @throws IOException
110         */
111        public FASTAFileReader(InputStream in, Configuration conf)
112                        throws IOException {
113                //this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
114                this.in = in;
115                this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
116                this.buffer = new byte[this.bufferSize];
117                this.seqNumber = conf.getInt(SEQUENCE_NUMBER, 300);
118                System.out.println("SEQUENCE_NUM:"+this.seqNumber);
119                if (this.seqNumber == -1)
120                        this.seqNumber = bufferSize;
121                System.out.println("parameter SequenceNumberPerExecution in FASTAFileReader: "+ this.seqNumber);
122        }
123
124        /**
125         * Close the underlying stream.
126         * 
127         * @throws IOException
128         */
129        public void close() throws IOException {
130                in.close();
131        }
132
133        /**
134         * Read one line from the InputStream into the given Text. A line can be
135         * terminated by '>'. EOF also terminates an otherwise unterminated line.
136         * 
137         * @param str
138         *            the object to store the given line (without newline)
139         * @param maxBytesToConsume
140         *            the maximum number of bytes to consume in this call. This is
141         *            only a hint, because if the line cross this threshold, we
142         *            allow it to happen. It can overshoot potentially by as much as
143         *            one buffer length.
144         * @param seqNumber
145         *            sequence number for each key value record. It is read from 
146         *            configuration file if it is null.
147         * @return the number of bytes read including the (longest) newline found.
148         * 
149         * @throws IOException
150         *             if the underlying stream throws
151         */
152        public int readSequence(Text str, int maxBytesToConsume, int seqNumber)
153                        throws IOException {
154
155                if (seqNumber == -1)
156                        seqNumber = this.seqNumber;
157                str.clear();
158                int txtLength = 0; // tracks str.getLength(), as an optimization
159                int newlineLength = 0; // length of terminating newline
160                long bytesConsumed = 0;
161                int lineNumberIndex = 0;
162                do {
163                        int startPosn = bufferPosn; // starting from where we left off the
164                                                                                // last time
165                        if (bufferPosn >= bufferLength) {
166                                startPosn = bufferPosn = 0;
167                                bufferLength = in.read(buffer);
168                                if (bufferLength <= 0)
169                                        break; // EOF
170                        }
171                        
172                        for (; bufferPosn < bufferLength; ++bufferPosn) { // search for
173                                                                                                                                // new sequence
174
175                                if (buffer[bufferPosn] == SEQ_DELIMITER && lineNumberIndex >= seqNumber) {
176                                        break;
177                                }
178                                if (buffer[bufferPosn] == SEQ_DELIMITER && bytesConsumed > maxBytesToConsume) {
179                                        break;
180                                }
181                                if (buffer[bufferPosn] == SEQ_DELIMITER) {
182                                        lineNumberIndex++;
183                                }
184                                bytesConsumed++;
185                        }
186                        int readLength = bufferPosn - startPosn;
187                        if (lineNumberIndex >= seqNumber)
188                                --readLength; // move position back for one byte to exclude the delimiter '>' of the next sequence
189                        //bytesConsumed += readLength;
190                        int appendLength = readLength - newlineLength;
191                        if (appendLength > 0) {
192                                str.append(buffer, startPosn, appendLength);
193                                txtLength += appendLength;
194                        }
195                        //bufferPosn == bufferLength, it means the current sequence has not been read completely, so continue the loop
196                } while ((lineNumberIndex < seqNumber || bufferPosn == bufferLength) && bytesConsumed <= maxBytesToConsume); 
197                if (bytesConsumed > (long) Integer.MAX_VALUE)
198                        throw new IOException("Too many bytes before newline: "
199                                        + bytesConsumed);       
200                return (int) bytesConsumed;
201        }
202
203        /**
204         * Read from the InputStream into the given Text.
205         * 
206         * @param str
207         *            the object to store the given line
208         * @return the number of bytes read including the newline
209         * @throws IOException
210         *             if the underlying stream throws
211         */
212        public int readSequence(Text str) throws IOException {
213                return readSequence(str, Integer.MAX_VALUE, -1);
214        }
215
216}