/* DuplicatedTextInputFormat

/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.hadoop.io.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.kepler.ddp.Utilities;

//////////////////////////////////////////////////////////////////////////
//// DuplicatedTextInputFormat

/**
 * This class provides an InputFormat that reads input file(s) repeatedly.
 * The key is the current duplicate index and the value is the entire
 * content of the file. The whole file content is emitted as the value
 * repeatedly until the key reaches the configured parallel number.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: DuplicatedTextInputFormat.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class DuplicatedTextInputFormat extends FileInputFormat<IntWritable, Text> {

    private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL;

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {

        // Generate one split per input file; each split covers the whole file
        // so the record reader can hand out the complete content.
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        long length = 0;
        for (FileStatus file : files) {
            Path path = file.getPath();
            length = file.getLen();
            if (length != 0) {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
                        0, length);
                // Place the split on the hosts of the file's first block.
                splits.add(makeSplit(path, 0, length,
                        blkLocations[0].getHosts()));
            } else {
                // Create empty hosts array for zero length files
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        System.out.println("Total # of splits: " + splits.size());
        return splits;
    }

    @Override
    public RecordReader<IntWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Read the parallel number from the configuration (default 1) and tell
        // the record reader how many times to duplicate the file content.
        int paraNum = context.getConfiguration().getInt(PARALLEL_NUMBER, 1);
        DuplicatedTextRecordReader reader = new DuplicatedTextRecordReader();
        reader.setParaNum(paraNum);
        return reader;
    }
}
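
// ------------------------------------------------------------------------
// Usage sketch (not part of the original class): a minimal Hadoop driver
// showing how a job could be wired to this InputFormat. The driver class,
// its EchoMapper, and the manual setting of the parallel-number key are
// illustrative assumptions; in Kepler this wiring is normally performed by
// the DDP execution engine rather than by hand. The sketch also assumes the
// record reader emits keys 0 .. paraNum-1, each paired with the full file
// content. It is kept in a comment because a standalone driver would live
// in its own source file with the imports listed below.
//
//   import java.io.IOException;
//   import org.apache.hadoop.conf.Configuration;
//   import org.apache.hadoop.fs.Path;
//   import org.apache.hadoop.io.IntWritable;
//   import org.apache.hadoop.io.Text;
//   import org.apache.hadoop.mapreduce.Job;
//   import org.apache.hadoop.mapreduce.Mapper;
//   import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//   import org.kepler.ddp.Utilities;
//   import org.kepler.hadoop.io.input.DuplicatedTextInputFormat;
//
//   public class DuplicatedTextExampleDriver {
//
//       // Hypothetical mapper: receives the duplicate index as the key and
//       // the whole file content as the value, and writes them out unchanged.
//       public static class EchoMapper
//               extends Mapper<IntWritable, Text, IntWritable, Text> {
//           @Override
//           protected void map(IntWritable key, Text value, Context context)
//                   throws IOException, InterruptedException {
//               context.write(key, value);
//           }
//       }
//
//       public static void main(String[] args) throws Exception {
//           Configuration conf = new Configuration();
//           // Request 4 duplicates of each input file's content.
//           conf.setInt(Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL, 4);
//
//           Job job = Job.getInstance(conf, "duplicated-text-example");
//           job.setJarByClass(DuplicatedTextExampleDriver.class);
//           job.setInputFormatClass(DuplicatedTextInputFormat.class);
//           job.setMapperClass(EchoMapper.class);
//           job.setNumReduceTasks(0);
//           job.setOutputKeyClass(IntWritable.class);
//           job.setOutputValueClass(Text.class);
//
//           DuplicatedTextInputFormat.addInputPath(job, new Path(args[0]));
//           FileOutputFormat.setOutputPath(job, new Path(args[1]));
//
//           System.exit(job.waitForCompletion(true) ? 0 : 1);
//       }
//   }
// ------------------------------------------------------------------------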