001/*
002 * Copyright (c) 2010-2013 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * Permission is hereby granted, without written agreement and without
006 * license or royalty fees, to use, copy, modify, and distribute this
007 * software and its documentation for any purpose, provided that the above
008 * copyright notice and the following two paragraphs appear in all copies
009 * of this software.
010 *
011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 * SUCH DAMAGE.
016 *
017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 * ENHANCEMENTS, OR MODIFICATIONS.
023 *
024 */
025package org.kepler.hadoop.io.input;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030
031import org.apache.hadoop.fs.BlockLocation;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.mapreduce.InputSplit;
036import org.apache.hadoop.mapreduce.JobContext;
037import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
038import org.kepler.ddp.Utilities;
039
040//////////////////////////////////////////////////////////////////////////
041////LineInputFormat
042
043/**
044* This class extend TextInputFormat class to read lines in each map function. 
045* InputSplit list number is the same with PARALLEL_NUMBER, which will split data even it is only one small file.
046* 
047* @author Jianwu Wang (jianwu@sdsc.edu)
048* @version $Id: LineInputFormat.java 33070 2014-11-12 23:21:09Z crawl $
049*/
050public class LineInputFormat extends TextInputFormat {
051
052        private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL;
053        
054        @Override
055        public List<InputSplit> getSplits(JobContext job) throws IOException {
056                int splitNumber = job.getConfiguration().getInt(
057                                PARALLEL_NUMBER, 1);
058
059                // generate splits
060                List<InputSplit> splits = new ArrayList<InputSplit>();
061                List<FileStatus> files = listStatus(job);
062                long length = 0;
063                for (FileStatus file : files) {
064                        length += file.getLen();
065                }
066                long splitSize = (long)Math.ceil(length*1.0 / splitNumber);
067                System.out.println("splitSize: " + splitSize);
068
069                for (FileStatus file : files) {
070                        Path path = file.getPath();
071                        length = file.getLen();
072                        if (length != 0) {
073                                FileSystem fs = path.getFileSystem(job.getConfiguration());
074                                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
075                                                0, length);
076                                if (isSplitable(job, path)) {
077                                        long bytesRemaining = length;
078                                        while (((double) bytesRemaining) / splitSize > 1.1) {
079                                                int blkIndex = getBlockIndex(blkLocations, length
080                                                                - bytesRemaining);
081                                                splits.add(makeSplit(path, length - bytesRemaining,
082                                                                splitSize, blkLocations[blkIndex].getHosts()));
083                                                bytesRemaining -= splitSize;
084                                        }
085
086                                        if (bytesRemaining != 0) {
087                                                splits.add(makeSplit(path, length - bytesRemaining,
088                                                                bytesRemaining,
089                                                                blkLocations[blkLocations.length - 1]
090                                                                                .getHosts()));
091                                        }
092                                } else { // not splitable
093                                        splits.add(makeSplit(path, 0, length,
094                                                        blkLocations[0].getHosts()));
095                                }
096                        } else {
097                                // Create empty hosts array for zero length files
098                                splits.add(makeSplit(path, 0, length, new String[0]));
099                        }
100                }
101                // Save the number of input files for metrics/loadgen
102                job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
103                System.out.println("Total # of splits: " + splits.size());
104                return splits;
105        }
106}