001/* DuplicatedTextRecordReader for DuplicatedTextInputFormat
002
003/*
004 * Copyright (c) 2010-2013 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027package org.kepler.hadoop.io.input;
028
029import java.io.IOException;
030
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.IntWritable;
035import org.apache.hadoop.io.Text;
036import org.apache.hadoop.mapreduce.InputSplit;
037import org.apache.hadoop.mapreduce.RecordReader;
038import org.apache.hadoop.mapreduce.TaskAttemptContext;
039import org.apache.hadoop.mapreduce.lib.input.FileSplit;
040
041//////////////////////////////////////////////////////////////////////////
042////DuplicatedTextRecordReader
043
044/**
045* This class provides Reader to read file(s) repeatedly. 
046* Key is the current index value, value is the whole content of the file.
047* The whole content of the file will be sent to value repeatedly unless key number reaches parallel number. 
048* 
049* @author Jianwu Wang (jianwu@sdsc.edu)
050* @version $Id: DuplicatedTextRecordReader.java 33070 2014-11-12 23:21:09Z crawl $
051*/
052public class DuplicatedTextRecordReader extends RecordReader<IntWritable, Text> {
053        private Path path;
054        private FSDataInputStream fis;
055        private FileSplit fileSplit;
056        private boolean done = false;
057        private Text value = null;
058        private int dupIndex;
059        private int paraNum = 1;
060
061        @Override
062        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
063                        InterruptedException {
064                path = ((FileSplit) split).getPath();
065                fileSplit = (FileSplit) split;
066            FileSystem fs = path.getFileSystem(context.getConfiguration());
067                fis = fs.open(path);
068                dupIndex = 0;
069        }
070
071        @Override
072        public float getProgress() throws IOException {
073                if (done) {
074                        return 1.0f;
075                } else {
076                        return new Float(dupIndex*1.0/paraNum);
077                }
078        }
079
080        @Override
081        public IntWritable getCurrentKey() throws IOException, InterruptedException {
082                //System.out.println("in current key " + path.toString() + " :" + done);
083                return new IntWritable(dupIndex);
084        }
085
086        @Override
087        public Text getCurrentValue() throws IOException, InterruptedException {
088                return value;
089        }
090
091        @Override
092        public boolean nextKeyValue() throws IOException, InterruptedException {
093                if (dupIndex >= paraNum)
094                        return false;
095                else {
096                        byte[] fileByte = new byte[(int)fileSplit.getLength()];
097                        fis.seek(0); // reset offset to be zero since the reader read the whole content repeatedly.
098                        fis.readFully(fileByte);
099                        value = new Text(fileByte);
100                        dupIndex++;
101                        return true;
102                }
103        }
104
105        @Override
106        public void close() throws IOException {
107                if (fis != null)
108                        fis.close();
109                done = true;
110        }
111        
112        public void setParaNum(int paraNum) throws IOException {
113                this.paraNum = paraNum;
114        }
115}