001/* FileRecordReader for DataFileInputFormat
002
003/*
004 * Copyright (c) 2010-2013 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027package org.kepler.hadoop.io.input;
028
029import java.io.IOException;
030
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.Text;
035import org.apache.hadoop.mapreduce.InputSplit;
036import org.apache.hadoop.mapreduce.RecordReader;
037import org.apache.hadoop.mapreduce.TaskAttemptContext;
038import org.apache.hadoop.mapreduce.lib.input.FileSplit;
039
040//////////////////////////////////////////////////////////////////////////
041////FilePathInputFormat
042
043/**
044* This class provides InputFormat to read files. 
045* Key is file name (no path info), value is file content.
046* 
047* @author Jianwu Wang (jianwu@sdsc.edu)
048* @version $Id: FileContentRecordReader.java 33070 2014-11-12 23:21:09Z crawl $
049*/
050public class FileContentRecordReader extends RecordReader<String, Text> {
051        private Path path;
052        private FSDataInputStream fis;
053        private FileSplit fileSplit;
054        private boolean done = false;
055        private Text value = null;
056
057        @Override
058        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
059                        InterruptedException {
060                path = ((FileSplit) split).getPath();
061                fileSplit = (FileSplit) split;
062            FileSystem fs = path.getFileSystem(context.getConfiguration());
063                fis = fs.open(path);
064        }
065
066        @Override
067        public float getProgress() throws IOException {
068                //System.out.println("in getProgress : " + done);
069                if (done) {
070                        return 1.0f;
071                } else {
072                        return 0.0f;
073                }
074        }
075
076        @Override
077        public String getCurrentKey() throws IOException, InterruptedException {
078                System.out.println("in current key " + path.toString() + " :" + done);
079                // if (done){
080                // return null;
081                // }else{
082                String pathName = path.getName();
083                int index = pathName.lastIndexOf("/");
084                return pathName.substring(index + 1, pathName.length());
085                // }
086        }
087
088        @Override
089        public Text getCurrentValue() throws IOException, InterruptedException {
090                return value;
091        }
092
093        @Override
094        public boolean nextKeyValue() throws IOException, InterruptedException {
095                if (done)
096                        return false;
097                else {
098                        byte[] fileByte = new byte[(int)fileSplit.getLength()]; 
099                        fis.readFully(fileByte);
100                        System.out.println("file path " + path.toString());
101                        System.out.println("file length " + fileSplit.getLength());
102                        value = new Text(fileByte);
103                        done = true;
104                        return true;
105                }
106        }
107
108        @Override
109        public void close() throws IOException {
110                if (fis != null)
111                        fis.close();
112                done = true;
113        }
114}