/* FilePathRecordReader for DataFileInputFormat

/*
 * Copyright (c) 2010-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.hadoop.io.input;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

//////////////////////////////////////////////////////////////////////////
////FilePathRecordReader

/**
 * A RecordReader that emits file paths instead of file contents.
 * The key is the file name (no path info) and the value is the full
 * file path.
 *
 * <p>Each file is listed only once, even when a large file is divided
 * into several splits: only the split that starts at offset 0 produces
 * a record, and every other split of the same file produces none.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: FilePathRecordReader.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class FilePathRecordReader extends RecordReader<Text, Text> {

    /** Path of the file backing the current split; null until initialize() runs. */
    private Path path;

    /** True once the single record has been handed out, or after close(). */
    private boolean done = false;

    /** Byte offset at which the current split starts; -1 until initialize() runs. */
    private long start = -1;

    /**
     * Records the file path and start offset of the given split.
     *
     * @param split the split to read; it is cast to {@link FileSplit}, so
     *     any other split type fails with a ClassCastException
     * @param context the task attempt context (not used)
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        path = fileSplit.getPath();
        start = fileSplit.getStart();
    }

    /**
     * Reports progress through the split.
     *
     * @return 1.0 once the record has been consumed or the reader has been
     *     closed, 0.0 otherwise
     */
    @Override
    public float getProgress() throws IOException {
        return done ? 1.0f : 0.0f;
    }

    /**
     * Returns the current key: the file name without any path info.
     * The result does not depend on whether the record was already consumed.
     *
     * @return a new Text holding the final component of the file path
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        // Path.getName() already yields the final path component, so no
        // further splitting on a platform-specific separator is needed and
        // the key is the same on every operating system.
        return new Text(path.getName());
    }

    /**
     * Returns the current value: the file path as a string.
     * The result does not depend on whether the record was already consumed.
     *
     * @return a new Text holding the string form of the file path
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return new Text(path.toString());
    }

    /**
     * Advances to the single record of this split, if there is one.
     *
     * @return true exactly once, and only for a split starting at offset 0;
     *     false for every other split and for every later call
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Only the first split of a file (offset 0) reports the file, so a
        // file divided into several splits is still listed a single time.
        if (start != 0 || done) {
            return false;
        }
        done = true;
        return true;
    }

    /** Marks the reader as finished; there are no resources to release. */
    @Override
    public void close() throws IOException {
        done = true;
    }
} // end of FilePathRecordReader