001/* 002 * Copyright (c) 2010-2013 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * Permission is hereby granted, without written agreement and without 006 * license or royalty fees, to use, copy, modify, and distribute this 007 * software and its documentation for any purpose, provided that the above 008 * copyright notice and the following two paragraphs appear in all copies 009 * of this software. 010 * 011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 * SUCH DAMAGE. 016 * 017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 * ENHANCEMENTS, OR MODIFICATIONS. 023 * 024 */ 025package org.kepler.hadoop.io.input; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.List; 030 031import org.apache.hadoop.fs.BlockLocation; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.mapreduce.InputSplit; 036import org.apache.hadoop.mapreduce.JobContext; 037import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 038import org.kepler.ddp.Utilities; 039 040////////////////////////////////////////////////////////////////////////// 041////LineInputFormat 042 043/** 044* This class extend TextInputFormat class to read lines in each map function. 045* InputSplit list number is the same with PARALLEL_NUMBER, which will split data even it is only one small file. 046* 047* @author Jianwu Wang (jianwu@sdsc.edu) 048* @version $Id: LineInputFormat.java 33070 2014-11-12 23:21:09Z crawl $ 049*/ 050public class LineInputFormat extends TextInputFormat { 051 052 private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL; 053 054 @Override 055 public List<InputSplit> getSplits(JobContext job) throws IOException { 056 int splitNumber = job.getConfiguration().getInt( 057 PARALLEL_NUMBER, 1); 058 059 // generate splits 060 List<InputSplit> splits = new ArrayList<InputSplit>(); 061 List<FileStatus> files = listStatus(job); 062 long length = 0; 063 for (FileStatus file : files) { 064 length += file.getLen(); 065 } 066 long splitSize = (long)Math.ceil(length*1.0 / splitNumber); 067 System.out.println("splitSize: " + splitSize); 068 069 for (FileStatus file : files) { 070 Path path = file.getPath(); 071 length = file.getLen(); 072 if (length != 0) { 073 FileSystem fs = path.getFileSystem(job.getConfiguration()); 074 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 075 0, length); 076 if (isSplitable(job, path)) { 077 long bytesRemaining = length; 078 while (((double) bytesRemaining) / splitSize > 1.1) { 079 int blkIndex = getBlockIndex(blkLocations, length 080 - bytesRemaining); 081 splits.add(makeSplit(path, length - bytesRemaining, 082 splitSize, blkLocations[blkIndex].getHosts())); 083 bytesRemaining -= splitSize; 084 } 085 086 if (bytesRemaining != 0) { 087 splits.add(makeSplit(path, length - bytesRemaining, 088 bytesRemaining, 089 blkLocations[blkLocations.length - 1] 090 .getHosts())); 091 } 092 } else { // not splitable 093 splits.add(makeSplit(path, 0, length, 094 blkLocations[0].getHosts())); 095 } 096 } else { 097 // Create empty hosts array for zero length files 098 splits.add(makeSplit(path, 0, length, new String[0])); 099 } 100 } 101 // Save the number of input files for metrics/loadgen 102 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); 103 System.out.println("Total # of splits: " + splits.size()); 104 return splits; 105 } 106}