001/* 002 * Copyright (c) 2010-2013 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * Permission is hereby granted, without written agreement and without 006 * license or royalty fees, to use, copy, modify, and distribute this 007 * software and its documentation for any purpose, provided that the above 008 * copyright notice and the following two paragraphs appear in all copies 009 * of this software. 010 * 011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 * SUCH DAMAGE. 016 * 017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 * ENHANCEMENTS, OR MODIFICATIONS. 
023 * 024 */ 025 026package org.kepler.bio.hadoop.input; 027 028import java.io.File; 029import java.io.IOException; 030 031import org.apache.commons.logging.Log; 032import org.apache.commons.logging.LogFactory; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FSDataInputStream; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.io.Text; 038import org.apache.hadoop.io.compress.CompressionCodec; 039import org.apache.hadoop.io.compress.CompressionCodecFactory; 040import org.apache.hadoop.mapreduce.InputSplit; 041import org.apache.hadoop.mapreduce.RecordReader; 042import org.apache.hadoop.mapreduce.TaskAttemptContext; 043import org.apache.hadoop.mapreduce.lib.input.FileSplit; 044 045////////////////////////////////////////////////////////////////////////// 046////FASTARecordReader 047 048/** 049This class treats keys as file paths and value as sequence. 050The class is modified based on class org.apache.hadoop.mapreduce.lib.input.LineRecordReader. 
@author Jianwu Wang (jianwu@sdsc.edu)
@version $Id: FASTARecordReader.java 33064 2014-11-12 23:17:08Z crawl $
*/
public class FASTARecordReader extends RecordReader<Text, Text> {
    private static final Log LOG = LogFactory.getLog(FASTARecordReader.class);

    private CompressionCodecFactory compressionCodecs = null;
    private long start;          // byte offset where this split begins
    private long pos;            // current byte position within the file
    private long end;            // byte offset where this split ends (exclusive)
    private FASTAFileReader in;  // sequence-oriented reader over the split
    private Text key = null;
    private Text value = null;
    private Path file;           // path of the file backing this split

    /**
     * Opens the file backing the given split and positions the reader at the
     * first complete sequence of the split.
     *
     * <p>If the file is compressed, the whole file is read as one split
     * ({@code end} is set to {@code Long.MAX_VALUE}). For an uncompressed
     * split that does not begin at offset 0, the reader backs up one byte and
     * skips the first (partial) sequence, which belongs to the previous split.
     *
     * @param genericSplit the split to read; must be a {@link FileSplit}
     * @param context task context supplying the job {@link Configuration}
     * @throws IOException if the file cannot be opened or seeked
     */
    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();

        start = split.getStart();
        end = start + split.getLength();
        file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open the file and seek to the start of the split.
        FileSystem fs = file.getFileSystem(job);
        // FileSystem.getName() is deprecated; getUri() is the replacement.
        LOG.debug("FileSystem:" + fs.getUri());
        LOG.debug("file path:" + file);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;
        if (codec != null) {
            // Compressed input cannot be split: consume to end of stream.
            in = new FASTAFileReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            in = new FASTAFileReader(fileIn, job);
        }
        if (skipFirstLine) { // skip first sequence and re-establish "start".
            start += in.readSequence(new Text(),
                    (int) Math.min((long) Integer.MAX_VALUE, end - start + 1), 0);
        }
        this.pos = start;
    }

    /**
     * Advances to the next FASTA sequence in the split.
     *
     * <p>On success, the key is {@code "<base file name>_<byte offset>"}
     * (file name without directory or extension) and the value holds the
     * sequence text. At end of split both key and value are set to
     * {@code null}.
     *
     * @return {@code true} if a sequence was read, {@code false} at end of split
     * @throws IOException if reading the underlying stream fails
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (value == null) {
            value = new Text();
        }
        if (key == null) {
            key = new Text();
        }
        // Hadoop Paths always use '/' regardless of platform, so take the
        // last path component via Path.getName() rather than File.separator
        // (which would fail on Windows), and strip the extension from the
        // basename only (a '.' in a directory name previously could make
        // substring() throw).
        String fileName = file.getName();
        int extIndex = fileName.lastIndexOf('.');
        if (extIndex != -1) {
            fileName = fileName.substring(0, extIndex);
        }
        key.set(fileName + "_" + pos);

        int newSize = 0;
        if (pos < end) {
            newSize = in.readSequence(value,
                    (int) Math.min(Integer.MAX_VALUE, end - pos + 1), -1);
            pos += newSize;
        }
        if (newSize == 0) {
            // End of split: signal the framework that there is no more input.
            key = null;
            value = null;
            LOG.debug("no key and value in nextKeyValue()");
            return false;
        }
        if (LOG.isDebugEnabled()) {
            String v = value.toString();
            LOG.debug("get key in nextKeyValue():" + key);
            // Truncate to 200 chars so huge sequences don't flood the log.
            LOG.debug("get value in nextKeyValue():"
                    + v.substring(0, Math.min(200, v.length())));
        }
        return true;
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split.
     *
     * @return fraction of the split consumed, in [0.0, 1.0]
     */
    @Override
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}