/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.io.output;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;


//////////////////////////////////////////////////////////////////////////
////ValueOnlyOutputFormat

/**
 * An OutputFormat for key/value records that is intended to write only the
 * values to the output. The actual record formatting is delegated to
 * {@link ValueOnlyOutputWriter}, which this class creates on top of the
 * task's work file.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: ValueOnlyOutputFormat.java 33070 2014-11-12 23:21:09Z crawl $
 */

public class ValueOnlyOutputFormat<K, V> extends TextOutputFormat<K, V> {

    /**
     * Creates the record writer for a task attempt.
     *
     * <p>The output file is the task's default work file. When output
     * compression is enabled for the job, the configured codec (Gzip if none
     * is configured) supplies the file extension and wraps the file stream.
     * The separator is read from {@code mapred.textoutputformat.separator}
     * (default: tab) and handed to the writer.
     *
     * @param job the task attempt whose configuration and work file are used
     * @return a {@link ValueOnlyOutputWriter} bound to the newly created file
     * @throws IOException if the file or the compression stream cannot be
     *             created
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(
                "mapred.textoutputformat.separator", "\t");

        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
                    job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
                    conf);
            extension = codec.getDefaultExtension();
        }

        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);

        // Both the compressed and the plain path start from the same raw file
        // stream, so it is opened exactly once here.
        FSDataOutputStream fileOut = fs.create(file, false);

        // Tracks whether ownership of fileOut has passed to the writer. If
        // anything below throws first, the stream must be closed here because
        // no one else holds a reference to it.
        boolean handedOff = false;
        try {
            RecordWriter<K, V> writer;
            if (isCompressed) {
                writer = new ValueOnlyOutputWriter<K, V>(new DataOutputStream(
                        codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                writer = new ValueOnlyOutputWriter<K, V>(fileOut,
                        keyValueSeparator);
            }
            handedOff = true;
            return writer;
        } finally {
            if (!handedOff) {
                try {
                    fileOut.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup: the exception that aborted writer
                    // creation is the one the caller needs to see.
                }
            }
        }
    }

}