/* FASTAInputFormat supports reading multiple sequences at a time for the MapReduce actor.
 *
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
*/
package org.kepler.bio.hadoop.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.kepler.ddp.Utilities;

//////////////////////////////////////////////////////////////////////////
////FASTAInputFormat

/**
 * This class extends {@link FileInputFormat} so that each map function can
 * read multiple sequences. The number of sequences per reader is configurable
 * through the DDPSource parameter: SequenceNumberPerExecution.
 * <p>
 * Splits are not sized by block size. Instead, the total length of all input
 * files is divided by the degree of parallelism stored in the job
 * configuration under {@link Utilities#CONFIGURATION_KEPLER_PARA_PARALLEL}.
 * Every input file yields at least one split, so the actual number of splits
 * can exceed the configured parallelism.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: FASTAInputFormat.java 33064 2014-11-12 23:17:08Z crawl $
 */

public class FASTAInputFormat extends FileInputFormat<Text, Text> {

    /** Configuration key holding the requested number of parallel splits. */
    private static final String PARALLEL_NUMBER = Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL;

    /**
     * A file keeps being cut while its remaining bytes exceed the split size
     * by more than this factor. The final split may therefore be up to 10%
     * larger than the split size instead of leaving a tiny trailing split.
     */
    private static final double SPLIT_SLOP = 1.1;

    /**
     * Creates the record reader used for every split of this format.
     *
     * @param split the split to be read (not used when constructing the reader)
     * @param context the task attempt context (not used when constructing the reader)
     * @return a new {@link FASTARecordReader}
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        return new FASTARecordReader();
    }

    /**
     * Divides the job's input files into splits of roughly equal size.
     * <p>
     * The split size is the total length of all input files divided by the
     * configured parallelism, rounded up. Each file is then split
     * independently, so every input file yields at least one split.
     *
     * @param job the job whose input files and configuration are used; the
     *            number of input files is also written back to its
     *            configuration
     * @return the splits, in input-file order
     * @throws IOException if listing the input files or querying their block
     *             locations fails
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // A zero or negative split count can produce a split size of 0 for a
        // non-empty file. The cutting loop in addSplitsForFile would then never
        // terminate, so clamp the requested parallelism to at least one.
        int splitNumber = Math.max(1,
                job.getConfiguration().getInt(PARALLEL_NUMBER, 1));

        List<FileStatus> files = listStatus(job);
        long totalLength = 0;
        for (FileStatus file : files) {
            totalLength += file.getLen();
        }
        long splitSize = computeSplitSize(totalLength, splitNumber);
        System.out.println("splitSize: " + splitSize);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : files) {
            addSplitsForFile(job, file, splitSize, splits);
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        System.out.println("Total # of splits: " + splits.size());
        return splits;
    }

    /**
     * Returns {@code ceil(totalLength / splitNumber)}.
     * <p>
     * Exact integer arithmetic is used instead of going through {@code double},
     * which loses precision for very large totals.
     *
     * @param totalLength total number of input bytes, expected to be non-negative
     * @param splitNumber requested number of splits, expected to be at least 1
     * @return the split size in bytes; 0 only when {@code totalLength} is 0
     */
    private static long computeSplitSize(long totalLength, int splitNumber) {
        long quotient = totalLength / splitNumber;
        return (totalLength % splitNumber == 0) ? quotient : quotient + 1;
    }

    /**
     * Appends the splits for a single input file to {@code splits}.
     * <p>
     * The outcome depends on the file:
     * <ul>
     * <li>Empty files get one zero-length split with no host hints.</li>
     * <li>Non-splittable files get one split covering the whole file.</li>
     * <li>Splittable files are cut into {@code splitSize}-byte pieces, and the
     * remainder becomes the final split.</li>
     * </ul>
     *
     * @param job the job being split, used for its configuration and for
     *            {@link #isSplitable}
     * @param file the input file to split
     * @param splitSize target split size in bytes; must be positive whenever
     *            the file is non-empty
     * @param splits the list that receives the new splits
     * @throws IOException if the file system or block locations cannot be
     *             obtained
     */
    private void addSplitsForFile(JobContext job, FileStatus file,
            long splitSize, List<InputSplit> splits) throws IOException {
        Path path = file.getPath();
        long fileLength = file.getLen();
        if (fileLength == 0) {
            // Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, fileLength, new String[0]));
            return;
        }

        FileSystem fs = path.getFileSystem(job.getConfiguration());
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
                fileLength);
        if (!isSplitable(job, path)) {
            splits.add(makeSplit(path, 0, fileLength,
                    blkLocations[0].getHosts()));
            return;
        }

        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            long offset = fileLength - bytesRemaining;
            int blkIndex = getBlockIndex(blkLocations, offset);
            splits.add(makeSplit(path, offset, splitSize,
                    blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
            long offset = fileLength - bytesRemaining;
            // Take locality hints from the block where this split starts,
            // matching the splits produced by the loop above.
            int blkIndex = getBlockIndex(blkLocations, offset);
            splits.add(makeSplit(path, offset, bytesRemaining,
                    blkLocations[blkIndex].getHosts()));
        }
    }

}