001/*
002 * Copyright (c) 2010-2013 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * Permission is hereby granted, without written agreement and without
006 * license or royalty fees, to use, copy, modify, and distribute this
007 * software and its documentation for any purpose, provided that the above
008 * copyright notice and the following two paragraphs appear in all copies
009 * of this software.
010 *
011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 * SUCH DAMAGE.
016 *
017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 * ENHANCEMENTS, OR MODIFICATIONS.
023 *
024 */
025package org.kepler.hadoop.io.input;
026
027import java.io.IOException;
028import java.io.InputStream;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.io.Text;
032import org.kepler.ddp.Utilities;
033
034/**
035 * A class that provides data segments from an input stream, where the data segments are
036 * delimited by parameter "KEPLER::parameter::Delimiter".
037 * The class is modified based on class org.apache.hadoop.util.LineReader.
038 * 
039 * @author Jianwu Wang (jianwu@sdsc.edu)
040 * @version $Id: DelimitedTextFileReader.java 33070 2014-11-12 23:21:09Z crawl $
041*/
042
043public class DelimitedTextFileReader {
044        private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
045        private int bufferSize = DEFAULT_BUFFER_SIZE;
046        private InputStream in;
047        private byte[] buffer;
048        // the number of bytes of real data in the buffer
049        private int bufferLength = 0;
050        // the current position in the buffer
051        private int bufferPosn = 0;
052        private static final String DELIMITER_PARA = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::Delimiter";
053        private byte[] delimiter = {'\n'};
054
055        /**
056         * Create a line reader that reads from the given stream using the default
057         * buffer-size (64k).
058         * 
059         * @param in
060         *            The input stream
061         * @throws IOException
062         */
063        public DelimitedTextFileReader(InputStream in) {
064                this(in, DEFAULT_BUFFER_SIZE);
065        }
066
067        /**
068         * Create a line reader that reads from the given stream using the given
069         * buffer-size.
070         * 
071         * @param in
072         *            The input stream
073         * @param bufferSize
074         *            Size of the read buffer
075         * @throws IOException
076         */
077        public DelimitedTextFileReader(InputStream in, int bufferSize) {
078                this.in = in;
079                this.bufferSize = bufferSize;
080                this.buffer = new byte[this.bufferSize];
081//              System.out.println("bufferSize in FASTAFileReader: "+ this.bufferSize);
082                Configuration conf = new Configuration();
083                final String delimiterString = conf.get(DELIMITER_PARA, "\n");
084                this.delimiter = delimiterString.getBytes();
085                System.out.println("delimiter in DelimitedTextFileReader:" + delimiterString);
086        }
087
088        /**
089         * Create a line reader that reads from the given stream using the
090         * <code>io.file.buffer.size</code> specified in the given
091         * <code>Configuration</code>.
092         * 
093         * @param in
094         *            input stream
095         * @param conf
096         *            configuration
097         * @throws IOException
098         */
099        public DelimitedTextFileReader(InputStream in, Configuration conf)
100                        throws IOException {
101                //this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
102                this.in = in;
103                this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
104                this.buffer = new byte[this.bufferSize];
105                final String delimiterString = conf.get(DELIMITER_PARA, "\n");
106                this.delimiter = delimiterString.getBytes();
107                System.out.println("delimiter in DelimitedTextFileReader:" + delimiterString);
108        }
109
110        /**
111         * Close the underlying stream.
112         * 
113         * @throws IOException
114         */
115        public void close() throws IOException {
116                in.close();
117        }
118
119        /**
120         * Read one line from the InputStream into the given Text. A line can be
121         * terminated by delimiter. EOF also terminates an otherwise unterminated line.
122         * 
123         * @param str
124         *            the object to store the given line (without newline)
125         * @param maxLineLength
126         *            the maximum number of bytes to store into str; the rest of the
127         *            line is silently discarded.
128         * @param maxBytesToConsume
129         *            the maximum number of bytes to consume in this call. This is
130         *            only a hint, because if the line cross this threshold, we
131         *            allow it to happen. It can overshoot potentially by as much as
132         *            one buffer length.
133         * 
134         * @return the number of bytes read including the (longest) newline found.
135         * 
136         * @throws IOException
137         *             if the underlying stream throws
138         */
139        public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
140                        throws IOException {
141
142//              System.out.println("lineNumber FASTAFileReader:" + lineNumber);
143                str.clear();
144                int txtLength = 0; // tracks str.getLength(), as an optimization
145                int newlineLength = 0; // length of terminating newline
146                long bytesConsumed = 0;
147                do {
148                        int startPosn = bufferPosn; // starting from where we left off the
149                                                                                // last time
150                        if (bufferPosn >= bufferLength) {
151                                startPosn = bufferPosn = 0;
152                                bufferLength = in.read(buffer);
153                                if (bufferLength <= 0)
154                                        break; // EOF
155                        }
156                        
157                        /* position of matching positions in the delimiter byte array */
158                        int j = 0;
159                        
160                        for (; bufferPosn < bufferLength && j < delimiter.length; ++bufferPosn) { // search for new line
161                                if (buffer[bufferPosn] == delimiter[j]) {
162                                        j++;
163                                }
164                                else {
165                                        j = 0;
166                                }
167                        }
168                        
169                        // check why we dropped out
170                        if (j == delimiter.length) {
171                                // line end
172                                int readLength = bufferPosn - startPosn;
173                                readLength = readLength - delimiter.length; // move position back for one byte to exclude the delimiter bytes at the end of the array.
174                                bytesConsumed += readLength;
175                                int appendLength = readLength - newlineLength;
176                                if (appendLength > maxLineLength - txtLength) {
177                                        appendLength = maxLineLength - txtLength;
178                                }
179                                if (appendLength > 0) {
180                                        str.append(buffer, startPosn, appendLength);
181                                        // txtLength never used again
182                                        //txtLength += appendLength;
183                                }
184                                return (int) bytesConsumed;
185                        } else {
186                                // reach bufferLength
187                                int readLength = bufferPosn - startPosn;
188                                bytesConsumed += readLength;
189                                int appendLength = readLength - newlineLength;
190                                if (appendLength > maxLineLength - txtLength) {
191                                        appendLength = maxLineLength - txtLength;
192                                }
193                                if (appendLength > 0) {
194                                        str.append(buffer, startPosn, appendLength);
195                                        txtLength += appendLength;
196                                }
197                        }
198                } while (bytesConsumed < maxBytesToConsume);
199                if (bytesConsumed > (long) Integer.MAX_VALUE)
200                        throw new IOException("Too many bytes before newline: "
201                                        + bytesConsumed);
202//              System.out.println("data for one readline at FASTAFileReader:" +
203//              str.toString());
204//              System.out.println("line number at FASTAFileReader:" +
205//                              lineNumberIndex);               
206                return (int) bytesConsumed;
207        }
208
209        /**
210         * Read from the InputStream into the given Text.
211         * 
212         * @param str
213         *            the object to store the given line
214         * @param maxLineLength
215         *            the maximum number of bytes to store into str.
216         * @return the number of bytes read including the newline
217         * @throws IOException
218         *             if the underlying stream throws
219         */
220        public int readLine(Text str, int maxLineLength) throws IOException {
221                return readLine(str, maxLineLength, Integer.MAX_VALUE);
222        }
223
224        /**
225         * Read from the InputStream into the given Text.
226         * 
227         * @param str
228         *            the object to store the given line
229         * @return the number of bytes read including the newline
230         * @throws IOException
231         *             if the underlying stream throws
232         */
233        public int readLine(Text str) throws IOException {
234                return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
235        }
236
237}