001/* FASTAFileReader class to read multiple sequences in each map function. 002 003/* 004 * Copyright (c) 2010-2013 The Regents of the University of California. 005 * All rights reserved. 006 * 007 * Permission is hereby granted, without written agreement and without 008 * license or royalty fees, to use, copy, modify, and distribute this 009 * software and its documentation for any purpose, provided that the above 010 * copyright notice and the following two paragraphs appear in all copies 011 * of this software. 012 * 013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 017 * SUCH DAMAGE. 018 * 019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 024 * ENHANCEMENTS, OR MODIFICATIONS. 025 * 026 */ 027package org.kepler.bio.hadoop.input; 028 029import java.io.IOException; 030import java.io.InputStream; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.io.Text; 034import org.kepler.ddp.Utilities; 035 036////////////////////////////////////////////////////////////////////////// 037////FASTAFileReader 038 039/** 040 * A class that provides a multiple sequence reader from an input stream. The sequence number for each reader is configurable 041 * through KEPLER::parameter::SequenceNumberPerExecution parameter which can be set in DDPDataSource actor. 042 * The class is modified based on class org.apache.hadoop.util.LineReader. 043 * 044 * @author Jianwu Wang (jianwu@sdsc.edu) 045 * @version $Id: FASTAFileReader.java 33064 2014-11-12 23:17:08Z crawl $ 046*/ 047 048public class FASTAFileReader { 049 private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; 050 private static final String SEQUENCE_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::SequenceNumberPerExecution"; 051 private int bufferSize = DEFAULT_BUFFER_SIZE; 052 private InputStream in; 053 private byte[] buffer; 054 // the number of bytes of real data in the buffer 055 private int bufferLength = 0; 056 // the current position in the buffer 057 private int bufferPosn = 0; 058 059// private static final byte CR = '\r'; 060// private static final byte LF = '\n'; 061 private static final byte SEQ_DELIMITER = '>'; 062 063 private int seqNumber = 1; 064 065 /** 066 * Create a line reader that reads from the given stream using the default 067 * buffer-size (64k). 068 * 069 * @param in 070 * The input stream 071 * @throws IOException 072 */ 073 public FASTAFileReader(InputStream in) { 074 this(in, DEFAULT_BUFFER_SIZE); 075 } 076 077 /** 078 * Create a line reader that reads from the given stream using the given 079 * buffer-size. 080 * 081 * @param in 082 * The input stream 083 * @param bufferSize 084 * Size of the read buffer 085 * @throws IOException 086 */ 087 public FASTAFileReader(InputStream in, int bufferSize) { 088 this.in = in; 089 this.bufferSize = bufferSize; 090 this.buffer = new byte[this.bufferSize]; 091// System.out.println("bufferSize in FASTAFileReader: "+ this.bufferSize); 092 Configuration conf = new Configuration(); 093 this.seqNumber = conf.getInt(SEQUENCE_NUMBER, 300); 094 System.out.println("SEQUENCE_NUM:"+this.seqNumber); 095 if (this.seqNumber == -1) 096 this.seqNumber = bufferSize; 097 System.out.println("parameter SequenceNumberPerExecution in FASTAFileReader: "+ this.seqNumber); 098 } 099 100 /** 101 * Create a line reader that reads from the given stream using the 102 * <code>io.file.buffer.size</code> specified in the given 103 * <code>Configuration</code>. 104 * 105 * @param in 106 * input stream 107 * @param conf 108 * configuration 109 * @throws IOException 110 */ 111 public FASTAFileReader(InputStream in, Configuration conf) 112 throws IOException { 113 //this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); 114 this.in = in; 115 this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE); 116 this.buffer = new byte[this.bufferSize]; 117 this.seqNumber = conf.getInt(SEQUENCE_NUMBER, 300); 118 System.out.println("SEQUENCE_NUM:"+this.seqNumber); 119 if (this.seqNumber == -1) 120 this.seqNumber = bufferSize; 121 System.out.println("parameter SequenceNumberPerExecution in FASTAFileReader: "+ this.seqNumber); 122 } 123 124 /** 125 * Close the underlying stream. 126 * 127 * @throws IOException 128 */ 129 public void close() throws IOException { 130 in.close(); 131 } 132 133 /** 134 * Read one line from the InputStream into the given Text. A line can be 135 * terminated by '>'. EOF also terminates an otherwise unterminated line. 136 * 137 * @param str 138 * the object to store the given line (without newline) 139 * @param maxBytesToConsume 140 * the maximum number of bytes to consume in this call. This is 141 * only a hint, because if the line cross this threshold, we 142 * allow it to happen. It can overshoot potentially by as much as 143 * one buffer length. 144 * @param seqNumber 145 * sequence number for each key value record. It is read from 146 * configuration file if it is null. 147 * @return the number of bytes read including the (longest) newline found. 148 * 149 * @throws IOException 150 * if the underlying stream throws 151 */ 152 public int readSequence(Text str, int maxBytesToConsume, int seqNumber) 153 throws IOException { 154 155 if (seqNumber == -1) 156 seqNumber = this.seqNumber; 157 str.clear(); 158 int txtLength = 0; // tracks str.getLength(), as an optimization 159 int newlineLength = 0; // length of terminating newline 160 long bytesConsumed = 0; 161 int lineNumberIndex = 0; 162 do { 163 int startPosn = bufferPosn; // starting from where we left off the 164 // last time 165 if (bufferPosn >= bufferLength) { 166 startPosn = bufferPosn = 0; 167 bufferLength = in.read(buffer); 168 if (bufferLength <= 0) 169 break; // EOF 170 } 171 172 for (; bufferPosn < bufferLength; ++bufferPosn) { // search for 173 // new sequence 174 175 if (buffer[bufferPosn] == SEQ_DELIMITER && lineNumberIndex >= seqNumber) { 176 break; 177 } 178 if (buffer[bufferPosn] == SEQ_DELIMITER && bytesConsumed > maxBytesToConsume) { 179 break; 180 } 181 if (buffer[bufferPosn] == SEQ_DELIMITER) { 182 lineNumberIndex++; 183 } 184 bytesConsumed++; 185 } 186 int readLength = bufferPosn - startPosn; 187 if (lineNumberIndex >= seqNumber) 188 --readLength; // move position back for one byte to exclude the delimiter '>' of the next sequence 189 //bytesConsumed += readLength; 190 int appendLength = readLength - newlineLength; 191 if (appendLength > 0) { 192 str.append(buffer, startPosn, appendLength); 193 txtLength += appendLength; 194 } 195 //bufferPosn == bufferLength, it means the current sequence has not been read completely, so continue the loop 196 } while ((lineNumberIndex < seqNumber || bufferPosn == bufferLength) && bytesConsumed <= maxBytesToConsume); 197 if (bytesConsumed > (long) Integer.MAX_VALUE) 198 throw new IOException("Too many bytes before newline: " 199 + bytesConsumed); 200 return (int) bytesConsumed; 201 } 202 203 /** 204 * Read from the InputStream into the given Text. 205 * 206 * @param str 207 * the object to store the given line 208 * @return the number of bytes read including the newline 209 * @throws IOException 210 * if the underlying stream throws 211 */ 212 public int readSequence(Text str) throws IOException { 213 return readSequence(str, Integer.MAX_VALUE, -1); 214 } 215 216}