001/* 002 * Copyright (c) 2005-2012 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2014-11-12 23:17:08 +0000 (Wed, 12 Nov 2014) $' 007 * '$Revision: 33064 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029package org.kepler.bio.stratosphere.input; 030 031import java.io.IOException; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.kepler.ddp.Utilities; 036 037import eu.stratosphere.api.common.io.statistics.BaseStatistics; 038import eu.stratosphere.api.java.record.io.FileInputFormat; 039import eu.stratosphere.configuration.Configuration; 040import eu.stratosphere.core.fs.FileInputSplit; 041import eu.stratosphere.core.fs.Path; 042import eu.stratosphere.types.Record; 043import eu.stratosphere.types.StringValue; 044 045 046 047/** 048* This Input Format is to read fasta sequence file and generate key value pair records. 049* The format of each key is : partitionNumber_(readPosIndex+start) and each value is a list of sequences. 050* Parameter 'KEPLER::parameter::SequenceNumberPerExecution' specify at most how many sequences will be put into each record. 051* This Input Format can guarantee each sequence is read in a whole and each record won't have truncated sequences. 052* It is built based on eu.stratosphere.pact.common.io.TextInputFormat class. 053* 054* @author Jianwu Wang 055* @version $Id: FASTAInputFormat.java 33064 2014-11-12 23:17:08Z crawl $ 056*/ 057public class FASTAInputFormat extends FileInputFormat 058{ 059 /** 060 * The configuration key to set the record delimiter. 061 */ 062 public static final String RECORD_DELIMITER = "textformat.delimiter"; 063 064 /** 065 * The configuration key to set the number of samples to take for the statistics. 066 */ 067 public static final String NUM_STATISTICS_SAMPLES = "textformat.numSamples"; 068 069 /** 070 * The configuration key to set the sequence number. It specifies the sequence number to be processed for each sub-workflow execution. 071 */ 072 public static final String SEQUENCE_NUM = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::SequenceNumberPerExecution"; 073 074 /** 075 * The configuration key to set position of the record delimiter. its value is null, prefix or suffix. 076 */ 077 public static final String DELIMITER_POSITION = "textformat.delimiter.position"; 078 079 /** 080 * The log. 081 */ 082 private static final Log LOG = LogFactory.getLog(FASTAInputFormat.class); 083 084 /** 085 * The default read buffer size = 1MB. 086 */ 087 private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024; 088 089 /** 090 * The default number of sample lines to consider when calculating the line width. 091 */ 092 private static final int DEFAULT_NUM_SAMPLES = 10; 093 094 // -------------------------------------------------------------------------------------------- 095 096 private byte[] readBuffer; 097 098 private byte[] wrapBuffer; 099 100 private int readPos; 101 102 //the index of the current read position, it is used to be the key of record. 103 private long readPosIndex; 104 105 private int seqIndex; 106 107 private long localSplitLength; 108 109 private int partitionNumber; 110 111 private int limit; 112 113 //set delimiter to be '>' for fasta file 114 private byte[] delimiter = new byte[] { '>' }; 115 116 private boolean overLimit; 117 118 private boolean end; 119 120 private int bufferSize = -1; 121 122 private int numLineSamples; // the number of lines to sample for statistics 123 124 Integer seq_number = null; 125 126 private long readPosOffset; 127 128 String delimiter_position = null; 129 130 private Path splitPath; 131 132 133 // -------------------------------------------------------------------------------------------- 134 135 /** 136 * Gets the delimiter that defines the record boundaries. 137 * 138 * @return The delimiter, as bytes. 139 */ 140 public byte[] getDelimiter() 141 { 142 return delimiter; 143 } 144 145 /** 146 * Sets the size of the buffer to be used to find record boundaries. This method has only an effect, if it is called 147 * before the input format is opened. 148 * 149 * @param bufferSize The buffer size to use. 150 */ 151 public void setBufferSize(int bufferSize) 152 { 153 this.bufferSize = bufferSize; 154 } 155 156 /** 157 * Gets the size of the buffer internally used to parse record boundaries. 158 * 159 * @return The size of the parsing buffer. 160 */ 161 public int getBufferSize() 162 { 163 return this.readBuffer == null ? 0: this.readBuffer.length; 164 } 165 166 // -------------------------------------------------------------------------------------------- 167 168 /** 169 * Configures this input format by reading the path to the file from the configuration and the string that 170 * defines the record delimiter. 171 * 172 * @param parameters The configuration object to read the parameters from. 173 */ 174 @Override 175 public void configure(Configuration parameters) 176 { 177 super.configure(parameters); 178 //set delimiter to be '>' for fasta file 179 String delimString = parameters.getString(RECORD_DELIMITER, ">"); 180 System.out.println("delimString:" + delimString); 181 if (delimString == null) { 182 throw new IllegalArgumentException("The delimiter not be null."); 183 } 184 185 this.delimiter = delimString.getBytes(); 186 187 // set the number of samples 188 this.numLineSamples = DEFAULT_NUM_SAMPLES; 189 String samplesString = parameters.getString(NUM_STATISTICS_SAMPLES, null); 190 191 if (samplesString != null) { 192 try { 193 this.numLineSamples = Integer.parseInt(samplesString); 194 } 195 catch (NumberFormatException nfex) { 196 if (LOG.isWarnEnabled()) 197 LOG.warn("Invalid value for number of samples to take: " + samplesString + 198 ". Using default value of " + DEFAULT_NUM_SAMPLES); 199 } 200 } 201 202 this.seq_number = new Integer(parameters.getString(SEQUENCE_NUM, "5000")); 203 204 System.out.println("SEQUENCE_NUM:"+this.seq_number); 205 206 delimiter_position = parameters.getString(DELIMITER_POSITION, "prefix"); 207 208 } 209 210 211 212 // -------------------------------------------------------------------------------------------- 213 214 /* (non-Javadoc) 215 * @see eu.stratosphere.pact.common.io.InputFormat#getStatistics() 216 */ 217 @Override 218 public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) 219 { 220 return null; 221 } 222 223 /** 224 * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers 225 * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped. 226 * 227 * @param split The input split to open. 228 * 229 * @see eu.stratosphere.pact.common.io.FileInputFormat#open(eu.stratosphere.nephele.fs.FileInputSplit) 230 */ 231 @Override 232 public void open(FileInputSplit split) throws IOException 233 { 234 super.open(split); 235 this.splitPath = split.getPath(); 236 partitionNumber = split.getSplitNumber(); 237 localSplitLength = split.getLength(); 238 readPosOffset = 0; 239 readPosIndex = 0; 240 241 //System.out.println("split in one open():" + split); 242 243 this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize; 244 this.readBuffer = new byte[this.bufferSize]; 245 this.wrapBuffer = new byte[256]; 246 247 this.limit = 0; 248 this.readPos = 0; 249 this.overLimit = false; 250 this.end = false; 251 252 if (this.splitStart != 0) { 253 //the split is not the first split, so goes back to the last split and gets its last record. 254 this.stream.seek(this.splitStart); 255 readPosIndex = readSequence(1).length; 256 257 // if the first partial record already pushes the stream over the limit of our split, then no 258 // record starts within this split 259 if (this.overLimit) { 260 this.end = true; 261 } 262 } 263 else { 264 fillBuffer(); 265 } 266 } 267 268 /** 269 * Checks whether the current split is at its end. 270 * 271 * @return True, if the split is at its end, false otherwise. 272 */ 273 @Override 274 public boolean reachedEnd() 275 { 276 return this.end; 277 } 278 279// @Override 280// public boolean nextRecord(KeyValuePair<PactString, PactString> pair) throws IOException 281// { 282// byte[] seqBytes = readSequence(seq_number.intValue()); 283// if (seqBytes == null) { 284// this.end = true; 285// } 286// if (end) 287// return false; 288// pair.setKey(new PactString (new Integer(this.partitionNumber).toString() + "_" + new Long(this.readPosIndex + this.start).toString())); 289// pair.setValue(new PactString(new String(seqBytes))); 290// System.out.println("value size for key: " + pair.getKey() + " is " + seqBytes.length); 291// //update readPosIndex, which will be the key of the next record. 292// readPosIndex = readPosIndex + seqBytes.length; 293// return true; 294// } 295 296 @Override 297 public boolean nextRecord(Record record) throws IOException { 298 final StringValue string = new StringValue(); 299 byte[] seqBytes = readSequence(seq_number.intValue()); 300 if (seqBytes == null) { 301 this.end = true; 302 } 303 if (end) 304 return false; 305 record.setField(0, new StringValue (splitPath.getName() + "_" + new Integer(this.partitionNumber).toString() + "_" + new Long(this.readPosIndex + this.splitStart).toString())); 306 string.setValueAscii(seqBytes, 0, seqBytes.length); 307 record.setField(1, string); 308 //System.out.println("value size for key: " + record.getField(0, PactString.class) + " is " + record.getField(1, PactString.class)); 309 //update readPosIndex, which will be the key of the next record. 310 readPosIndex = readPosIndex + seqBytes.length; 311 return true; 312 } 313 314 /** 315 * Closes the input by releasing all buffers and closing the file input stream. 316 * 317 * @throws IOException Thrown, if the closing of the file stream causes an I/O error. 318 */ 319 @Override 320 public void close() throws IOException 321 { 322 this.wrapBuffer = null; 323 this.readBuffer = null; 324 this.readPosIndex = 0; 325 326 super.close(); 327 } 328 329 // -------------------------------------------------------------------------------------------- 330 331 private byte[] readSequence(int seqNumber) throws IOException { 332 if (this.stream == null || this.overLimit) { 333 return null; 334 } 335 //System.out.println("just enter into out readSequence. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", localSplitLength:" + localSplitLength); 336 337 338 int countInWrapBuffer = 0; 339 340 /* position of matching positions in the delimiter byte array */ 341 int i = 0; 342 343 //it start with 1 because that the first delimiter is be counted, since the '>' is always at the beginning of data. 344 seqIndex = 1; 345 346 while (true) { 347 if (this.readPos >= this.limit) { 348 if (!fillBuffer()) { 349 if (countInWrapBuffer > 0) { 350 byte[] tmp = new byte[countInWrapBuffer]; 351 System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer); 352 return tmp; 353 } else { 354 return null; 355 } 356 } 357 } 358 359 int startPos = this.readPos; 360 int count = 0; 361 362 //System.out.println("before while. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", localSplitLength:" + localSplitLength); 363 364 while (this.readPos < this.limit && seqIndex <= seqNumber && readPosOffset < localSplitLength) { 365 366 if ((this.readBuffer[this.readPos]) == this.delimiter[i] && readPos - startPos > this.delimiter.length) { 367 //the second condition is to make sure the first delimiter will not be counted, since the '>' is always at the beginning of data. 368 i++; 369 if (i == this.delimiter.length) { 370 seqIndex++; 371 i = 0; 372 } 373 } else { 374 i = 0; 375 } 376 //readPosOffset add one because readPos add one. 377 readPosOffset++; 378 readPos++; 379 } 380 381 //System.out.println("drop out reason. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", localSplitLength:" + localSplitLength); 382 // check why we dropped out 383 if (this.readPos == this.limit) { 384 count = this.limit - startPos; 385 // buffer exhausted 386 while (this.wrapBuffer.length - countInWrapBuffer < count) { 387 // reallocate 388 byte[] tmp = new byte[this.wrapBuffer.length * 2]; 389 System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer); 390 this.wrapBuffer = tmp; 391 } 392 393 System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count); 394 countInWrapBuffer += count; 395 } 396 //if (seqIndex == seqNumber || readPosOffset > splitLength) { 397 else { 398 if (readPosOffset == localSplitLength) { 399 //reach the end of the split. Now read the across the boundary until the first delimiter of the next split. 400 //System.out.println("reach end of the split"); 401 i = 0; 402 while (this.readPos < this.limit && i < this.delimiter.length) { 403 if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) { 404 i++; 405 } else { 406 i = 0; 407 } 408 409 } 410 //set overLimit to be true since the end of this split has been reached. 411 this.overLimit = true; 412 } else { 413 //reach the next sequence. the index will be subtracted by 1 since its initial value is 1. 414 seqIndex--; 415 } 416 //this.delimiter.length need to be subtracted from readPos because we do not want to count the beginning '>' of the next sequence. 417 count = this.readPos - startPos - this.delimiter.length; 418 //move read position back before the delimiter. 419 this.readPos = this.readPos - this.delimiter.length; 420 this.readPosOffset = this.readPosOffset - this.delimiter.length; 421 422 // copy to byte array 423 if (countInWrapBuffer > 0) { 424 byte[] end = new byte[countInWrapBuffer + count]; 425 if (count >= 0) { 426 System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer); 427 System.arraycopy(this.readBuffer, 0, end, countInWrapBuffer, count); 428 return end; 429 } else { 430 // count < 0 431 System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer + count); 432 return end; 433 } 434 } else { 435 byte[] end = new byte[count]; 436 System.arraycopy(this.readBuffer, startPos, end, 0, count); 437 return end; 438 } 439 } 440 } 441 } 442 443 private final boolean fillBuffer() throws IOException { 444 int toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength; 445 if (this.splitLength <= 0) { 446 toRead = this.readBuffer.length; 447 this.overLimit = true; 448 } 449 450 int read = this.stream.read(this.readBuffer, 0, toRead); 451 452 if (read == -1) { 453 this.stream.close(); 454 this.stream = null; 455 return false; 456 } else { 457 this.splitLength -= read; 458 this.readPos = 0; 459 this.limit = read; 460 return true; 461 } 462 } 463 464}