001/*
002 * Copyright (c) 2010-2013 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * Permission is hereby granted, without written agreement and without
006 * license or royalty fees, to use, copy, modify, and distribute this
007 * software and its documentation for any purpose, provided that the above
008 * copyright notice and the following two paragraphs appear in all copies
009 * of this software.
010 *
011 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 * SUCH DAMAGE.
016 *
017 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 * ENHANCEMENTS, OR MODIFICATIONS.
023 *
024 */
025
026package org.kepler.hadoop.io.input;
027
028import java.io.IOException;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.io.LongWritable;
037import org.apache.hadoop.io.Text;
038import org.apache.hadoop.io.compress.CompressionCodec;
039import org.apache.hadoop.io.compress.CompressionCodecFactory;
040import org.apache.hadoop.mapreduce.InputSplit;
041import org.apache.hadoop.mapreduce.RecordReader;
042import org.apache.hadoop.mapreduce.TaskAttemptContext;
043import org.apache.hadoop.mapreduce.lib.input.FileSplit;
044
045/**
046This class treats keys as file paths and value as sequence.
047The class is modified based on class org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
048
049@author Jianwu Wang (jianwu@sdsc.edu)
050@version $Id: DelimitedTextRecordReader.java 33862 2015-09-04 16:53:05Z crawl $
051*/
052
053public class DelimitedTextRecordReader extends RecordReader<LongWritable, Text> {
054  private static final Log LOG = LogFactory.getLog(DelimitedTextRecordReader.class);
055
056  private CompressionCodecFactory compressionCodecs = null;
057  private long start;
058  private long pos;
059  private long end;
060  private DelimitedTextFileReader in;
061  private int maxLineLength;
062  private Text value = null;
063  private Path file;
064
065
066  @Override
067  public void initialize(InputSplit genericSplit,
068                         TaskAttemptContext context) throws IOException {
069    FileSplit split = (FileSplit) genericSplit;
070    Configuration job = context.getConfiguration();
071    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
072                                    Integer.MAX_VALUE);
073    start = split.getStart();
074    end = start + split.getLength();
075    file = split.getPath();
076    compressionCodecs = new CompressionCodecFactory(job);
077    final CompressionCodec codec = compressionCodecs.getCodec(file);
078
079    // open the file and seek to the start of the split
080    FileSystem fs = file.getFileSystem(job);
081    //System.out.println("FileSystem:"+ fs.getName());
082    //System.out.println("file path:"+ file);
083    FSDataInputStream fileIn = fs.open(split.getPath());
084    boolean skipFirstLine = false;
085    if (codec != null) {
086      in = new DelimitedTextFileReader(codec.createInputStream(fileIn), job);
087      end = Long.MAX_VALUE;
088    } else {
089      if (start != 0) {
090        skipFirstLine = true;
091        --start;
092        fileIn.seek(start);
093      }
094      in = new DelimitedTextFileReader(fileIn, job);
095    }
096    if (skipFirstLine) {  // skip first line and re-establish "start".
097      start += in.readLine(new Text(), 0,
098                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
099    }
100    this.pos = start;
101  }
102  
103  @Override
104public boolean nextKeyValue() throws IOException {
105    if (value == null) {
106      value = new Text();
107    }
108    int newSize = 0;
109    while (pos < end) {
110      newSize = in.readLine(value, maxLineLength,
111                            Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
112                                     maxLineLength));
113      if (newSize == 0) {
114        break;
115      }
116      pos += newSize;
117      if (newSize < maxLineLength) {
118        break;
119      }
120      
121      // line too long. try again
122      LOG.info("Skipped line of size " + newSize + " at pos " + 
123               (pos - newSize));
124    }
125    if (newSize == 0) {
126      value = null;
127      return false;
128    } else {
129      return true;
130    }
131  }
132
133  @Override
134  public LongWritable getCurrentKey() {
135    return new LongWritable(pos);
136  }
137  
138//  public Text getCurrentKey() {
139//        String fileName = file.toString();
140//        //index = new LongWritable(index.get() + 1);
141//      int ind = fileName.lastIndexOf(File.separator);
142//        fileName = fileName.substring(ind + 1, fileName.length());
143//        //return fileName + index.toString();
144//        return new Text(fileName + "_" + pos);
145//  }
146
147  @Override
148  public Text getCurrentValue() {
149    return value;
150  }
151
152  /**
153   * Get the progress within the split
154   */
155  @Override
156public float getProgress() {
157    if (start == end) {
158      return 0.0f;
159    } else {
160      return Math.min(1.0f, (pos - start) / (float)(end - start));
161    }
162  }
163  
164  @Override
165public synchronized void close() throws IOException {
166    if (in != null) {
167      in.close(); 
168    }
169  }
170}
171