001/* MultipleLineReader class to read multiple lines in each map function. 002 003/* 004 * Copyright (c) 2010-2013 The Regents of the University of California. 005 * All rights reserved. 006 * 007 * Permission is hereby granted, without written agreement and without 008 * license or royalty fees, to use, copy, modify, and distribute this 009 * software and its documentation for any purpose, provided that the above 010 * copyright notice and the following two paragraphs appear in all copies 011 * of this software. 012 * 013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 017 * SUCH DAMAGE. 018 * 019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 024 * ENHANCEMENTS, OR MODIFICATIONS. 025 * 026 */ 027package org.kepler.hadoop.io.input; 028 029import java.io.IOException; 030import java.io.InputStream; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.io.Text; 034 035////////////////////////////////////////////////////////////////////////// 036////MultipleLineReader 037 038/** 039 * A class that provides a multiple line reader from an input stream. The line number for each reader is configurable 040 * through 'org.kepler.hadoop.lineread.number' item at mapred-site.xml. 041 * The class is modified based on class org.apache.hadoop.util.LineReader. 042 * 043 * @author Jianwu Wang (jianwu@sdsc.edu) 044 * @version $Id: MultipleLineReader.java 33070 2014-11-12 23:21:09Z crawl $ 045*/ 046 047public class MultipleLineReader { 048 private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; 049 private int bufferSize = DEFAULT_BUFFER_SIZE; 050 private InputStream in; 051 private byte[] buffer; 052 // the number of bytes of real data in the buffer 053 private int bufferLength = 0; 054 // the current position in the buffer 055 private int bufferPosn = 0; 056 057 private static final byte CR = '\r'; 058 private static final byte LF = '\n'; 059 060 private int lineNumber = 1; 061 062 /** 063 * Create a line reader that reads from the given stream using the default 064 * buffer-size (64k). 065 * 066 * @param in 067 * The input stream 068 * @throws IOException 069 */ 070 public MultipleLineReader(InputStream in) { 071 this(in, DEFAULT_BUFFER_SIZE); 072 } 073 074 /** 075 * Create a line reader that reads from the given stream using the given 076 * buffer-size. 077 * 078 * @param in 079 * The input stream 080 * @param bufferSize 081 * Size of the read buffer 082 * @throws IOException 083 */ 084 public MultipleLineReader(InputStream in, int bufferSize) { 085 this.in = in; 086 this.bufferSize = bufferSize; 087 this.buffer = new byte[this.bufferSize]; 088// System.out.println("bufferSize in MultipleLineReader: "+ this.bufferSize); 089 Configuration conf = new Configuration(); 090 this.lineNumber = conf.getInt("org.kepler.hadoop.lineread.number", 5000); 091 if (this.lineNumber == -1) 092 this.lineNumber = bufferSize; 093 System.out.println("lineNumber in MultipleLineReader: "+ this.lineNumber); 094 } 095 096 /** 097 * Create a line reader that reads from the given stream using the 098 * <code>io.file.buffer.size</code> specified in the given 099 * <code>Configuration</code>. 100 * 101 * @param in 102 * input stream 103 * @param conf 104 * configuration 105 * @throws IOException 106 */ 107 public MultipleLineReader(InputStream in, Configuration conf) 108 throws IOException { 109 this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); 110 this.lineNumber = conf.getInt("org.kepler.hadoop.lineread.number", 5000); 111 if (this.lineNumber == -1) 112 this.lineNumber = bufferSize; 113 System.out.println("lineNumber in MultipleLineReader: "+ this.lineNumber); 114 } 115 116 /** 117 * Close the underlying stream. 118 * 119 * @throws IOException 120 */ 121 public void close() throws IOException { 122 in.close(); 123 } 124 125 /** 126 * Read one line from the InputStream into the given Text. A line can be 127 * terminated by one of the following: '\n' (LF) , '\r' (CR), or '\r\n' 128 * (CR+LF). EOF also terminates an otherwise unterminated line. 129 * 130 * @param str 131 * the object to store the given line (without newline) 132 * @param maxLineLength 133 * the maximum number of bytes to store into str; the rest of the 134 * line is silently discarded. 135 * @param maxBytesToConsume 136 * the maximum number of bytes to consume in this call. This is 137 * only a hint, because if the line cross this threshold, we 138 * allow it to happen. It can overshoot potentially by as much as 139 * one buffer length. 140 * 141 * @return the number of bytes read including the (longest) newline found. 142 * 143 * @throws IOException 144 * if the underlying stream throws 145 */ 146 public int readLine(Text str, int maxLineLength, int maxBytesToConsume) 147 throws IOException { 148 /* 149 * We're reading data from in, but the head of the stream may be already 150 * buffered in buffer, so we have several cases: 1. No newline 151 * characters are in the buffer, so we need to copy everything and read 152 * another buffer from the stream. 2. An unambiguously terminated line 153 * is in buffer, so we just copy to str. 3. Ambiguously terminated line 154 * is in buffer, i.e. buffer ends in CR. In this case we copy everything 155 * up to CR to str, but we also need to see what follows CR: if it's LF, 156 * then we need consume LF as well, so next call to readLine will read 157 * from after that. We use a flag prevCharCR to signal if previous 158 * character was CR and, if it happens to be at the end of the buffer, 159 * delay consuming it until we have a chance to look at the char that 160 * follows. 161 */ 162 163// System.out.println("lineNumber MultipleLineReader:" + lineNumber); 164 str.clear(); 165 int txtLength = 0; // tracks str.getLength(), as an optimization 166 int newlineLength = 0; // length of terminating newline 167 boolean prevCharCR = false; // true of prev char was CR 168 long bytesConsumed = 0; 169 int lineNumberIndex = 1; 170 do { 171 int startPosn = bufferPosn; // starting from where we left off the 172 // last time 173 if (bufferPosn >= bufferLength) { 174 startPosn = bufferPosn = 0; 175 if (prevCharCR) 176 ++bytesConsumed; // account for CR from previous read 177 bufferLength = in.read(buffer); 178 if (bufferLength <= 0) 179 break; // EOF 180 } 181 182 //lineNumberIndex = 1; 183 184 for (; bufferPosn < bufferLength; ++bufferPosn) { // search for 185 // newline 186 187 if (buffer[bufferPosn] == LF && lineNumberIndex >= lineNumber) { 188 newlineLength = (prevCharCR) ? 2 : 1; 189 ++bufferPosn; // at next invocation proceed from following 190 // byte 191 break; 192 } 193 if (prevCharCR && lineNumberIndex >= lineNumber) { // CR + 194 // notLF, we 195 // are at 196 // notLF 197 newlineLength = 1; 198 break; 199 } 200 prevCharCR = (buffer[bufferPosn] == CR); 201 if (buffer[bufferPosn] == LF || buffer[bufferPosn] == CR) { 202 lineNumberIndex++; 203 } 204 205 } 206 int readLength = bufferPosn - startPosn; 207 if (prevCharCR && newlineLength == 0) 208 --readLength; // CR at the end of the buffer 209 bytesConsumed += readLength; 210 int appendLength = readLength - newlineLength; 211 if (appendLength > maxLineLength - txtLength) { 212 appendLength = maxLineLength - txtLength; 213 } 214 if (appendLength > 0) { 215 str.append(buffer, startPosn, appendLength); 216 txtLength += appendLength; 217 } 218 } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); 219// System.out.println("data for one readline at MultipleLineReader:" + 220// str.toString()); 221// System.out.println("line number at MultipleLineReader:" + 222// lineNumberIndex); 223 if (bytesConsumed > (long) Integer.MAX_VALUE) 224 throw new IOException("Too many bytes before newline: " 225 + bytesConsumed); 226 return (int) bytesConsumed; 227 } 228 229 /** 230 * Read from the InputStream into the given Text. 231 * 232 * @param str 233 * the object to store the given line 234 * @param maxLineLength 235 * the maximum number of bytes to store into str. 236 * @return the number of bytes read including the newline 237 * @throws IOException 238 * if the underlying stream throws 239 */ 240 public int readLine(Text str, int maxLineLength) throws IOException { 241 return readLine(str, maxLineLength, Integer.MAX_VALUE); 242 } 243 244 /** 245 * Read from the InputStream into the given Text. 246 * 247 * @param str 248 * the object to store the given line 249 * @return the number of bytes read including the newline 250 * @throws IOException 251 * if the underlying stream throws 252 */ 253 public int readLine(Text str) throws IOException { 254 return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); 255 } 256 257}