001/* DuplicatedTextInputFormat
002
003/*
004 * Copyright (c) 2010-2013 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027package org.kepler.hadoop.io.input;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.List;
032
033import org.apache.hadoop.fs.BlockLocation;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.io.IntWritable;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.mapreduce.InputSplit;
040import org.apache.hadoop.mapreduce.JobContext;
041import org.apache.hadoop.mapreduce.RecordReader;
042import org.apache.hadoop.mapreduce.TaskAttemptContext;
043import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
044import org.kepler.ddp.Utilities;
045
046//////////////////////////////////////////////////////////////////////////
047////DuplicatedTextInputFormat
048
049/**
050* This class provides InputFormat to read file(s) repeatedly. 
051* Key is the current index value, value is the whole content of the file.
052* The whole content of the file will be sent to value repeatedly unless key number reaches parallel number. 
053* 
054* @author Jianwu Wang (jianwu@sdsc.edu)
055* @version $Id: DuplicatedTextInputFormat.java 33070 2014-11-12 23:21:09Z crawl $
056*/
057
058public class DuplicatedTextInputFormat extends FileInputFormat<IntWritable, Text> {
059
060        
061        private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL;
062        
063        @Override
064        public List<InputSplit> getSplits(JobContext job) throws IOException {
065
066                // generate splits
067                List<InputSplit> splits = new ArrayList<InputSplit>();
068                List<FileStatus> files = listStatus(job);
069                long length = 0;
070                for (FileStatus file : files) {
071                        Path path = file.getPath();
072                        length = file.getLen();
073                        if (length != 0) {
074                                FileSystem fs = path.getFileSystem(job.getConfiguration());
075                                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
076                                                0, length);
077                                splits.add(makeSplit(path, 0, length,
078                                                        blkLocations[0].getHosts()));
079                        } else {
080                                // Create empty hosts array for zero length files
081                                splits.add(makeSplit(path, 0, length, new String[0]));
082                        }
083                }
084                // Save the number of input files for metrics/loadgen
085                job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
086                System.out.println("Total # of splits: " + splits.size());
087                return splits;
088        }
089        
090        @Override
091        public RecordReader<IntWritable, Text> createRecordReader(
092                        InputSplit split, TaskAttemptContext context)
093                        throws IOException, InterruptedException {
094                int paraNum = context.getConfiguration().getInt(PARALLEL_NUMBER, 1);
095                DuplicatedTextRecordReader reader = new DuplicatedTextRecordReader();
096                reader.setParaNum(paraNum);
097            return reader;
098        }
099}