/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.io.input;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

//////////////////////////////////////////////////////////////////////////
//// MultiLineRecordReader

/**
 * A RecordReader that emits the input file name as the key and a group of
 * lines (read via MultipleLineReader) as the value, rather than the
 * byte-offset key of the stock reader. The class is adapted from
 * org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: MultiLineRecordReader.java 33862 2015-09-04 16:53:05Z crawl $
 */

public class MultiLineRecordReader extends RecordReader<Text, Text> {
  private static final Log LOG = LogFactory.getLog(MultiLineRecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;
  private long start;   // first byte of this split
  private long pos;     // current read position in the file
  private long end;     // first byte past this split
  private MultipleLineReader in;
  private int maxLineLength;
  // Retained from LineRecordReader: tracks the record's starting offset and
  // doubles as an end-of-input sentinel; it is not the key exposed to mappers.
  private LongWritable key = null;
  private Text value = null;
  private Path file;

  @Override
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    // Hadoop 1.x configuration key; Hadoop 2.x renamed it to
    // "mapreduce.input.linerecordreader.line.maxlength".
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // Open the file and seek to the start of the split.
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());
    boolean skipFirstLine = false;
    if (codec != null) {
      // Compressed input cannot be split; read the whole stream.
      in = new MultipleLineReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
    } else {
      if (start != 0) {
        // This is not the first split, so the first (partial) line belongs
        // to the previous split. Back up one byte and skip past it.
        skipFirstLine = true;
        --start;
        fileIn.seek(start);
      }
      in = new MultipleLineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                           (int) Math.min((long) Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
  }

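  /* A minimal sketch of setting the maximum record length from a driver,
   * assuming the Hadoop 1.x key read above (Hadoop 2.x renamed it to
   * "mapreduce.input.linerecordreader.line.maxlength"); the 1 MB value is
   * illustrative only:
   *
   *   Configuration conf = new Configuration();
   *   conf.setInt("mapred.linerecordreader.maxlength", 1024 * 1024);
   */
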
  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    while (pos < end) {
      newSize = in.readLine(value, maxLineLength,
                            Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                     maxLineLength));
      if (newSize == 0) {
        // End of stream.
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // Line too long; skip it and try again.
      LOG.info("Skipped line of size " + newSize + " at pos " +
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

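  /* Sketch of how the MapReduce framework drives this reader (simplified
   * pseudocode of the standard RecordReader contract, not code from this
   * repository):
   *
   *   reader.initialize(split, context);
   *   while (reader.nextKeyValue()) {
   *     Text key = reader.getCurrentKey();     // input file name
   *     Text value = reader.getCurrentValue(); // a group of lines
   *     // map(key, value) ...
   *   }
   *   reader.close();
   */
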
  @Override
  public Text getCurrentKey() {
    // Unlike LineRecordReader, which returns the byte offset, the key here
    // is the name of the input file (its final path component). Path.getName()
    // is used instead of splitting on File.separator, since Hadoop paths
    // always use '/' regardless of platform.
    return new Text(file.getName());
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split.
   */
  @Override
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }
  }

  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }

}
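
/* A minimal usage sketch: wiring this reader into a custom InputFormat.
 * The class name MultiLineInputFormat is a hypothetical example, not part
 * of this file:
 *
 *   public class MultiLineInputFormat extends FileInputFormat<Text, Text> {
 *     @Override
 *     public RecordReader<Text, Text> createRecordReader(
 *         InputSplit split, TaskAttemptContext context) {
 *       return new MultiLineRecordReader();
 *     }
 *   }
 */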