001/* A base class for DDP pattern actor stubs that read data from the workflow. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-07-15 00:24:21 +0000 (Wed, 15 Jul 2015) $' 008 * '$Revision: 33544 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.ddp.actor.pattern.stub; 031 032import java.util.LinkedList; 033import java.util.List; 034import java.util.concurrent.LinkedBlockingQueue; 035 036import org.kepler.ddp.actor.pattern.Types; 037 038import ptolemy.actor.TypedIOPort; 039import ptolemy.data.ArrayToken; 040import ptolemy.data.RecordToken; 041import ptolemy.data.StringToken; 042import ptolemy.data.Token; 043import ptolemy.data.expr.Parameter; 044import ptolemy.data.expr.SingletonParameter; 045import ptolemy.data.type.BaseType; 046import ptolemy.kernel.CompositeEntity; 047import ptolemy.kernel.util.IllegalActionException; 048import ptolemy.kernel.util.NameDuplicationException; 049import ptolemy.kernel.util.Settable; 050import ptolemy.kernel.util.Workspace; 051 052/** A base class for DDP pattern actor stubs that read data from the workflow. 053 * 054 * @author Daniel Crawl 055 * @version $Id: StubSinkActor.java 33544 2015-07-15 00:24:21Z crawl $ 056 */ 057public class StubSinkActor extends StubBaseActor { 058 059 /** Construct a new StubSinkActor in a container with a given name. */ 060 public StubSinkActor(CompositeEntity container, String name) 061 throws IllegalActionException, NameDuplicationException { 062 063 super(container, name); 064 065 keysvalues = new TypedIOPort(this, "keysvalues", true, false); 066 keysvalues.setTypeAtMost(Types.keyValueArrayType); 067 new SingletonParameter(keysvalues, "_showName"); 068 069 lines = new TypedIOPort(this, "lines", true, false); 070 lines.setTypeEquals(BaseType.STRING); 071 new SingletonParameter(lines, "_showName"); 072 073 out = new TypedIOPort(this, "out", false, true); 074 075 // set the production rate to 0 since no tokens are written. 076 // the out port is connected to the out port of the pattern 077 // actor, which does not transfer tokens. 078 // NOTE: if the production rate is not 0, SDF sometimes throws 079 // an exception since it expects a token to be written when 080 // this actor is fired. 081 Parameter tokenProductionRate = new Parameter(out, "tokenProductionRate"); 082 tokenProductionRate.setVisibility(Settable.NOT_EDITABLE); 083 tokenProductionRate.setTypeEquals(BaseType.INT); 084 tokenProductionRate.setToken("0"); 085 086 _emptyKeyValue = new ArrayToken(new Token[] { new StringToken(), new StringToken() }); 087 088 } 089 090 /** Clone the actor into the specified workspace. */ 091 @Override 092 public Object clone(Workspace workspace) throws CloneNotSupportedException { 093 StubSinkActor newObject = (StubSinkActor)super.clone(workspace); 094 newObject._blockingList = new LinkedBlockingQueue<List<Token>>(); 095 try { 096 newObject._emptyKeyValue = new ArrayToken(new Token[] { new StringToken(), new StringToken() }); 097 } catch (IllegalActionException e) { 098 throw new CloneNotSupportedException("Error creating _emptyKeyValue for clone: " + e.getMessage()); 099 } 100 newObject._nonBlockingList = new LinkedList<Token>(); 101 return newObject; 102 } 103 104 /** Read the data the input port. */ 105 @Override 106 public void fire() throws IllegalActionException { 107 108 109 Token keysValuesToken = null; 110 111 if(_keysvaluesConnected) { 112 keysValuesToken = keysvalues.get(0); 113 } 114 115 if(_linesConnected) { 116 final String linesStr = ((StringToken)lines.get(0)).stringValue(); 117 if(linesStr.trim().isEmpty()) { 118 keysValuesToken = _emptyKeyValue; 119 } else { 120 final String[] lineArray = linesStr.split("\n"); 121 final Token[] tokenArray = new Token[lineArray.length]; 122 for(int i = 0; i < lineArray.length; i++) { 123 final String line = lineArray[i]; 124 tokenArray[i] = new RecordToken(_recordLabels, 125 new Token[] {new StringToken(line.substring(0, line.indexOf("\t"))), 126 new StringToken(line)}); 127 } 128 keysValuesToken = new ArrayToken(tokenArray); 129 } 130 131 } 132 133 if(_runWorkflowLifecyclePerInput) { 134 _nonBlockingList.add(keysValuesToken); 135 } else { 136 List<Token> list = new LinkedList<Token>(); 137 list.add(keysValuesToken); 138 try { 139 _blockingList.put(list); 140 } catch(InterruptedException e) { 141 throw new IllegalActionException(this, e, "Error waiting for token."); 142 } 143 } 144 145 } 146 147 /** Get the data read from the workflow by this actor. */ 148 public List<Token> getOutput() throws IllegalActionException { 149 150 List<Token> retval; 151 152 if(_runWorkflowLifecyclePerInput) { 153 retval = _nonBlockingList; 154 _nonBlockingList = new LinkedList<Token>(); 155 } else { 156 try { 157 retval = _blockingList.take(); 158 } catch(InterruptedException e) { 159 // do not rethrow since this exception can occur when stopping a DDP 160 // job. instead just print to stderr. 161 //throw new IllegalActionException(this, e, "Error waiting for token lists."); 162 System.err.println("Got InterruptedException."); 163 return null; 164 } 165 } 166 167 return retval; 168 } 169 170 @Override 171 public void preinitialize() throws IllegalActionException { 172 173 super.preinitialize(); 174 175 // see if keysvalues port is connected 176 if(keysvalues.numberOfSources() > 0) { 177 _keysvaluesConnected = true; 178 179 // set the out port type to be based on keysvalues 180 out.setTypeAtLeast(keysvalues); 181 182 } else { 183 _keysvaluesConnected = false; 184 } 185 186 // see if lines port is connected 187 if(lines.numberOfSources() > 0) { 188 _linesConnected = true; 189 190 // set the out port type to be strings 191 out.setTypeEquals(Types.createKeyValueArrayType(BaseType.STRING, BaseType.STRING)); 192 193 // FIXME fix for problem in hadoop 194 if(!_keysvaluesConnected) { 195 keysvalues.setTypeEquals(Types.createKeyValueArrayType(BaseType.STRING, BaseType.STRING)); 196 } 197 198 } else { 199 _linesConnected = false; 200 } 201 202 if(!_keysvaluesConnected && !_linesConnected) { 203 throw new IllegalActionException(this, "No input connected."); 204 } 205 206 } 207 208 /** An array of records with key-values. */ 209 public TypedIOPort keysvalues; 210 211 /** Line(s) of text that will be converted into key-value pairs. 212 * For each line, the key is the part before the first tab, 213 * and the value is the entire line. 214 */ 215 public TypedIOPort lines; 216 217 /** Output port of key-value pairs. */ 218 public TypedIOPort out; 219 220 /** A blocking list of tokens used when executing one iteration of the workflow per input. */ 221 private LinkedBlockingQueue<List<Token>> _blockingList = new LinkedBlockingQueue<List<Token>>(); 222 223 /** A non-blocking list of tokens used when execution the full lifecycle 224 * of the workflow per input. 225 */ 226 private List<Token> _nonBlockingList = new LinkedList<Token>(); 227 228 /** True if the keysvalues port is connected. */ 229 private boolean _keysvaluesConnected; 230 231 /** True if the lines port is connected. */ 232 private boolean _linesConnected; 233 234 // TODO move to Types, find other instances 235 /** Labels for key-value record token. */ 236 private static final String[] _recordLabels = new String[] {"key", "value"}; 237 238 /** Array of one record token with an empty key value. Cannot make static final 239 * since ArrayToken constructor throws exception. 240 */ 241 private ArrayToken _emptyKeyValue; 242 243}