/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.bio.hadoop.input;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

//////////////////////////////////////////////////////////////////////////
//// FASTARecordReader

/**
This class emits one record per FASTA sequence: the key is the file's base
name joined with the sequence's byte offset, and the value is the sequence
itself. It is adapted from
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.

@author Jianwu Wang (jianwu@sdsc.edu)
@version $Id: FASTARecordReader.java 33064 2014-11-12 23:17:08Z crawl $
*/

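/*
 * A minimal usage sketch, assuming a wrapping input format; the name
 * FASTAInputFormat is hypothetical and not defined in this file. Such a
 * format would hand this reader to the framework, and a driver would then
 * select it on the job:
 *
 *   public class FASTAInputFormat
 *       extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat<Text, Text> {
 *     @Override
 *     public RecordReader<Text, Text> createRecordReader(InputSplit split,
 *         TaskAttemptContext context) {
 *       return new FASTARecordReader();
 *     }
 *   }
 *
 *   // in the job driver:
 *   // job.setInputFormatClass(FASTAInputFormat.class);
 */
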
public class FASTARecordReader extends RecordReader<Text, Text> {
  private static final Log LOG = LogFactory.getLog(FASTARecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;
  private long start;          // byte offset where this split begins
  private long pos;            // current read position within the split
  private long end;            // byte offset where this split ends
  private FASTAFileReader in;  // sequence-oriented reader over the split
  private Text key = null;
  private Text value = null;
  private Path file;           // path of the file backing this split

  @Override
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();

    start = split.getStart();
    end = start + split.getLength();
    file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    LOG.debug("FileSystem: " + fs.getUri());
    LOG.debug("file path: " + file);
    FSDataInputStream fileIn = fs.open(split.getPath());
    boolean skipFirstLine = false;
    if (codec != null) {
      // Compressed input is not splittable, so this reader consumes the
      // entire stream regardless of the nominal split length.
      in = new FASTAFileReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
    } else {
      if (start != 0) {
        // Back up one byte so the partial sequence at the split boundary is
        // skipped below; the reader of the previous split consumes it.
        skipFirstLine = true;
        --start;
        fileIn.seek(start);
      }
      in = new FASTAFileReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first sequence and re-establish "start".
      start += in.readSequence(new Text(),
          (int) Math.min((long) Integer.MAX_VALUE, end - start + 1), 0);
    }
    this.pos = start;
  }
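
  // FASTAFileReader is defined elsewhere in this package. The contract
  // assumed here is inferred from the call sites above and in
  // nextKeyValue(), not from its source, so treat it as a sketch:
  //
  //   // Reads the next FASTA sequence into 'seq', consuming at most
  //   // maxBytesToConsume bytes, and returns the number of bytes read
  //   // (0 at end of stream).
  //   int readSequence(Text seq, int maxBytesToConsume, int maxLength)
  //       throws IOException;
  //
  // The meaning of the last argument (0 here, -1 in nextKeyValue()) is not
  // recoverable from the call sites alone.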

  @Override
  public boolean nextKeyValue() throws IOException {
    if (value == null) {
      value = new Text();
    }
    if (key == null) {
      key = new Text();
    }
    // Build the key as "<file base name>_<byte offset>": strip the directory
    // and any extension from the path, then append the current position so
    // every sequence in the file gets a distinct key.
    String fileName = file.toString();
    int ind = fileName.lastIndexOf(Path.SEPARATOR);
    int fileEnd = fileName.lastIndexOf('.');
    if (fileEnd > ind) {
      fileName = fileName.substring(ind + 1, fileEnd);
    } else {
      fileName = fileName.substring(ind + 1);
    }
    key.set(fileName + "_" + pos);

    int newSize = 0;
    if (pos < end) {
      newSize = in.readSequence(value,
          (int) Math.min(Integer.MAX_VALUE, end - pos + 1), -1);
      pos += newSize;
    }
    if (newSize == 0) {
      key = null;
      value = null;
      LOG.debug("no key and value in nextKeyValue()");
      return false;
    } else {
      LOG.debug("get key in nextKeyValue():" + key);
      LOG.debug("get value in nextKeyValue():" + value.toString().substring(0,
          Math.min(200, value.toString().length())));
      return true;
    }
  }
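
  // Illustrative output, not from a real run: for an input file such as
  // hdfs:///data/reads.fa (a hypothetical path) with a split starting at
  // byte 0, the first call to nextKeyValue() yields a key like "reads_0"
  // and, assuming FASTAFileReader returns one ">"-delimited record per
  // call, a value like ">seq1 description\nACGTACGT...".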

  @Override
  public Text getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split.
   */
  @Override
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }
  }

  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}