/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.io.input;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * A RecordReader that emits each delimited record of a text file as a value,
 * keyed by its byte offset within the file. Adapted from
 * org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: DelimitedTextRecordReader.java 33862 2015-09-04 16:53:05Z crawl $
 */
public class DelimitedTextRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(DelimitedTextRecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private DelimitedTextFileReader in;
  private int maxLineLength;
  private Text value = null;
  private Path file;

  @Override
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    // Legacy configuration key, kept for compatibility with the original code.
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // Open the file and seek to the start of the split.
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());
    boolean skipFirstLine = false;
    if (codec != null) {
      // Compressed input cannot be seeked into, so read the whole stream.
      in = new DelimitedTextFileReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
    } else {
      if (start != 0) {
        // Back up one byte so the partial record at the split boundary is
        // detected and skipped below; the previous split reads that record.
        skipFirstLine = true;
        --start;
        fileIn.seek(start);
      }
      in = new DelimitedTextFileReader(fileIn, job);
    }
    if (skipFirstLine) { // Skip the first record and re-establish "start".
      start += in.readLine(new Text(), 0,
                           (int) Math.min((long) Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    while (pos < end) {
      newSize = in.readLine(value, maxLineLength,
                            Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                     maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // Record too long: skip it and try again.
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }
    if (newSize == 0) {
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    // Note: pos has already been advanced past the current record, so the key
    // is the offset of the record's end, matching the original behavior.
    return new LongWritable(pos);
  }

//  public Text getCurrentKey() {
//    String fileName = file.toString();
//    //index = new LongWritable(index.get() + 1);
//    int ind = fileName.lastIndexOf(File.separator);
//    fileName = fileName.substring(ind + 1, fileName.length());
//    //return fileName + index.toString();
//    return new Text(fileName + "_" + pos);
//  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split.
   */
  @Override
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }
  }

  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}
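
/*
 * Usage sketch (illustrative addition, not confirmed by this source): a
 * minimal InputFormat that wires DelimitedTextRecordReader into a MapReduce
 * job. The name DelimitedTextInputFormat is hypothetical; the Kepler codebase
 * may ship its own input format for this reader, and the isSplitable() policy
 * below mirrors Hadoop's TextInputFormat rather than anything stated above.
 */
class DelimitedTextInputFormat
    extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    // The framework calls initialize() on the reader before the first record;
    // that is where split boundaries and compression handling are set up.
    return new DelimitedTextRecordReader();
  }

  @Override
  protected boolean isSplitable(org.apache.hadoop.mapreduce.JobContext context,
      Path file) {
    // DelimitedTextRecordReader resets end to Long.MAX_VALUE for compressed
    // input, so refuse to split any file a codec claims.
    return new CompressionCodecFactory(context.getConfiguration()).getCodec(file) == null;
  }
}

// Typical wiring in a driver (assumed, standard new-API Hadoop):
//   job.setInputFormatClass(DelimitedTextInputFormat.class);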