001/*  
002 * Copyright (c) 2012-2013 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-09-04 16:53:05 +0000 (Fri, 04 Sep 2015) $' 
007 * '$Revision: 33862 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.hadoop.io.output;
031
032import java.io.DataOutputStream;
033import java.io.IOException;
034import java.io.UnsupportedEncodingException;
035
036import org.apache.hadoop.io.NullWritable;
037import org.apache.hadoop.io.Text;
038import org.apache.hadoop.mapreduce.RecordWriter;
039import org.apache.hadoop.mapreduce.TaskAttemptContext;
040
041
042//////////////////////////////////////////////////////////////////////////
043////ValueOnlyOutputWriter
044
045/**
046* This class provides RecordWriter to write key values. 
047* It only write values to output.
048* 
049* @author Jianwu Wang (jianwu@sdsc.edu)
050* @version $Id: ValueOnlyOutputWriter.java 33862 2015-09-04 16:53:05Z crawl $
051*/
052
053public class ValueOnlyOutputWriter<K, V> extends RecordWriter<K, V> {
054
055    private static final String utf8 = "UTF-8";
056    private static final byte[] newline;
057    static {
058      try {
059        newline = "\n".getBytes(utf8);
060      } catch (UnsupportedEncodingException uee) {
061        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
062      }
063    }
064
065    protected DataOutputStream out;
066    private final byte[] keyValueSeparator;
067
068    public ValueOnlyOutputWriter(DataOutputStream out, String keyValueSeparator) {
069      this.out = out;
070      try {
071        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
072      } catch (UnsupportedEncodingException uee) {
073        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
074      }
075    }
076
077    public ValueOnlyOutputWriter(DataOutputStream out) {
078      this(out, "\t");
079    }
080
081    /**
082     * Write the object to the byte stream, handling Text as a special
083     * case.
084     * @param o the object to print
085     * @throws IOException if the write throws, we pass it on
086     */
087    private void writeObject(Object o) throws IOException {
088      if (o instanceof Text) {
089        Text to = (Text) o;
090        out.write(to.getBytes(), 0, to.getLength());
091      } else {
092        out.write(o.toString().getBytes(utf8));
093      }
094    }
095
096    @Override
097    public synchronized void write(K key, V value)
098      throws IOException {
099
100      boolean nullKey = key == null || key instanceof NullWritable;
101      boolean nullValue = value == null || value instanceof NullWritable;
102      if (nullKey && nullValue) {
103        return;
104      }
105//      if (!nullKey) {
106//        writeObject(key);
107//      }
108//      if (!(nullKey || nullValue)) {
109//        out.write(keyValueSeparator);
110//      }
111      if (!nullValue) {
112        writeObject(value);
113      }
114      out.write(newline);
115    }
116
117    @Override
118    public synchronized 
119    void close(TaskAttemptContext context) throws IOException {
120      out.close();
121    }
122
123}