001/*
002 * Copyright (c) 2010-2013 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * Permission is hereby granted, without written agreement and without
006 * license or royalty fees, to use, copy, modify, and distribute this
007 * software and its documentation for any purpose, provided that the above
008 * copyright notice and the following two paragraphs appear in all copies
009 * of this software.
010 *
011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 * SUCH DAMAGE.
016 *
017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 * ENHANCEMENTS, OR MODIFICATIONS.
023 *
024 */
025package org.kepler.hadoop.io.input;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030
031import org.apache.hadoop.fs.BlockLocation;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.io.LongWritable;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.mapreduce.InputSplit;
038import org.apache.hadoop.mapreduce.JobContext;
039import org.apache.hadoop.mapreduce.RecordReader;
040import org.apache.hadoop.mapreduce.TaskAttemptContext;
041import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
042import org.kepler.ddp.Utilities;
043
044/**
 * This class extends FileInputFormat to read delimited text records in each
 * map function. The delimiter used for splitting data records is configurable
 * through the DDPSource parameter: "KEPLER::parameter::Delimiter".
048 * 
049 * @author Jianwu Wang (jianwu@sdsc.edu)
050 * @version $Id: DelimitedTextInputFormat.java 33070 2014-11-12 23:21:09Z crawl $
051 */
052
053public class DelimitedTextInputFormat extends FileInputFormat<LongWritable, Text> {
054        
055        private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL;
056
057        @Override
058        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
059                        TaskAttemptContext context) {
060                return new DelimitedTextRecordReader();
061        }
062
063        @Override
064        public List<InputSplit> getSplits(JobContext job) throws IOException {
065                int splitNumber = job.getConfiguration().getInt(
066                                PARALLEL_NUMBER, 1);
067
068                // generate splits
069                List<InputSplit> splits = new ArrayList<InputSplit>();
070                List<FileStatus> files = listStatus(job);
071                long length = 0;
072                for (FileStatus file : files) {
073                        length += file.getLen();
074                }
075                long splitSize = (long)Math.ceil(length*1.0 / splitNumber);
076                System.out.println("splitSize: " + splitSize);
077
078                for (FileStatus file : files) {
079                        Path path = file.getPath();
080                        length = file.getLen();
081                        if (length != 0) {
082                                FileSystem fs = path.getFileSystem(job.getConfiguration());
083                                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
084                                                0, length);
085                                if (isSplitable(job, path)) {
086                                        long bytesRemaining = length;
087                                        while (((double) bytesRemaining) / splitSize > 1.1) {
088                                                int blkIndex = getBlockIndex(blkLocations, length
089                                                                - bytesRemaining);
090                                                splits.add(makeSplit(path, length - bytesRemaining,
091                                                                splitSize, blkLocations[blkIndex].getHosts()));
092                                                bytesRemaining -= splitSize;
093                                        }
094
095                                        if (bytesRemaining != 0) {
096                                                splits.add(makeSplit(path, length - bytesRemaining,
097                                                                bytesRemaining,
098                                                                blkLocations[blkLocations.length - 1]
099                                                                                .getHosts()));
100                                        }
101                                } else { // not splitable
102                                        splits.add(makeSplit(path, 0, length,
103                                                        blkLocations[0].getHosts()));
104                                }
105                        } else {
106                                // Create empty hosts array for zero length files
107                                splits.add(makeSplit(path, 0, length, new String[0]));
108                        }
109                }
110                // Save the number of input files for metrics/loadgen
111                job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
112                System.out.println("Total # of splits: " + splits.size());
113                return splits;
114        }
115
116}