001/* FileRecordReader for DataFileInputFormat 002 003/* 004 * Copyright (c) 2010-2013 The Regents of the University of California. 005 * All rights reserved. 006 * 007 * Permission is hereby granted, without written agreement and without 008 * license or royalty fees, to use, copy, modify, and distribute this 009 * software and its documentation for any purpose, provided that the above 010 * copyright notice and the following two paragraphs appear in all copies 011 * of this software. 012 * 013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 017 * SUCH DAMAGE. 018 * 019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 024 * ENHANCEMENTS, OR MODIFICATIONS. 025 * 026 */ 027package org.kepler.hadoop.io.input; 028 029import java.io.IOException; 030 031import org.apache.hadoop.fs.FSDataInputStream; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.io.Text; 035import org.apache.hadoop.mapreduce.InputSplit; 036import org.apache.hadoop.mapreduce.RecordReader; 037import org.apache.hadoop.mapreduce.TaskAttemptContext; 038import org.apache.hadoop.mapreduce.lib.input.FileSplit; 039 040////////////////////////////////////////////////////////////////////////// 041////FilePathInputFormat 042 043/** 044* This class provides InputFormat to read files. 045* Key is file name (no path info), value is file content. 046* 047* @author Jianwu Wang (jianwu@sdsc.edu) 048* @version $Id: FileContentRecordReader.java 33070 2014-11-12 23:21:09Z crawl $ 049*/ 050public class FileContentRecordReader extends RecordReader<String, Text> { 051 private Path path; 052 private FSDataInputStream fis; 053 private FileSplit fileSplit; 054 private boolean done = false; 055 private Text value = null; 056 057 @Override 058 public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, 059 InterruptedException { 060 path = ((FileSplit) split).getPath(); 061 fileSplit = (FileSplit) split; 062 FileSystem fs = path.getFileSystem(context.getConfiguration()); 063 fis = fs.open(path); 064 } 065 066 @Override 067 public float getProgress() throws IOException { 068 //System.out.println("in getProgress : " + done); 069 if (done) { 070 return 1.0f; 071 } else { 072 return 0.0f; 073 } 074 } 075 076 @Override 077 public String getCurrentKey() throws IOException, InterruptedException { 078 System.out.println("in current key " + path.toString() + " :" + done); 079 // if (done){ 080 // return null; 081 // }else{ 082 String pathName = path.getName(); 083 int index = pathName.lastIndexOf("/"); 084 return pathName.substring(index + 1, pathName.length()); 085 // } 086 } 087 088 @Override 089 public Text getCurrentValue() throws IOException, InterruptedException { 090 return value; 091 } 092 093 @Override 094 public boolean nextKeyValue() throws IOException, InterruptedException { 095 if (done) 096 return false; 097 else { 098 byte[] fileByte = new byte[(int)fileSplit.getLength()]; 099 fis.readFully(fileByte); 100 System.out.println("file path " + path.toString()); 101 System.out.println("file length " + fileSplit.getLength()); 102 value = new Text(fileByte); 103 done = true; 104 return true; 105 } 106 } 107 108 @Override 109 public void close() throws IOException { 110 if (fis != null) 111 fis.close(); 112 done = true; 113 } 114}