001/* Hadoop RecordWriter for Ptolemy tokens. 002 * 003 * Copyright (c) 2014 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-07-02 15:58:19 +0000 (Wed, 02 Jul 2014) $' 008 * '$Revision: 32804 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.hadoop.io.output; 031 032import java.io.IOException; 033import java.util.ArrayList; 034import java.util.List; 035 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.io.Writable; 038import org.apache.hadoop.mapreduce.RecordWriter; 039import org.apache.hadoop.mapreduce.TaskAttemptContext; 040import org.kepler.ddp.Utilities; 041import org.kepler.ddp.actor.pattern.DDPDataSink; 042import org.kepler.hadoop.util.StubUtilities; 043 044import ptolemy.data.RecordToken; 045import ptolemy.data.Token; 046import ptolemy.kernel.util.IllegalActionException; 047 048/** Hadoop RecordWriter for Ptolemy tokens. 049 * 050 * @author Daniel Crawl 051 * @verion $Id: TokenRecordWriter.java 32804 2014-07-02 15:58:19Z crawl $ 052 */ 053public class TokenRecordWriter extends RecordWriter<Writable, Writable> { 054 055 /** Create a new TokenRecordWriter with a set of parameters. */ 056 public TokenRecordWriter(Configuration parameters) { 057 058 // TODO: possible to use this when kepler is in a separate JVM? 059 if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) { 060 throw new RuntimeException("TokenOutputFormat only works when running Kepler in the same JVM."); 061 } 062 063 sinkActorName = parameters.get(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME, null); 064 if(sinkActorName == null) { 065 throw new RuntimeException("Name of DDPDataSink actor not in configuration."); 066 } 067 068 tokenList = new ArrayList<Token>(); 069 } 070 071 /** Write a key and value. */ 072 @Override 073 public void write(Writable key, Writable value) 074 throws IOException, InterruptedException { 075 076 Token recordToken; 077 078 try { 079 // see if there's only one field 080 recordToken = new RecordToken(new String[] {"key", "value"}, 081 new Token[] {StubUtilities.convertToToken(key), 082 StubUtilities.convertToToken(value)}); 083 } catch(IllegalActionException e) { 084 throw new RuntimeException("Error creating RecordToken.", e); 085 } 086 087 tokenList.add(recordToken); 088 089 } 090 091 /** Close the record writer. */ 092 @Override 093 public void close(TaskAttemptContext context) throws IOException, 094 InterruptedException { 095 try { 096 DDPDataSink.addTokens(sinkActorName, tokenList); 097 } catch (IllegalActionException e) { 098 throw new IOException("Error writing token.", e); 099 } 100 tokenList = null; 101 } 102 103 /** A list of tokens accumulated from write(). */ 104 private List<Token> tokenList; 105 106 /** The full name of the DDPDataSink actor to write the list of tokens. */ 107 private String sinkActorName; 108 109}