001/* MultipleLineReader class to read multiple lines in each map function.
002
003/*
004 * Copyright (c) 2010-2013 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027package org.kepler.hadoop.io.input;
028
029import java.io.IOException;
030import java.io.InputStream;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.io.Text;
034
035//////////////////////////////////////////////////////////////////////////
036////MultipleLineReader
037
038/**
039 * A class that provides a multiple line reader from an input stream. The line number for each reader is configurable 
040 * through 'org.kepler.hadoop.lineread.number' item at mapred-site.xml.
041 * The class is modified based on class org.apache.hadoop.util.LineReader.
042 * 
043 * @author Jianwu Wang (jianwu@sdsc.edu)
044 * @version $Id: MultipleLineReader.java 33070 2014-11-12 23:21:09Z crawl $
045*/
046
047public class MultipleLineReader {
048        private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
049        private int bufferSize = DEFAULT_BUFFER_SIZE;
050        private InputStream in;
051        private byte[] buffer;
052        // the number of bytes of real data in the buffer
053        private int bufferLength = 0;
054        // the current position in the buffer
055        private int bufferPosn = 0;
056
057        private static final byte CR = '\r';
058        private static final byte LF = '\n';
059
060        private int lineNumber = 1;
061
062        /**
063         * Create a line reader that reads from the given stream using the default
064         * buffer-size (64k).
065         * 
066         * @param in
067         *            The input stream
068         * @throws IOException
069         */
070        public MultipleLineReader(InputStream in) {
071                this(in, DEFAULT_BUFFER_SIZE);
072        }
073
074        /**
075         * Create a line reader that reads from the given stream using the given
076         * buffer-size.
077         * 
078         * @param in
079         *            The input stream
080         * @param bufferSize
081         *            Size of the read buffer
082         * @throws IOException
083         */
084        public MultipleLineReader(InputStream in, int bufferSize) {
085                this.in = in;
086                this.bufferSize = bufferSize;
087                this.buffer = new byte[this.bufferSize];
088//              System.out.println("bufferSize in MultipleLineReader: "+ this.bufferSize);
089                Configuration conf = new Configuration();
090                this.lineNumber = conf.getInt("org.kepler.hadoop.lineread.number", 5000);
091                if (this.lineNumber == -1)
092                        this.lineNumber = bufferSize;
093                System.out.println("lineNumber in MultipleLineReader: "+ this.lineNumber);
094        }
095
096        /**
097         * Create a line reader that reads from the given stream using the
098         * <code>io.file.buffer.size</code> specified in the given
099         * <code>Configuration</code>.
100         * 
101         * @param in
102         *            input stream
103         * @param conf
104         *            configuration
105         * @throws IOException
106         */
107        public MultipleLineReader(InputStream in, Configuration conf)
108                        throws IOException {
109                this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
110                this.lineNumber = conf.getInt("org.kepler.hadoop.lineread.number", 5000);
111                if (this.lineNumber == -1)
112                        this.lineNumber = bufferSize;
113                System.out.println("lineNumber in MultipleLineReader: "+ this.lineNumber);
114        }
115
116        /**
117         * Close the underlying stream.
118         * 
119         * @throws IOException
120         */
121        public void close() throws IOException {
122                in.close();
123        }
124
125        /**
126         * Read one line from the InputStream into the given Text. A line can be
127         * terminated by one of the following: '\n' (LF) , '\r' (CR), or '\r\n'
128         * (CR+LF). EOF also terminates an otherwise unterminated line.
129         * 
130         * @param str
131         *            the object to store the given line (without newline)
132         * @param maxLineLength
133         *            the maximum number of bytes to store into str; the rest of the
134         *            line is silently discarded.
135         * @param maxBytesToConsume
136         *            the maximum number of bytes to consume in this call. This is
137         *            only a hint, because if the line cross this threshold, we
138         *            allow it to happen. It can overshoot potentially by as much as
139         *            one buffer length.
140         * 
141         * @return the number of bytes read including the (longest) newline found.
142         * 
143         * @throws IOException
144         *             if the underlying stream throws
145         */
146        public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
147                        throws IOException {
148                /*
149                 * We're reading data from in, but the head of the stream may be already
150                 * buffered in buffer, so we have several cases: 1. No newline
151                 * characters are in the buffer, so we need to copy everything and read
152                 * another buffer from the stream. 2. An unambiguously terminated line
153                 * is in buffer, so we just copy to str. 3. Ambiguously terminated line
154                 * is in buffer, i.e. buffer ends in CR. In this case we copy everything
155                 * up to CR to str, but we also need to see what follows CR: if it's LF,
156                 * then we need consume LF as well, so next call to readLine will read
157                 * from after that. We use a flag prevCharCR to signal if previous
158                 * character was CR and, if it happens to be at the end of the buffer,
159                 * delay consuming it until we have a chance to look at the char that
160                 * follows.
161                 */
162
163//              System.out.println("lineNumber MultipleLineReader:" + lineNumber);
164                str.clear();
165                int txtLength = 0; // tracks str.getLength(), as an optimization
166                int newlineLength = 0; // length of terminating newline
167                boolean prevCharCR = false; // true of prev char was CR
168                long bytesConsumed = 0;
169                int lineNumberIndex = 1;
170                do {
171                        int startPosn = bufferPosn; // starting from where we left off the
172                                                                                // last time
173                        if (bufferPosn >= bufferLength) {
174                                startPosn = bufferPosn = 0;
175                                if (prevCharCR)
176                                        ++bytesConsumed; // account for CR from previous read
177                                bufferLength = in.read(buffer);
178                                if (bufferLength <= 0)
179                                        break; // EOF
180                        }
181                        
182                        //lineNumberIndex = 1;
183                        
184                        for (; bufferPosn < bufferLength; ++bufferPosn) { // search for
185                                                                                                                                // newline
186
187                                if (buffer[bufferPosn] == LF && lineNumberIndex >= lineNumber) {
188                                        newlineLength = (prevCharCR) ? 2 : 1;
189                                        ++bufferPosn; // at next invocation proceed from following
190                                                                        // byte
191                                        break;
192                                }
193                                if (prevCharCR && lineNumberIndex >= lineNumber) { // CR +
194                                                                                                                                        // notLF, we
195                                                                                                                                        // are at
196                                                                                                                                        // notLF
197                                        newlineLength = 1;
198                                        break;
199                                }
200                                prevCharCR = (buffer[bufferPosn] == CR);
201                                if (buffer[bufferPosn] == LF || buffer[bufferPosn] == CR) {
202                                        lineNumberIndex++;
203                                }
204
205                        }
206                        int readLength = bufferPosn - startPosn;
207                        if (prevCharCR && newlineLength == 0)
208                                --readLength; // CR at the end of the buffer
209                        bytesConsumed += readLength;
210                        int appendLength = readLength - newlineLength;
211                        if (appendLength > maxLineLength - txtLength) {
212                                appendLength = maxLineLength - txtLength;
213                        }
214                        if (appendLength > 0) {
215                                str.append(buffer, startPosn, appendLength);
216                                txtLength += appendLength;
217                        }
218                } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
219//              System.out.println("data for one readline at MultipleLineReader:" +
220//              str.toString());
221//              System.out.println("line number at MultipleLineReader:" +
222//                              lineNumberIndex);
223                if (bytesConsumed > (long) Integer.MAX_VALUE)
224                        throw new IOException("Too many bytes before newline: "
225                                        + bytesConsumed);
226                return (int) bytesConsumed;
227        }
228
229        /**
230         * Read from the InputStream into the given Text.
231         * 
232         * @param str
233         *            the object to store the given line
234         * @param maxLineLength
235         *            the maximum number of bytes to store into str.
236         * @return the number of bytes read including the newline
237         * @throws IOException
238         *             if the underlying stream throws
239         */
240        public int readLine(Text str, int maxLineLength) throws IOException {
241                return readLine(str, maxLineLength, Integer.MAX_VALUE);
242        }
243
244        /**
245         * Read from the InputStream into the given Text.
246         * 
247         * @param str
248         *            the object to store the given line
249         * @return the number of bytes read including the newline
250         * @throws IOException
251         *             if the underlying stream throws
252         */
253        public int readLine(Text str) throws IOException {
254                return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
255        }
256
257}