001/* 002 * Copyright (c) 2010-2013 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * Permission is hereby granted, without written agreement and without 006 * license or royalty fees, to use, copy, modify, and distribute this 007 * software and its documentation for any purpose, provided that the above 008 * copyright notice and the following two paragraphs appear in all copies 009 * of this software. 010 * 011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 * SUCH DAMAGE. 016 * 017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 * ENHANCEMENTS, OR MODIFICATIONS. 023 * 024 */ 025package org.kepler.hadoop.io.input; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.List; 030 031import org.apache.hadoop.fs.BlockLocation; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.io.LongWritable; 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.mapreduce.InputSplit; 038import org.apache.hadoop.mapreduce.JobContext; 039import org.apache.hadoop.mapreduce.RecordReader; 040import org.apache.hadoop.mapreduce.TaskAttemptContext; 041import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 042import org.kepler.ddp.Utilities; 043 044/** 045 * This class extend DelimitedTextInputFormat class to read text in each map 046 * function. The delimiter for data record splitting is configurable through 047 * DDPSource parameter: "KEPLER::parameter::Delimiter". 048 * 049 * @author Jianwu Wang (jianwu@sdsc.edu) 050 * @version $Id: DelimitedTextInputFormat.java 33070 2014-11-12 23:21:09Z crawl $ 051 */ 052 053public class DelimitedTextInputFormat extends FileInputFormat<LongWritable, Text> { 054 055 private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL; 056 057 @Override 058 public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, 059 TaskAttemptContext context) { 060 return new DelimitedTextRecordReader(); 061 } 062 063 @Override 064 public List<InputSplit> getSplits(JobContext job) throws IOException { 065 int splitNumber = job.getConfiguration().getInt( 066 PARALLEL_NUMBER, 1); 067 068 // generate splits 069 List<InputSplit> splits = new ArrayList<InputSplit>(); 070 List<FileStatus> files = listStatus(job); 071 long length = 0; 072 for (FileStatus file : files) { 073 length += file.getLen(); 074 } 075 long splitSize = (long)Math.ceil(length*1.0 / splitNumber); 076 System.out.println("splitSize: " + splitSize); 077 078 for (FileStatus file : files) { 079 Path path = file.getPath(); 080 length = file.getLen(); 081 if (length != 0) { 082 FileSystem fs = path.getFileSystem(job.getConfiguration()); 083 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 084 0, length); 085 if (isSplitable(job, path)) { 086 long bytesRemaining = length; 087 while (((double) bytesRemaining) / splitSize > 1.1) { 088 int blkIndex = getBlockIndex(blkLocations, length 089 - bytesRemaining); 090 splits.add(makeSplit(path, length - bytesRemaining, 091 splitSize, blkLocations[blkIndex].getHosts())); 092 bytesRemaining -= splitSize; 093 } 094 095 if (bytesRemaining != 0) { 096 splits.add(makeSplit(path, length - bytesRemaining, 097 bytesRemaining, 098 blkLocations[blkLocations.length - 1] 099 .getHosts())); 100 } 101 } else { // not splitable 102 splits.add(makeSplit(path, 0, length, 103 blkLocations[0].getHosts())); 104 } 105 } else { 106 // Create empty hosts array for zero length files 107 splits.add(makeSplit(path, 0, length, new String[0])); 108 } 109 } 110 // Save the number of input files for metrics/loadgen 111 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); 112 System.out.println("Total # of splits: " + splits.size()); 113 return splits; 114 } 115 116}