/*
 * Copyright (c) 2005-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:17:08 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33064 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.bio.stratosphere.input;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.ddp.Utilities;

import eu.stratosphere.api.common.io.statistics.BaseStatistics;
import eu.stratosphere.api.java.record.io.FileInputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;

/**
 * This Input Format is to read fasta sequence file and generate key value pair records.
 * The format of each key is: fileName_partitionNumber_(readPosIndex+splitStart) and each value is a list of sequences.
 * Parameter 'KEPLER::parameter::SequenceSizePerExecution' specifies the maximum size of sequence data that will be put into each record.
 * This Input Format is intended to guarantee that each sequence is read as a whole and that no record contains truncated sequences.
 * It is built based on eu.stratosphere.pact.common.io.TextInputFormat class.
 *
 * @author Jianwu Wang
 * @version $Id: FASTAInputFormatBySize.java 33064 2014-11-12 23:17:08Z crawl $
 */
public class FASTAInputFormatBySize extends FileInputFormat
{
    /**
     * The configuration key to set the record delimiter.
     */
    public static final String RECORD_DELIMITER = "textformat.delimiter";

    /**
     * The configuration key to set the number of samples to take for the statistics.
     */
    public static final String NUM_STATISTICS_SAMPLES = "textformat.numSamples";

    /**
     * The configuration key to set the sequence data size (unit: MB). It specifies the data size to be
     * processed for each sub-workflow execution. Its default value is 1.
     */
    public static final String SEQ_SIZE_PER_EXE = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::SequenceSizePerExecution";

    /**
     * The configuration key to set the position of the record delimiter. Its value is null, prefix or suffix.
     */
    public static final String DELIMITER_POSITION = "textformat.delimiter.position";

    /**
     * The log.
     */
    private static final Log LOG = LogFactory.getLog(FASTAInputFormatBySize.class);

    /**
     * The default read buffer size = 1MB.
     */
    private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;

    /**
     * The default number of sample lines to consider when calculating the line width.
     */
    private static final int DEFAULT_NUM_SAMPLES = 10;

    // --------------------------------------------------------------------------------------------

    private Class<StringValue> keyClass;

    private Class<StringValue> valueClass;

    private byte[] readBuffer;

    //It is used to store the data between the position of the last readSequence() result before reaching limit and the current limit position.
    //The data needs to be added to the next result of readSequence().
    private byte[] wrapBuffer;

    private int readPos;

    //the index of the current read position, it is used to be the key of record.
    private long readPosIndex;

    private long localSplitLength;

    private int partitionNumber;

    private int limit;

    //set delimiter to be '>' for fasta file
    private byte[] delimiter = new byte[] { '>' };

    private boolean overLimit;

    private boolean end;

    private int bufferSize = -1;

    private int numLineSamples; // the number of lines to sample for statistics

    Float dataSize = null;

    private long readPosOffset;

    String delimiter_position = null;

    private Path splitPath;

    // --------------------------------------------------------------------------------------------

    /**
     * Gets the delimiter that defines the record boundaries.
     *
     * @return The delimiter, as bytes.
     */
    public byte[] getDelimiter()
    {
        return delimiter;
    }

    /**
     * Sets the size of the buffer to be used to find record boundaries. This method has only an effect, if it is called
     * before the input format is opened.
     *
     * @param bufferSize The buffer size to use.
     */
    public void setBufferSize(int bufferSize)
    {
        this.bufferSize = bufferSize;
    }

    /**
     * Gets the size of the buffer internally used to parse record boundaries.
     *
     * @return The size of the parsing buffer, or 0 if no buffer is currently allocated.
     */
    public int getBufferSize()
    {
        return this.readBuffer == null ? 0 : this.readBuffer.length;
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Configures this input format by reading the path to the file from the configuration and the string that
     * defines the record delimiter. Also reads the number of statistics samples, the sequence size per
     * execution (in MB, default 1) and the delimiter position (default "prefix").
     *
     * @param parameters The configuration object to read the parameters from.
     * @throws IllegalArgumentException if the delimiter is null or empty.
     * @throws NumberFormatException if the sequence size per execution is not a parsable number.
     */
    @Override
    public void configure(Configuration parameters)
    {
        super.configure(parameters);
        //set delimiter to be '>' for fasta file
        String delimString = parameters.getString(RECORD_DELIMITER, ">");
        LOG.info("delimString:" + delimString);
        // An empty delimiter would make readSequence() index into a zero-length array, so reject it up front.
        if (delimString == null || delimString.isEmpty()) {
            throw new IllegalArgumentException("The delimiter must not be null or empty.");
        }

        this.delimiter = delimString.getBytes();

        // set the number of samples
        this.numLineSamples = DEFAULT_NUM_SAMPLES;
        String samplesString = parameters.getString(NUM_STATISTICS_SAMPLES, null);

        if (samplesString != null) {
            try {
                this.numLineSamples = Integer.parseInt(samplesString);
            }
            catch (NumberFormatException nfex) {
                // Deliberate best effort: keep the default and only warn.
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Invalid value for number of samples to take: " + samplesString +
                            ". Using default value of " + DEFAULT_NUM_SAMPLES);
                }
            }
        }

        this.dataSize = Float.valueOf(parameters.getString(SEQ_SIZE_PER_EXE, "1"));

        LOG.info("SequenceSizePerExecution parameter value:" + this.dataSize);

        delimiter_position = parameters.getString(DELIMITER_POSITION, "prefix");
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Statistics are not computed by this input format.
     *
     * @param cachedStatistics Previously cached statistics; ignored.
     * @return Always null.
     *
     * @see eu.stratosphere.pact.common.io.InputFormat#getStatistics()
     */
    @Override
    public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics)
    {
        return null;
    }

    /**
     * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers
     * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped.
     *
     * @param split The input split to open.
     * @throws IOException Thrown, if opening, seeking or reading the stream fails.
     *
     * @see eu.stratosphere.pact.common.io.FileInputFormat#open(eu.stratosphere.nephele.fs.FileInputSplit)
     */
    @Override
    public void open(FileInputSplit split) throws IOException
    {
        super.open(split);
        this.splitPath = split.getPath();
        partitionNumber = split.getSplitNumber();
        localSplitLength = split.getLength();
        readPosOffset = 0;
        readPosIndex = 0;

        LOG.info("split in one open():" + split);

        this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
        this.readBuffer = new byte[this.bufferSize];
        this.wrapBuffer = new byte[256];

        this.limit = 0;
        this.readPos = 0;
        this.overLimit = false;
        this.end = false;

        if (this.splitStart != 0) {
            //the split is not the first split, so goes back to the last split and gets its last record.
            this.stream.seek(this.splitStart);
            byte[] skipped = readSequence(-1);
            if (skipped == null) {
                // Nothing could be read at the split start (stream exhausted), so this split has no records.
                this.end = true;
            } else {
                readPosIndex = skipped.length;
            }

            // if the first partial record already pushes the stream over the limit of our split, then no
            // record starts within this split
            if (this.overLimit) {
                this.end = true;
            }
        }
        else {
            fillBuffer();
        }
    }

    /**
     * Checks whether the current split is at its end.
     *
     * @return True, if the split is at its end, false otherwise.
     */
    @Override
    public boolean reachedEnd()
    {
        return this.end;
    }

    /**
     * Reads the next chunk of sequence data into the given record. Field 0 is set to the key
     * fileName_partitionNumber_(readPosIndex+splitStart) and field 1 to the sequence bytes
     * interpreted as ASCII.
     *
     * @param record The record to fill.
     * @return True if the record was filled, false if the split has no more data.
     * @throws IOException Thrown, if reading from the stream fails.
     */
    @Override
    public boolean nextRecord(Record record) throws IOException
    {
        byte[] seqBytes = readSequence(dataSize.floatValue());
        if (seqBytes == null) {
            this.end = true;
        }
        if (end) {
            return false;
        }

        final String key = splitPath.getName() + "_" + this.partitionNumber + "_" + (this.readPosIndex + this.splitStart);
        record.setField(0, new StringValue(key));

        final StringValue string = new StringValue();
        string.setValueAscii(seqBytes, 0, seqBytes.length);
        record.setField(1, string);

        //update readPosIndex, which will be the key of the next record.
        readPosIndex = readPosIndex + seqBytes.length;
        return true;
    }

    /**
     * Closes the input by releasing all buffers and closing the file input stream.
     *
     * @throws IOException Thrown, if the closing of the file stream causes an I/O error.
     */
    @Override
    public void close() throws IOException
    {
        this.wrapBuffer = null;
        this.readBuffer = null;
        this.readPosIndex = 0;

        super.close();
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Reads sequence data from the stream. Data is consumed until the requested size, the end of the
     * local split, or (for a negative size) the first complete delimiter is reached; the read then
     * continues up to the next delimiter so that the returned bytes stop before it.
     *
     * @param dataSizeToRead The amount of data to read in MB. A negative value switches to the mode
     *        used by open(): stop the first scan right after the first complete delimiter.
     * @return The bytes read, or null if the stream is closed, the split limit was already passed, or
     *         the stream is exhausted with nothing buffered.
     * @throws IOException Thrown, if reading from the stream fails.
     */
    private byte[] readSequence(float dataSizeToRead) throws IOException {
        if (this.stream == null || this.overLimit) {
            return null;
        }

        float dataByteSize = dataSizeToRead * 1024 * 1024;
        int countInWrapBuffer = 0;

        /* position of matching positions in the delimiter byte array */
        int i = 0;

        //use onlyReadOne flag to check whether it is called by: readPosIndex = readSequence(-1).length;
        boolean onlyReadOne = false;
        if (dataByteSize < 0) {
            onlyReadOne = true;
            dataByteSize = Integer.MAX_VALUE;
        }

        while (true) {
            if (this.readPos >= this.limit) {
                if (!fillBuffer()) {
                    // Stream exhausted: hand back whatever was carried over, if anything.
                    if (countInWrapBuffer > 0) {
                        byte[] tmp = new byte[countInWrapBuffer];
                        System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
                        return tmp;
                    } else {
                        return null;
                    }
                }
            }

            int startPos = this.readPos;
            int count = 0;

            while (this.readPos < this.limit && (this.readPos - startPos + countInWrapBuffer) < dataByteSize && readPosOffset < localSplitLength) {
                if ((this.readBuffer[this.readPos]) == this.delimiter[i] && (this.readPos - startPos) > this.delimiter.length) {
                    //the second condition is to make sure the first delimiter will not be counted, since the '>' is always at the beginning of data.
                    i++;
                    if (i == this.delimiter.length) {
                        i = 0;
                        if (onlyReadOne) {
                            //it is called by readSequence(-1).length() function, so break after reading the delimiter
                            //readPosOffset add one because readPos add one.
                            readPosOffset++;
                            readPos++;
                            break;
                        }
                    }
                } else {
                    i = 0;
                }
                //readPosOffset add one because readPos add one.
                readPosOffset++;
                readPos++;
            }

            // check why we dropped out
            if (this.readPos == this.limit) {
                count = this.limit - startPos;
                // buffer exhausted
                while (this.wrapBuffer.length - countInWrapBuffer < count) {
                    // reallocate
                    byte[] tmp = new byte[this.wrapBuffer.length * 2];
                    System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
                    this.wrapBuffer = tmp;
                }

                System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count);
                countInWrapBuffer += count;
            }
            else {
                //reach the end of the split or the seq size per execution. Now read the across the boundary until the first delimiter of the next split.
                // NOTE(review): if this scan reaches 'limit' before a full delimiter is matched, delimiter.length
                // is still subtracted below, so the last delimiter.length bytes of the buffer are left out of this
                // result and the read position is moved back over them - confirm whether a sequence can be split
                // at a buffer boundary here.
                i = 0;
                while (this.readPos < this.limit && i < this.delimiter.length) {
                    if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) {
                        i++;
                    } else {
                        i = 0;
                    }
                }
                if (readPosOffset == localSplitLength) {
                    //set overLimit to be true since the end of this split has been reached.
                    this.overLimit = true;
                }
                //this.delimiter.length need to be subtracted from readPos because we do not want to count the beginning '>' of the next sequence.
                count = this.readPos - startPos - this.delimiter.length;
                //move read position back before the delimiter.
                this.readPos = this.readPos - this.delimiter.length;
                this.readPosOffset = this.readPosOffset - this.delimiter.length;

                // copy to byte array
                if (countInWrapBuffer > 0) {
                    byte[] end = new byte[countInWrapBuffer + count];
                    if (count >= 0) {
                        System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer);
                        System.arraycopy(this.readBuffer, 0, end, countInWrapBuffer, count);
                        return end;
                    } else {
                        // count < 0
                        System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer + count);
                        return end;
                    }
                } else {
                    byte[] end = new byte[count];
                    System.arraycopy(this.readBuffer, startPos, end, 0, count);
                    return end;
                }
            }
        }
    }

    /**
     * Refills the read buffer from the stream. While the remaining split length is positive, at most
     * that many bytes are requested; once it is used up, a full buffer is requested and overLimit is set.
     * On end of stream the stream is closed and set to null.
     *
     * @return True if data was read into the buffer, false if the stream reported end of data.
     * @throws IOException Thrown, if reading from or closing the stream fails.
     */
    private final boolean fillBuffer() throws IOException {
        int toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
        if (this.splitLength <= 0) {
            // The split is used up; keep reading past its end and record that the limit was crossed.
            toRead = this.readBuffer.length;
            this.overLimit = true;
        }

        int read = this.stream.read(this.readBuffer, 0, toRead);

        if (read == -1) {
            this.stream.close();
            this.stream = null;
            return false;
        } else {
            this.splitLength -= read;
            this.readPos = 0;
            this.limit = read;
            return true;
        }
    }

}