001/* 002 * Copyright (c) 2010-2013 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * Permission is hereby granted, without written agreement and without 006 * license or royalty fees, to use, copy, modify, and distribute this 007 * software and its documentation for any purpose, provided that the above 008 * copyright notice and the following two paragraphs appear in all copies 009 * of this software. 010 * 011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 * SUCH DAMAGE. 016 * 017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 * ENHANCEMENTS, OR MODIFICATIONS. 023 * 024 */ 025package org.kepler.hadoop.io.input; 026 027import java.io.IOException; 028import java.io.InputStream; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.io.Text; 032import org.kepler.ddp.Utilities; 033 034/** 035 * A class that provides data segments from an input stream, where the data segments are 036 * delimited by parameter "KEPLER::parameter::Delimiter". 037 * The class is modified based on class org.apache.hadoop.util.LineReader. 038 * 039 * @author Jianwu Wang (jianwu@sdsc.edu) 040 * @version $Id: DelimitedTextFileReader.java 33070 2014-11-12 23:21:09Z crawl $ 041*/ 042 043public class DelimitedTextFileReader { 044 private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; 045 private int bufferSize = DEFAULT_BUFFER_SIZE; 046 private InputStream in; 047 private byte[] buffer; 048 // the number of bytes of real data in the buffer 049 private int bufferLength = 0; 050 // the current position in the buffer 051 private int bufferPosn = 0; 052 private static final String DELIMITER_PARA = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::Delimiter"; 053 private byte[] delimiter = {'\n'}; 054 055 /** 056 * Create a line reader that reads from the given stream using the default 057 * buffer-size (64k). 058 * 059 * @param in 060 * The input stream 061 * @throws IOException 062 */ 063 public DelimitedTextFileReader(InputStream in) { 064 this(in, DEFAULT_BUFFER_SIZE); 065 } 066 067 /** 068 * Create a line reader that reads from the given stream using the given 069 * buffer-size. 070 * 071 * @param in 072 * The input stream 073 * @param bufferSize 074 * Size of the read buffer 075 * @throws IOException 076 */ 077 public DelimitedTextFileReader(InputStream in, int bufferSize) { 078 this.in = in; 079 this.bufferSize = bufferSize; 080 this.buffer = new byte[this.bufferSize]; 081// System.out.println("bufferSize in FASTAFileReader: "+ this.bufferSize); 082 Configuration conf = new Configuration(); 083 final String delimiterString = conf.get(DELIMITER_PARA, "\n"); 084 this.delimiter = delimiterString.getBytes(); 085 System.out.println("delimiter in DelimitedTextFileReader:" + delimiterString); 086 } 087 088 /** 089 * Create a line reader that reads from the given stream using the 090 * <code>io.file.buffer.size</code> specified in the given 091 * <code>Configuration</code>. 092 * 093 * @param in 094 * input stream 095 * @param conf 096 * configuration 097 * @throws IOException 098 */ 099 public DelimitedTextFileReader(InputStream in, Configuration conf) 100 throws IOException { 101 //this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); 102 this.in = in; 103 this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE); 104 this.buffer = new byte[this.bufferSize]; 105 final String delimiterString = conf.get(DELIMITER_PARA, "\n"); 106 this.delimiter = delimiterString.getBytes(); 107 System.out.println("delimiter in DelimitedTextFileReader:" + delimiterString); 108 } 109 110 /** 111 * Close the underlying stream. 112 * 113 * @throws IOException 114 */ 115 public void close() throws IOException { 116 in.close(); 117 } 118 119 /** 120 * Read one line from the InputStream into the given Text. A line can be 121 * terminated by delimiter. EOF also terminates an otherwise unterminated line. 122 * 123 * @param str 124 * the object to store the given line (without newline) 125 * @param maxLineLength 126 * the maximum number of bytes to store into str; the rest of the 127 * line is silently discarded. 128 * @param maxBytesToConsume 129 * the maximum number of bytes to consume in this call. This is 130 * only a hint, because if the line cross this threshold, we 131 * allow it to happen. It can overshoot potentially by as much as 132 * one buffer length. 133 * 134 * @return the number of bytes read including the (longest) newline found. 135 * 136 * @throws IOException 137 * if the underlying stream throws 138 */ 139 public int readLine(Text str, int maxLineLength, int maxBytesToConsume) 140 throws IOException { 141 142// System.out.println("lineNumber FASTAFileReader:" + lineNumber); 143 str.clear(); 144 int txtLength = 0; // tracks str.getLength(), as an optimization 145 int newlineLength = 0; // length of terminating newline 146 long bytesConsumed = 0; 147 do { 148 int startPosn = bufferPosn; // starting from where we left off the 149 // last time 150 if (bufferPosn >= bufferLength) { 151 startPosn = bufferPosn = 0; 152 bufferLength = in.read(buffer); 153 if (bufferLength <= 0) 154 break; // EOF 155 } 156 157 /* position of matching positions in the delimiter byte array */ 158 int j = 0; 159 160 for (; bufferPosn < bufferLength && j < delimiter.length; ++bufferPosn) { // search for new line 161 if (buffer[bufferPosn] == delimiter[j]) { 162 j++; 163 } 164 else { 165 j = 0; 166 } 167 } 168 169 // check why we dropped out 170 if (j == delimiter.length) { 171 // line end 172 int readLength = bufferPosn - startPosn; 173 readLength = readLength - delimiter.length; // move position back for one byte to exclude the delimiter bytes at the end of the array. 174 bytesConsumed += readLength; 175 int appendLength = readLength - newlineLength; 176 if (appendLength > maxLineLength - txtLength) { 177 appendLength = maxLineLength - txtLength; 178 } 179 if (appendLength > 0) { 180 str.append(buffer, startPosn, appendLength); 181 // txtLength never used again 182 //txtLength += appendLength; 183 } 184 return (int) bytesConsumed; 185 } else { 186 // reach bufferLength 187 int readLength = bufferPosn - startPosn; 188 bytesConsumed += readLength; 189 int appendLength = readLength - newlineLength; 190 if (appendLength > maxLineLength - txtLength) { 191 appendLength = maxLineLength - txtLength; 192 } 193 if (appendLength > 0) { 194 str.append(buffer, startPosn, appendLength); 195 txtLength += appendLength; 196 } 197 } 198 } while (bytesConsumed < maxBytesToConsume); 199 if (bytesConsumed > (long) Integer.MAX_VALUE) 200 throw new IOException("Too many bytes before newline: " 201 + bytesConsumed); 202// System.out.println("data for one readline at FASTAFileReader:" + 203// str.toString()); 204// System.out.println("line number at FASTAFileReader:" + 205// lineNumberIndex); 206 return (int) bytesConsumed; 207 } 208 209 /** 210 * Read from the InputStream into the given Text. 211 * 212 * @param str 213 * the object to store the given line 214 * @param maxLineLength 215 * the maximum number of bytes to store into str. 216 * @return the number of bytes read including the newline 217 * @throws IOException 218 * if the underlying stream throws 219 */ 220 public int readLine(Text str, int maxLineLength) throws IOException { 221 return readLine(str, maxLineLength, Integer.MAX_VALUE); 222 } 223 224 /** 225 * Read from the InputStream into the given Text. 226 * 227 * @param str 228 * the object to store the given line 229 * @return the number of bytes read including the newline 230 * @throws IOException 231 * if the underlying stream throws 232 */ 233 public int readLine(Text str) throws IOException { 234 return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); 235 } 236 237}