/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.io.input;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

//////////////////////////////////////////////////////////////////////////
//// MultiLineRecordReader

/**
 * A record reader that returns the name of the input file as the key and a
 * block of lines, as read by MultipleLineReader, as the value. The class is
 * modified from org.apache.hadoop.mapreduce.lib.input.LineRecordReader.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: MultiLineRecordReader.java 33862 2015-09-04 16:53:05Z crawl $
 */
public class MultiLineRecordReader extends RecordReader<Text, Text> {
  private static final Log LOG = LogFactory.getLog(MultiLineRecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private MultipleLineReader in;
  private int maxLineLength;
  // Byte-offset key kept only to track progress through the split; the key
  // handed to the mapper is the file name (see getCurrentKey()).
  private LongWritable key = null;
  private Text value = null;
  private Path file;

  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context)
      throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
        Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // Open the file and seek to the start of the split.
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());
    boolean skipFirstLine = false;
    if (codec != null) {
      // A compressed stream cannot be split, so read to the end of the file.
      in = new MultipleLineReader(codec.createInputStream(fileIn), job);
      end = Long.MAX_VALUE;
    } else {
      if (start != 0) {
        skipFirstLine = true;
        --start;
        fileIn.seek(start);
      }
      in = new MultipleLineReader(fileIn, job);
    }
    if (skipFirstLine) { // Skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
          (int) Math.min((long) Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    while (pos < end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
              maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // Line too long: skip it and try again.
      LOG.info("Skipped line of size " + newSize + " at pos "
          + (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public Text getCurrentKey() {
    // The key is the final component of the file's path. Path.getName() is
    // used instead of splitting on File.separator, since Hadoop Paths always
    // use '/' as the separator regardless of platform.
    return new Text(file.getName());
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split.
   */
  @Override
  public float getProgress() {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }
  }

  @Override
  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }
}
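
//////////////////////////////////////////////////////////////////////////
//// MultiLineInputFormatSketch

// A minimal usage sketch, not part of the original Kepler sources: for the
// reader above to be used in a MapReduce job, an InputFormat must create it.
// The class name MultiLineInputFormatSketch is hypothetical, and the sketch
// assumes only the standard FileInputFormat contract; a job driver would wire
// it in with job.setInputFormatClass(MultiLineInputFormatSketch.class).
class MultiLineInputFormatSketch
    extends org.apache.hadoop.mapreduce.lib.input.FileInputFormat<Text, Text> {

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) {
    // The framework calls initialize() on the returned reader before the
    // first call to nextKeyValue().
    return new MultiLineRecordReader();
  }
}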