001/*
002 * Copyright (c) 2005-2012 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2014-11-12 23:17:08 +0000 (Wed, 12 Nov 2014) $' 
007 * '$Revision: 33064 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029package org.kepler.bio.stratosphere.input;
030
031import java.io.IOException;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.kepler.ddp.Utilities;
036
037import eu.stratosphere.api.common.io.statistics.BaseStatistics;
038import eu.stratosphere.api.java.record.io.FileInputFormat;
039import eu.stratosphere.configuration.Configuration;
040import eu.stratosphere.core.fs.FileInputSplit;
041import eu.stratosphere.core.fs.Path;
042import eu.stratosphere.types.Record;
043import eu.stratosphere.types.StringValue;
044
045
046
047
048/**
049* This Input Format is to read fasta sequence file and generate key value pair records.
050* The format of each key is : partitionNumber_(readPosIndex+start) and each value is a list of sequences. 
051* Parameter 'KEPLER::parameter::SequenceSizePerExecution' specify the maximum size of sequence data that will be put into each record.
052* This Input Format can guarantee each sequence is read in a whole and each record won't have truncated sequences.  
053* It is built based on eu.stratosphere.pact.common.io.TextInputFormat class.
054* 
055* @author Jianwu Wang
056* @version $Id: FASTAInputFormatBySize.java 33064 2014-11-12 23:17:08Z crawl $
057*/
058public class FASTAInputFormatBySize extends FileInputFormat
059{
060        /**
061         * The configuration key to set the record delimiter.
062         */
063        public static final String RECORD_DELIMITER = "textformat.delimiter";
064        
065        /**
066         * The configuration key to set the number of samples to take for the statistics.
067         */
068        public static final String NUM_STATISTICS_SAMPLES = "textformat.numSamples";
069
070        /**
071         * The configuration key to set the sequence data size (unite: MB). It specifies the data size to be processed for each sub-workflow execution. Its default value is 1.
072         */
073        public static final String SEQ_SIZE_PER_EXE = Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::SequenceSizePerExecution";
074        
075        /**
076         * The configuration key to set position of the record delimiter. its value is null, prefix or suffix.
077         */
078        public static final String DELIMITER_POSITION = "textformat.delimiter.position";
079        
080        /**
081         * The log.
082         */
083        private static final Log LOG = LogFactory.getLog(FASTAInputFormatBySize.class);
084        
085        /**
086         * The default read buffer size = 1MB.
087         */
088        private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
089        
090        /**
091         * The default number of sample lines to consider when calculating the line width.
092         */
093        private static final int DEFAULT_NUM_SAMPLES = 10;
094        
095        // --------------------------------------------------------------------------------------------
096
097        private Class<StringValue> keyClass;
098        
099        private Class<StringValue> valueClass;
100        
101        private byte[] readBuffer;
102
103        //It is used to store the data between the position of the last readSequence() result before reaching limit and the current limit position.
104        //The data needs to be added to the next result of readSequence().
105        private byte[] wrapBuffer;
106
107        private int readPos;
108        
109        //the index of the current read position, it is used to be the key of record.
110        private long readPosIndex;
111        
112        private long localSplitLength;
113        
114        private int partitionNumber;
115
116        private int limit;
117
118        //set delimiter to be '>' for fasta file
119        private byte[] delimiter = new byte[] { '>' };
120
121        private boolean overLimit;
122
123        private boolean end;
124        
125        private int bufferSize = -1;
126        
127        private int numLineSamples;                                                                             // the number of lines to sample for statistics
128        
129        Float dataSize = null;
130        
131        private long readPosOffset;
132        
133        String delimiter_position = null;
134        
135        private Path splitPath;
136        
137        // --------------------------------------------------------------------------------------------
138
139        /**
140         * Gets the delimiter that defines the record boundaries.
141         * 
142         * @return The delimiter, as bytes.
143         */
144        public byte[] getDelimiter()
145        {
146                return delimiter;
147        }
148        
149        /**
150         * Sets the size of the buffer to be used to find record boundaries. This method has only an effect, if it is called
151         * before the input format is opened.
152         * 
153         * @param bufferSize The buffer size to use.
154         */
155        public void setBufferSize(int bufferSize)
156        {
157                this.bufferSize = bufferSize;
158        }
159        
160        /**
161         * Gets the size of the buffer internally used to parse record boundaries.
162         * 
163         * @return The size of the parsing buffer.
164         */
165        public int getBufferSize()
166        {
167                return this.readBuffer == null ? 0: this.readBuffer.length;
168        }
169        
170        // --------------------------------------------------------------------------------------------
171        
172        /**
173         * Configures this input format by reading the path to the file from the configuration and the string that
174         * defines the record delimiter.
175         * 
176         * @param parameters The configuration object to read the parameters from.
177         */
178        @Override
179        public void configure(Configuration parameters)
180        {
181                super.configure(parameters);
182                //set delimiter to be '>' for fasta file
183                String delimString = parameters.getString(RECORD_DELIMITER, ">");
184                System.out.println("delimString:" + delimString);
185                if (delimString == null) {
186                        throw new IllegalArgumentException("The delimiter not be null.");
187                }
188
189                this.delimiter = delimString.getBytes();
190                
191                // set the number of samples
192                this.numLineSamples = DEFAULT_NUM_SAMPLES;
193                String samplesString = parameters.getString(NUM_STATISTICS_SAMPLES, null);
194                
195                if (samplesString != null) {
196                        try {
197                                this.numLineSamples = Integer.parseInt(samplesString);
198                        }
199                        catch (NumberFormatException nfex) {
200                                if (LOG.isWarnEnabled())
201                                        LOG.warn("Invalid value for number of samples to take: " + samplesString +
202                                                        ". Using default value of " + DEFAULT_NUM_SAMPLES);
203                        }
204                }
205                
206                this.dataSize = new Float(parameters.getString(SEQ_SIZE_PER_EXE, "1"));
207                
208                System.out.println("SequenceSizePerExecution parameter value:" + this.dataSize);
209                
210                delimiter_position = parameters.getString(DELIMITER_POSITION, "prefix");
211                
212        }
213        
214        
215        
216        // --------------------------------------------------------------------------------------------
217
218        /* (non-Javadoc)
219         * @see eu.stratosphere.pact.common.io.InputFormat#getStatistics()
220         */
221        @Override
222        public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics)
223        {
224                return null;
225        }
226
227        /**
228         * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers
229         * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped.
230         * 
231         * @param split The input split to open.
232         * 
233         * @see eu.stratosphere.pact.common.io.FileInputFormat#open(eu.stratosphere.nephele.fs.FileInputSplit)
234         */
235        @Override
236        public void open(FileInputSplit split) throws IOException
237        {
238                super.open(split);
239                this.splitPath = split.getPath();
240                partitionNumber = split.getSplitNumber();
241                localSplitLength = split.getLength();
242                readPosOffset = 0;
243                readPosIndex = 0;
244                
245                System.out.println("split in one open():" + split);
246                
247                this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
248                this.readBuffer = new byte[this.bufferSize];
249                this.wrapBuffer = new byte[256];
250
251                this.limit = 0;
252                this.readPos = 0;
253                this.overLimit = false;
254                this.end = false;
255
256                if (this.splitStart != 0) {
257                //the split is not the first split, so goes back to the last split and gets its last record.
258                        this.stream.seek(this.splitStart);
259                        readPosIndex = readSequence(-1).length;
260                        
261                        // if the first partial record already pushes the stream over the limit of our split, then no
262                        // record starts within this split 
263                        if (this.overLimit) {
264                                this.end = true;
265                        }
266                }
267                else {
268                        fillBuffer();
269                }
270        }
271
272        /**
273         * Checks whether the current split is at its end.
274         * 
275         * @return True, if the split is at its end, false otherwise.
276         */
277        @Override
278        public boolean reachedEnd()
279        {
280                return this.end;
281        }
282        
283        @Override
284        public boolean nextRecord(Record record) throws IOException
285        {
286                final StringValue string = new StringValue();
287                byte[] seqBytes = readSequence(dataSize.floatValue());
288                if (seqBytes == null) {
289                        this.end = true;
290                }
291                if (end)
292                        return false;
293                
294                record.setField(0, new StringValue (splitPath.getName() + "_" + new Integer(this.partitionNumber).toString() + "_" + new Long(this.readPosIndex + this.splitStart).toString()));
295                string.setValueAscii(seqBytes, 0, seqBytes.length);
296                record.setField(1, string);
297                //System.out.println("value size for key: " + record.getField(0, PactString.class) + " is " + record.getField(1, PactString.class));
298                //update readPosIndex, which will be the key of the next record.
299                readPosIndex = readPosIndex + seqBytes.length;
300                return true;
301        }
302
303        /**
304         * Closes the input by releasing all buffers and closing the file input stream.
305         * 
306         * @throws IOException Thrown, if the closing of the file stream causes an I/O error.
307         */
308        @Override
309        public void close() throws IOException
310        {
311                this.wrapBuffer = null;
312                this.readBuffer = null;
313                this.readPosIndex = 0;
314                
315                super.close();
316        }
317
318        // --------------------------------------------------------------------------------------------
319
320        private byte[] readSequence(float dataSizeToRead) throws IOException {
321                if (this.stream == null || this.overLimit) {
322                        return null;
323                }
324                //System.out.println("just enter into out readSequence. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", splitLength:" + splitLength);
325
326                float dataByteSize = dataSizeToRead * 1024 * 1024;
327                int countInWrapBuffer = 0;
328
329                /* position of matching positions in the delimiter byte array */
330                int i = 0;
331                
332                //use onlyReadOne flag to check whether it is called by: readPosIndex = readSequence(-1).length;
333                boolean onlyReadOne = false;
334                if (dataByteSize < 0) {
335                        onlyReadOne = true;
336                        dataByteSize = Integer.MAX_VALUE;
337                }
338
339                while (true) {
340                        if (this.readPos >= this.limit) {
341                                if (!fillBuffer()) {
342                                        if (countInWrapBuffer > 0) {
343                                                byte[] tmp = new byte[countInWrapBuffer];
344                                                System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
345                                                return tmp;
346                                        } else {
347                                                return null;
348                                        }
349                                }
350                        }
351
352                        int startPos = this.readPos;
353                        int count = 0;
354
355                        //System.out.println("before while. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", seqIndex:" + seqIndex + ", seqNumber:" + seqNumber + ", readPosOffset:" + readPosOffset + ", splitLength:" + splitLength);
356                        while (this.readPos < this.limit && (this.readPos - startPos + countInWrapBuffer) < dataByteSize && readPosOffset < localSplitLength) {
357                                if ((this.readBuffer[this.readPos]) == this.delimiter[i] && (this.readPos - startPos) > this.delimiter.length) {
358                                //the second condition is to make sure the first delimiter will not be counted, since the '>' is always at the beginning of data.
359                                        i++;
360                                        if (i == this.delimiter.length) {
361                                                i = 0;
362                                                if (onlyReadOne) {
363                                                        //it is called by readSequence(-1).length() function, so break after reading the delimiter
364                                                        //readPosOffset add one because readPos add one.
365                                                        readPosOffset++;
366                                                        readPos++;
367                                                        break;
368                                                }
369                                        }
370                                } else {
371                                        i = 0;
372                                }
373                                //readPosOffset add one because readPos add one.
374                                readPosOffset++;
375                                readPos++;
376                        }
377
378                        //System.out.println("drop out reason. this.readPos: " + this.readPos + ", this.limit:" + this.limit + ", dataByteSize:" + dataByteSize + ", readPosOffset:" + readPosOffset + ", splitLength:" + splitLength);
379                        // check why we dropped out
380                        if (this.readPos == this.limit) {
381                                count = this.limit - startPos;
382                                // buffer exhausted
383                                while (this.wrapBuffer.length - countInWrapBuffer < count) {
384                                        // reallocate
385                                        byte[] tmp = new byte[this.wrapBuffer.length * 2];
386                                        System.arraycopy(this.wrapBuffer, 0, tmp, 0, countInWrapBuffer);
387                                        this.wrapBuffer = tmp;
388                                }
389
390                                System.arraycopy(this.readBuffer, startPos, this.wrapBuffer, countInWrapBuffer, count);
391                                countInWrapBuffer += count;
392                        }
393                        //if (seqIndex == seqNumber || readPosOffset > splitLength) {
394                        else {
395                                //reach the end of the split or the seq size per execution. Now read the across the boundary until the first delimiter of the next split.
396                                //System.out.println("reach end of the split");
397                                i = 0;
398                                while (this.readPos < this.limit && i < this.delimiter.length) {
399                                        if ((this.readBuffer[this.readPos++]) == this.delimiter[i]) {
400                                                i++;
401                                        } else {
402                                                i = 0;
403                                        }
404
405                                }
406                                if (readPosOffset == localSplitLength) {
407                                        //set overLimit to be true since the end of this split has been reached.
408                                        this.overLimit = true;
409                                }
410                                //this.delimiter.length need to be subtracted from readPos because we do not want to count the beginning '>' of the next sequence.
411                                count = this.readPos - startPos - this.delimiter.length;
412                                //move read position back before the delimiter.
413                                this.readPos = this.readPos - this.delimiter.length;
414                                this.readPosOffset = this.readPosOffset - this.delimiter.length;
415
416                                // copy to byte array
417                                if (countInWrapBuffer > 0) {
418                                        byte[] end = new byte[countInWrapBuffer + count];
419                                        if (count >= 0) {
420                                                System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer);
421                                                System.arraycopy(this.readBuffer, 0, end, countInWrapBuffer, count);
422                                                return end;
423                                        } else {
424                                                // count < 0
425                                                System.arraycopy(this.wrapBuffer, 0, end, 0, countInWrapBuffer + count);
426                                                return end;
427                                        }
428                                } else {
429                                        byte[] end = new byte[count];
430                                        System.arraycopy(this.readBuffer, startPos, end, 0, count);
431                                        return end;
432                                }
433                        } 
434                }
435        }
436        
437        private final boolean fillBuffer() throws IOException {
438                int toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength;
439                if (this.splitLength <= 0) {
440                        toRead = this.readBuffer.length;
441                        this.overLimit = true;
442                }
443
444                int read = this.stream.read(this.readBuffer, 0, toRead);
445
446                if (read == -1) {
447                        this.stream.close();
448                        this.stream = null;
449                        return false;
450                } else {
451                        this.splitLength -= read;
452                        this.readPos = 0;
453                        this.limit = read;
454                        return true;
455                }
456        }
457        
458}