001/* A stub actor that writes data into workflows for the Map pattern. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-05-19 20:43:29 +0000 (Mon, 19 May 2014) $' 008 * '$Revision: 32724 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.ddp.actor.pattern.stub; 031 032 033import java.util.concurrent.LinkedBlockingQueue; 034 035import ptolemy.actor.TypedIOPort; 036import ptolemy.data.Token; 037import ptolemy.data.type.ArrayType; 038import ptolemy.data.type.BaseType; 039import ptolemy.data.type.MonotonicFunction; 040import ptolemy.data.type.RecordType; 041import ptolemy.data.type.Type; 042import ptolemy.graph.InequalityTerm; 043import ptolemy.kernel.CompositeEntity; 044import ptolemy.kernel.util.IllegalActionException; 045import ptolemy.kernel.util.NameDuplicationException; 046import ptolemy.kernel.util.Workspace; 047 048/** A stub actor that writes data into workflows for the Map pattern. 049 * 050 * @author Daniel Crawl 051 * @version $Id: MapInput.java 32724 2014-05-19 20:43:29Z crawl $ 052 */ 053public class MapInput extends StubSourceActor { 054 055 /** Construct a new MapInput in a container with a given name. */ 056 public MapInput(CompositeEntity container, String name) 057 throws IllegalActionException, NameDuplicationException { 058 059 super(container, name); 060 061 value = new TypedIOPort(this, "value", false, true); 062 value.setMultiport(true); 063 } 064 065 /** Clone the actor into the specified workspace. */ 066 @Override 067 public Object clone(Workspace workspace) throws CloneNotSupportedException { 068 MapInput newObject = (MapInput)super.clone(workspace); 069 newObject._valueList = new LinkedBlockingQueue<Token>(); 070 return newObject; 071 } 072 073 /** Write key and value to the workflow. */ 074 @Override 075 public void fire() throws IllegalActionException { 076 077 super.fire(); 078 079 Token keyToken = null; 080 Token valueToken = null; 081 082 try { 083 keyToken = _keyList.take(); 084 valueToken = _valueList.take(); 085 } catch(InterruptedException e) { 086 throw new IllegalActionException(this, e, "Error waiting for token."); 087 } 088 089 if(!_finish.get()) { 090 key.broadcast(keyToken); 091 value.broadcast(valueToken); 092 } 093 } 094 095 /** Stop execution of the workflow the next time postfire() is called. */ 096 @Override 097 public void finish() throws InterruptedException { 098 super.finish(); 099 // add a token to unblock the take() in fire() 100 _valueList.put(Token.NIL); 101 } 102 103 /** Set the key and value to write to workflow. */ 104 public void setInput(Token keyToken, Token valueToken) throws IllegalActionException { 105 106 try { 107 if(keyToken == null) { 108 keyToken = Token.NIL; 109 } 110 _keyList.put(keyToken); 111 112 if(valueToken == null) { 113 valueToken = Token.NIL; 114 } 115 _valueList.put(valueToken); 116 117 } catch(InterruptedException e) { 118 // do not rethrow since this exception can occur when stopping a DDP 119 // job. instead just print to stderr. 120 //throw new IllegalActionException(this, e, "Error waiting for token lists."); 121 System.err.println("Got InterruptedException."); 122 return; 123 } 124 125 } 126 127 /** Output port to write the value to the workflow. */ 128 public TypedIOPort value; 129 130 /////////////////////////////////////////////////////////////////// 131 //// protected methods //// 132 133 /** Create a function used for setting the type for an output port. */ 134 @Override 135 protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) { 136 return new MapInputPortFunction(outputPort.getName()); 137 } 138 139 140 /////////////////////////////////////////////////////////////////// 141 //// inner classes //// 142 143 /** A MonotonicFunction for setting the output port types. */ 144 private class MapInputPortFunction extends MonotonicFunction { 145 private MapInputPortFunction(String name) { 146 _name = name; 147 } 148 149 /////////////////////////////////////////////////////////////// 150 //// public inner methods //// 151 152 /** Return the function result. 153 * @return A Type. 154 */ 155 @Override 156 public Object getValue() throws IllegalActionException { 157 158 final Type inPortType = in.getType(); 159 Type retval = null; 160 161 if (inPortType == BaseType.UNKNOWN) { 162 retval = BaseType.UNKNOWN; 163 } else if (inPortType instanceof ArrayType) { 164 final Type elementType = ((ArrayType)inPortType).getElementType(); 165 166 if (elementType instanceof RecordType) { 167 final Type fieldType = ((RecordType)elementType).get(_name); 168 if (fieldType == null) { 169 retval = BaseType.UNKNOWN; 170 } else { 171 retval = fieldType; 172 } 173 } 174 } 175 176 if(retval == null) { 177 throw new IllegalActionException(MapInput.this, 178 "Invalid type for input port: " + inPortType); 179 } 180 return retval; 181 } 182 183 /** Return an additional string describing the current value 184 * of this function. 185 */ 186 @Override 187 public String getVerboseString() { 188 if (in.getType() instanceof ArrayType) { 189 if(((ArrayType)in.getType()).getElementType() instanceof RecordType) { 190 RecordType type = (RecordType) ((ArrayType)in.getType()).getElementType(); 191 Type fieldType = type.get(_name); 192 193 if (fieldType == null) { 194 return "Input Record doesn't have field named " + _name; 195 } 196 } 197 } 198 199 return null; 200 } 201 202 /** Return the type variable in this inequality term. If the 203 * type of the input port is not declared, return an one 204 * element array containing the inequality term representing 205 * the type of the port; otherwise, return an empty array. 206 * @return An array of InequalityTerm. 207 */ 208 @Override 209 public InequalityTerm[] getVariables() { 210 InequalityTerm portTerm = in.getTypeTerm(); 211 212 if (portTerm.isSettable()) { 213 InequalityTerm[] variable = new InequalityTerm[1]; 214 variable[0] = portTerm; 215 return variable; 216 } 217 218 return (new InequalityTerm[0]); 219 } 220 221 /////////////////////////////////////////////////////////////// 222 //// private inner variable //// 223 private String _name; 224 } 225 226 227 /** A list containing tokens to be written by the value port. */ 228 private LinkedBlockingQueue<Token> _valueList = new LinkedBlockingQueue<Token>(); 229}