/* FASTAInputFormat supports reading multiple sequences for the MapReduce actor.
002
003/*
004 * Copyright (c) 2010-2013 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027package org.kepler.bio.hadoop.input;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032
033import org.apache.hadoop.fs.BlockLocation;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.io.Text;
038import org.apache.hadoop.mapreduce.InputSplit;
039import org.apache.hadoop.mapreduce.JobContext;
040import org.apache.hadoop.mapreduce.RecordReader;
041import org.apache.hadoop.mapreduce.TaskAttemptContext;
042import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
043import org.kepler.ddp.Utilities;
044
045//////////////////////////////////////////////////////////////////////////
046////FASTAInputFormat
047
048/**
 * This class extends the FileInputFormat class to read multiple sequences in
 * each map function. The number of sequences for each reader is configurable
 * through the DDPSource parameter: SequenceNumberPerExecution.
052 * 
053 * @author Jianwu Wang (jianwu@sdsc.edu)
054 * @version $Id: FASTAInputFormat.java 33064 2014-11-12 23:17:08Z crawl $
055 */
056
057public class FASTAInputFormat extends FileInputFormat<Text, Text> {
058        
059        private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL;
060
061        @Override
062        public RecordReader<Text, Text> createRecordReader(InputSplit split,
063                        TaskAttemptContext context) {
064                return new FASTARecordReader();
065        }
066
067        @Override
068        public List<InputSplit> getSplits(JobContext job) throws IOException {
069                int splitNumber = job.getConfiguration().getInt(
070                                PARALLEL_NUMBER, 1);
071
072                // generate splits
073                List<InputSplit> splits = new ArrayList<InputSplit>();
074                List<FileStatus> files = listStatus(job);
075                long length = 0;
076                for (FileStatus file : files) {
077                        length += file.getLen();
078                }
079                long splitSize = (long)Math.ceil(length*1.0 / splitNumber);
080                System.out.println("splitSize: " + splitSize);
081
082                for (FileStatus file : files) {
083                        Path path = file.getPath();
084                        length = file.getLen();
085                        if (length != 0) {
086                                FileSystem fs = path.getFileSystem(job.getConfiguration());
087                                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
088                                                0, length);
089                                if (isSplitable(job, path)) {
090                                        long bytesRemaining = length;
091                                        while (((double) bytesRemaining) / splitSize > 1.1) {
092                                                int blkIndex = getBlockIndex(blkLocations, length
093                                                                - bytesRemaining);
094                                                splits.add(makeSplit(path, length - bytesRemaining,
095                                                                splitSize, blkLocations[blkIndex].getHosts()));
096                                                bytesRemaining -= splitSize;
097                                        }
098
099                                        if (bytesRemaining != 0) {
100                                                splits.add(makeSplit(path, length - bytesRemaining,
101                                                                bytesRemaining,
102                                                                blkLocations[blkLocations.length - 1]
103                                                                                .getHosts()));
104                                        }
105                                } else { // not splitable
106                                        splits.add(makeSplit(path, 0, length,
107                                                        blkLocations[0].getHosts()));
108                                }
109                        } else {
110                                // Create empty hosts array for zero length files
111                                splits.add(makeSplit(path, 0, length, new String[0]));
112                        }
113                }
114                // Save the number of input files for metrics/loadgen
115                job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
116                System.out.println("Total # of splits: " + splits.size());
117                return splits;
118        }
119
120}