001/* Hadoop RecordWriter for Ptolemy tokens.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-07-02 15:58:19 +0000 (Wed, 02 Jul 2014) $' 
008 * '$Revision: 32804 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.hadoop.io.output;
031
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.List;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.io.Writable;
038import org.apache.hadoop.mapreduce.RecordWriter;
039import org.apache.hadoop.mapreduce.TaskAttemptContext;
040import org.kepler.ddp.Utilities;
041import org.kepler.ddp.actor.pattern.DDPDataSink;
042import org.kepler.hadoop.util.StubUtilities;
043
044import ptolemy.data.RecordToken;
045import ptolemy.data.Token;
046import ptolemy.kernel.util.IllegalActionException;
047
048/** Hadoop RecordWriter for Ptolemy tokens.
049 * 
050 *  @author Daniel Crawl
051 *  @verion $Id: TokenRecordWriter.java 32804 2014-07-02 15:58:19Z crawl $
052 */
053public class TokenRecordWriter extends  RecordWriter<Writable, Writable> {
054
055    /** Create a new TokenRecordWriter with a set of parameters. */
056    public TokenRecordWriter(Configuration parameters) {
057        
058        // TODO: possible to use this when kepler is in a separate JVM?
059        if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
060            throw new RuntimeException("TokenOutputFormat only works when running Kepler in the same JVM.");
061        }
062        
063        sinkActorName = parameters.get(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME, null);
064        if(sinkActorName == null) {
065            throw new RuntimeException("Name of DDPDataSink actor not in configuration.");
066        }
067        
068        tokenList = new ArrayList<Token>();
069    }
070
071    /** Write a key and value. */
072    @Override
073    public void write(Writable key, Writable value)
074            throws IOException, InterruptedException {
075
076        Token recordToken;
077        
078        try {
079            // see if there's only one field
080            recordToken = new RecordToken(new String[] {"key", "value"},
081                    new Token[] {StubUtilities.convertToToken(key),
082                    StubUtilities.convertToToken(value)});
083        } catch(IllegalActionException e) {
084            throw new RuntimeException("Error creating RecordToken.", e);
085        }
086        
087        tokenList.add(recordToken);
088
089    }
090
091    /** Close the record writer. */
092    @Override
093    public void close(TaskAttemptContext context) throws IOException,
094            InterruptedException {
095        try {
096            DDPDataSink.addTokens(sinkActorName, tokenList);
097        } catch (IllegalActionException e) {
098            throw new IOException("Error writing token.", e);
099        }
100        tokenList = null;
101    }
102
103    /** A list of tokens accumulated from write(). */
104    private List<Token> tokenList;
105    
106    /** The full name of the DDPDataSink actor to write the list of tokens. */
107    private String sinkActorName;
108
109}