/* DuplicatedTextRecordReader for DuplicatedTextInputFormat

/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.hadoop.io.input;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

//////////////////////////////////////////////////////////////////////////
//// DuplicatedTextRecordReader

/**
 * A record reader that emits the content of a file repeatedly.
 *
 * <p>Each record's value is the first {@code split.getLength()} bytes of the
 * file, read from offset zero. The same content is emitted once per call to
 * {@link #nextKeyValue()} until the number of emitted records reaches the
 * parallel number set through {@link #setParaNum(int)} (default 1).
 *
 * <p>The key is the duplication counter <em>after</em> it has been advanced
 * for the current record, so keys run from 1 to the parallel number.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: DuplicatedTextRecordReader.java 33070 2014-11-12 23:21:09Z crawl $
 */
public class DuplicatedTextRecordReader extends RecordReader<IntWritable, Text> {

    /** Path of the file backing the split. */
    private Path path;

    /** Open stream on {@link #path}; rewound to offset zero before each read. */
    private FSDataInputStream fis;

    /** The split handed to {@link #initialize}; its length sizes the read buffer. */
    private FileSplit fileSplit;

    /** Set once {@link #close()} has run. */
    private boolean done = false;

    /** Most recently read record value, or {@code null} before the first read. */
    private Text value = null;

    /** Number of records emitted so far. */
    private int dupIndex;

    /** Total number of records to emit (the "parallel number"). */
    private int paraNum = 1;

    /**
     * Opens the file behind the given split and resets the duplication counter.
     *
     * @param split the split to read; must be a {@link FileSplit}
     * @param context task context supplying the Hadoop configuration
     * @throws IOException if the file system or the file cannot be opened
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) split;
        path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        fis = fs.open(path);
        dupIndex = 0;
    }

    /**
     * Reports the fraction of records emitted so far.
     *
     * @return 1.0 once the reader is closed or when there is nothing to
     *         emit (parallel number not positive); otherwise the emitted
     *         count divided by the parallel number, capped at 1.0
     * @throws IOException declared by the overridden method
     */
    @Override
    public float getProgress() throws IOException {
        // A non-positive parallel number means no records will ever be
        // produced; report completion instead of dividing by zero.
        if (done || paraNum <= 0) {
            return 1.0f;
        }
        return Math.min(1.0f, (float) (dupIndex * 1.0 / paraNum));
    }

    /**
     * Returns the key of the current record.
     *
     * @return a new {@link IntWritable} holding the current duplication
     *         counter, which is 1 for the first record because the counter
     *         is advanced inside {@link #nextKeyValue()}
     * @throws IOException declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return new IntWritable(dupIndex);
    }

    /**
     * Returns the value of the current record.
     *
     * @return the bytes read by the last successful {@link #nextKeyValue()},
     *         or {@code null} if no record has been read yet
     * @throws IOException declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Reads the file content again and advances the duplication counter.
     *
     * @return {@code true} if a record was produced, {@code false} once the
     *         parallel number of records has been emitted
     * @throws IOException if the split is too large to buffer in a single
     *         array or the file cannot be read in full
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (dupIndex >= paraNum) {
            return false;
        }

        long length = fileSplit.getLength();
        // The content is buffered in one byte array, so refuse lengths that
        // would overflow the int cast instead of failing obscurely.
        if (length > Integer.MAX_VALUE) {
            throw new IOException("Split of " + path + " is too large to read into memory: "
                    + length + " bytes");
        }

        byte[] fileByte = new byte[(int) length];
        // Rewind: the same content is re-read from the start for every record.
        // NOTE(review): reads from offset 0 rather than the split's start, so
        // this presumably expects one unsplit file per split - confirm against
        // DuplicatedTextInputFormat.
        fis.seek(0);
        fis.readFully(fileByte);
        value = new Text(fileByte);
        dupIndex++;
        return true;
    }

    /**
     * Closes the underlying stream, if one was opened, and marks the reader
     * as done.
     *
     * @throws IOException if closing the stream fails; the reader is still
     *         marked as done in that case
     */
    @Override
    public void close() throws IOException {
        try {
            if (fis != null) {
                fis.close();
            }
        } finally {
            done = true;
        }
    }

    /**
     * Sets the parallel number, i.e. how many times the content is emitted.
     *
     * @param paraNum the number of records to emit; a value of zero or less
     *        makes {@link #nextKeyValue()} return {@code false} immediately
     * @throws IOException never thrown here; kept in the signature for
     *         compatibility with existing callers
     */
    public void setParaNum(int paraNum) throws IOException {
        this.paraNum = paraNum;
    }
}