001/*
002 * Copyright (c) 2005-2012 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2014-11-12 23:17:08 +0000 (Wed, 12 Nov 2014) $' 
007 * '$Revision: 33064 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029package org.kepler.bio.stratosphere.input;
030
031import java.io.IOException;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.kepler.ddp.Utilities;
036
037import eu.stratosphere.api.common.io.statistics.BaseStatistics;
038import eu.stratosphere.api.java.record.io.FileInputFormat;
039import eu.stratosphere.configuration.Configuration;
040import eu.stratosphere.core.fs.FileInputSplit;
041import eu.stratosphere.core.fs.Path;
042import eu.stratosphere.types.Record;
043import eu.stratosphere.types.StringValue;
044
045
046
047/**
048* This Input Format is to read fasta sequence file and generate key value pair records.
049* The format of each key is : partitionNumber_(readPosIndex+start) and each value is a list of sequences. 
050* Parameter 'KEPLER::parameter::SequenceNumberPerExecution' specify at most how many sequences will be put into each record.
051* This Input Format can guarantee each sequence is read in a whole and each record won't have truncated sequences.  
052* It is built based on eu.stratosphere.pact.common.io.TextInputFormat class.
053* 
054* @author Jianwu Wang
055* @version $Id: FASTAInputFormat.java 33064 2014-11-12 23:17:08Z crawl $
056*/
057public class FASTAInputFormat extends FileInputFormat
058{
059        /**
060         * The configuration key to set the record delimiter.
061         */
062        public static final String RECORD_DELIMITER = "textformat.delimiter";
063        
064        /**
065         * The configuration key to set the number of samples to take for the statistics.
066         */
067        public static final String NUM_STATISTICS_SAMPLES = "textformat.numSamples";
068
069        /**
070         * The configuration key to set the sequence number. It specifies the sequence number to be processed for each sub-workflow execution.
071         */
072        public static final String SEQUENCE_NUM = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::SequenceNumberPerExecution";
073        
074        /**
075         * The configuration key to set position of the record delimiter. its value is null, prefix or suffix.
076         */
077        public static final String DELIMITER_POSITION = "textformat.delimiter.position";
078        
079        /**
080         * The log.
081         */
082        private static final Log LOG = LogFactory.getLog(FASTAInputFormat.class);
083        
084        /**
085         * The default read buffer size = 1MB.
086         */
087        private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
088        
089        /**
090         * The default number of sample lines to consider when calculating the line width.
091         */
092        private static final int DEFAULT_NUM_SAMPLES = 10;
093        
094        // --------------------------------------------------------------------------------------------
095        
096        private byte[] readBuffer;
097
098        private byte[] wrapBuffer;
099
100        private int readPos;
101        
102        //the index of the current read position, it is used to be the key of record.
103        private long readPosIndex;
104        
105        private int seqIndex;
106        
107        private long localSplitLength;
108        
109        private int partitionNumber;
110
111        private int limit;
112
113        //set delimiter to be '>' for fasta file
114        private byte[] delimiter = new byte[] { '>' };
115
116        private boolean overLimit;
117
118        private boolean end;
119        
120        private int bufferSize = -1;
121        
122        private int numLineSamples;                                                                             // the number of lines to sample for statistics
123        
124        Integer seq_number = null;
125        
126        private long readPosOffset;
127        
128        String delimiter_position = null;
129        
130        private Path splitPath;
131
132
133        // --------------------------------------------------------------------------------------------
134
135        /**
136         * Gets the delimiter that defines the record boundaries.
137         * 
138         * @return The delimiter, as bytes.
139         */
140        public byte[] getDelimiter()
141        {
142                return delimiter;
143        }
144        
145        /**
146         * Sets the size of the buffer to be used to find record boundaries. This method has only an effect, if it is called
147         * before the input format is opened.
148         * 
149         * @param bufferSize The buffer size to use.
150         */
151        public void setBufferSize(int bufferSize)
152        {
153                this.bufferSize = bufferSize;
154        }
155        
156        /**
157         * Gets the size of the buffer internally used to parse record boundaries.
158         * 
159         * @return The size of the parsing buffer.
160         */
161        public int getBufferSize()
162        {
163                return this.readBuffer == null ? 0: this.readBuffer.length;
164        }
165        
166        // --------------------------------------------------------------------------------------------
167        
168        /**
169         * Configures this input format by reading the path to the file from the configuration and the string that
170         * defines the record delimiter.
171         * 
172         * @param parameters The configuration object to read the parameters from.
173         */
174        @Override
175        public void configure(Configuration parameters)
176        {
177                super.configure(parameters);
178                //set delimiter to be '>' for fasta file
179                String delimString = parameters.getString(RECORD_DELIMITER, ">");
180                System.out.println("delimString:" + delimString);
181                if (delimString == null) {
182                        throw new IllegalArgumentException("The delimiter not be null.");
183                }
184
185                this.delimiter = delimString.getBytes();
186                
187                // set the number of samples
188                this.numLineSamples = DEFAULT_NUM_SAMPLES;
189                String samplesString = parameters.getString(NUM_STATISTICS_SAMPLES, null);
190                
191                if (samplesString != null) {
192                        try {
193                                this.numLineSamples = Integer.parseInt(samplesString);
194                        }
195                        catch (NumberFormatException nfex) {
196                                if (LOG.isWarnEnabled())
197                                        LOG.warn("Invalid value for number of samples to take: " + samplesString +
198                                                        ". Using default value of " + DEFAULT_NUM_SAMPLES);
199                        }
200                }
201                
202                this.seq_number = new Integer(parameters.getString(SEQUENCE_NUM, "5000"));
203                
204                System.out.println("SEQUENCE_NUM:"+this.seq_number);
205                
206                delimiter_position = parameters.getString(DELIMITER_POSITION, "prefix");
207                
208        }
209        
210        
211        
212        // --------------------------------------------------------------------------------------------
213
214        /* (non-Javadoc)
215         * @see eu.stratosphere.pact.common.io.InputFormat#getStatistics()
216         */
217        @Override
218        public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics)
219        {
220                return null;
221        }
222
223        /**
224         * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers
225         * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped.
226         * 
227         * @param split The input split to open.
228         * 
229         * @see eu.stratosphere.pact.common.io.FileInputFormat#open(eu.stratosphere.nephele.fs.FileInputSplit)
230         */
231        @Override
232        public void open(FileInputSplit split) throws IOException
233        {
234                super.open(split);
235                this.splitPath = split.getPath();
236                partitionNumber = split.getSplitNumber();
237                localSplitLength = split.getLength();
238                readPosOffset = 0;
239                readPosIndex = 0;
240                
241                //System.out.println("split in one open():" + split);
242                
243                this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
244                this.readBuffer = new byte[this.bufferSize];
245                this.wrapBuffer = new byte[256];
246
247                this.limit = 0;
248                this.readPos = 0;
249                this.overLimit = false;
250                this.end = false;
251
252                if (this.splitStart != 0) {
253                //the split is not the first split, so goes back to the last split and gets its last record.
254                        this.stream.seek(this.splitStart);                      
255                        readPosIndex = readSequence(1).length;
256                        
257                        // if the first partial record already pushes the stream over the limit of our split, then no
258                        // record starts within this split 
259                        if (this.overLimit) {
260                                this.end = true;
261                        }
262                }
263                else {
264                        fillBuffer();
265                }
266        }
267
268        /**
269         * Checks whether the current split is at its end.
270         * 
271         * @return True, if the split is at its end, false otherwise.
272         */
273        @Override
274        public boolean reachedEnd()
275        {
276                return this.end;
277        }
278        
279//      @Override
280//      public boolean nextRecord(KeyValuePair<PactString, PactString> pair) throws IOException
281//      {
282//              byte[] seqBytes = readSequence(seq_number.intValue());
283//              if (seqBytes == null) {
284//                      this.end = true;
285//              }
286//              if (end)
287//                      return false;
288//              pair.setKey(new PactString (new Integer(this.partitionNumber).toString() + "_" + new Long(this.readPosIndex + this.start).toString()));
289//              pair.setValue(new PactString(new String(seqBytes)));
290//              System.out.println("value size for key: " + pair.getKey() + " is " + seqBytes.length);
291//              //update readPosIndex, which will be the key of the next record.
292//              readPosIndex = readPosIndex + seqBytes.length;
293//              return true;
294//      }
295        
296        @Override
297        public boolean nextRecord(Record record) throws IOException {
298                final StringValue string = new StringValue();
299                byte[] seqBytes = readSequence(seq_number.intValue());
300                if (seqBytes == null) {
301                        this.end = true;
302                }
303                if (end)
304                        return false;
305                record.setField(0, new StringValue (splitPath.getName() + "_" + new Integer(this.partitionNumber).toString() + "_" + new Long(this.readPosIndex + this.splitStart).toString()));
306                string.setValueAscii(seqBytes, 0, seqBytes.length);
307                record.setField(1, string);
308                //System.out.println("value size for key: " + record.getField(0, PactString.class) + " is " + record.getField(1, PactString.class));
309                //update readPosIndex, which will be the key of the next record.
310                readPosIndex = readPosIndex + seqBytes.length;
311                return true;
312        }
313
314        /**
315         * Closes the input by releasing all buffers and closing the file input stream.
316         * 
317         * @throws IOException Thrown, if the closing of the file stream causes an I/O error.
318         */
319        @Override
320        public void close() throws IOException
321        {
322                this.wrapBuffer = null;
323                this.readBuffer = null;
324                this.readPosIndex = 0;
325                
326                super.close();
327        }
328
329        // --------------------------------------------------------------------------------------------
330
331        private byte[] readSequence(int seqNumber) throws IOException {
332                if (this.stream == null || this.overLimit) {
333                        return null;
334                }
335                //System.out.println("just enter into out readSequence. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", localSplitLength:" + localSplitLength);
336
337                
338                int countInWrapBuffer = 0;
339
340                /* position of matching positions in the delimiter byte array */
341                int i = 0;
342                
343                //it start with 1 because that the first delimiter is be counted, since the '>' is always at the beginning of data.
344                seqIndex = 1;
345
346                while (true) {
347                        if (this.readPos >= this.limit) {
348                                if (!fillBuffer()) {
349                                        if (countInWrapBuffer > 0) {
350                                                byte[] tmp = new byte[countInWrapBuffer];
351                                                System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
352                                                return tmp;
353                                        } else {
354                                                return null;
355                                        }
356                                }
357                        }
358
359                        int startPos = this.readPos;
360                        int count = 0;
361
362                        //System.out.println("before while. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", localSplitLength:" + localSplitLength);
363
364                        while (this.readPos < this.limit && seqIndex <= seqNumber && readPosOffset < localSplitLength) {
365                                
366                                if ((this.readBuffer[this.readPos]) == this.delimiter[i] && readPos - startPos > this.delimiter.length) {
367                                //the second condition is to make sure the first delimiter will not be counted, since the '>' is always at the beginning of data.
368                                        i++;
369                                        if (i == this.delimiter.length) {
370                                                seqIndex++;
371                                                i = 0;
372                                        }
373                                } else {
374                                        i = 0;
375                                }
376                                //readPosOffset add one because readPos add one.
377                                readPosOffset++;
378                                readPos++;
379                        }
380
381                        //System.out.println("drop out reason. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", localSplitLength:" + localSplitLength);
382                        // check why we dropped out
383                        if (this.readPos == this.limit) {
384                                count = this.limit - startPos;
385                                // buffer exhausted
386                                while (this.wrapBuffer.length - countInWrapBuffer < count) {
387                                        // reallocate
388                                        byte[] tmp = new byte[this.wrapBuffer.length * 2];
389                                        System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
390                                        this.wrapBuffer = tmp;
391                                }
392
393                                System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count);
394                                countInWrapBuffer += count;
395                        }
396                        //if (seqIndex == seqNumber || readPosOffset > splitLength) {
397                        else {
398                                if (readPosOffset == localSplitLength) {
399                                //reach the end of the split. Now read the across the boundary until the first delimiter of the next split.
400                                        //System.out.println("reach end of the split");
401                                        i = 0;
402                                        while (this.readPos < this.limit && i < this.delimiter.length) {
403                                                if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) {
404                                                        i++;
405                                                } else {
406                                                        i = 0;
407                                                }
408
409                                        }
410                                        //set overLimit to be true since the end of this split has been reached.
411                                        this.overLimit = true;
412                                } else {
413                                        //reach the next sequence. the index will be subtracted by 1 since its initial value is 1.
414                                        seqIndex--;
415                                }
416                                //this.delimiter.length need to be subtracted from readPos because we do not want to count the beginning '>' of the next sequence.
417                                count = this.readPos - startPos - this.delimiter.length;
418                                //move read position back before the delimiter.
419                                this.readPos = this.readPos - this.delimiter.length;
420                                this.readPosOffset = this.readPosOffset - this.delimiter.length;
421
422                                // copy to byte array
423                                if (countInWrapBuffer > 0) {
424                                        byte[] end = new byte[countInWrapBuffer + count];
425                                        if (count >= 0) {
426                                                System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer);
427                                                System.arraycopy(this.readBuffer, 0, end, countInWrapBuffer, count);
428                                                return end;
429                                        } else {
430                                                // count < 0
431                                                System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer + count);
432                                                return end;
433                                        }
434                                } else {
435                                        byte[] end = new byte[count];
436                                        System.arraycopy(this.readBuffer, startPos, end, 0, count);
437                                        return end;
438                                }
439                        } 
440                }
441        }
442        
443        private final boolean fillBuffer() throws IOException {
444                int toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
445                if (this.splitLength <= 0) {
446                        toRead = this.readBuffer.length;
447                        this.overLimit = true;
448                }
449
450                int read = this.stream.read(this.readBuffer, 0, toRead);
451
452                if (read == -1) {
453                        this.stream.close();
454                        this.stream = null;
455                        return false;
456                } else {
457                        this.splitLength -= read;
458                        this.readPos = 0;
459                        this.limit = read;
460                        return true;
461                }
462        }
463        
464}