001/* A stub actor that writes data into workflows for the Cross pattern. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-04-13 21:25:55 +0000 (Sun, 13 Apr 2014) $' 008 * '$Revision: 32663 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.ddp.actor.pattern.stub; 031 032import java.util.concurrent.LinkedBlockingQueue; 033 034import org.kepler.ddp.actor.pattern.Types; 035 036import ptolemy.actor.TypedIOPort; 037import ptolemy.data.Token; 038import ptolemy.data.type.ArrayType; 039import ptolemy.data.type.BaseType; 040import ptolemy.data.type.MonotonicFunction; 041import ptolemy.data.type.RecordType; 042import ptolemy.data.type.Type; 043import ptolemy.graph.InequalityTerm; 044import ptolemy.kernel.CompositeEntity; 045import ptolemy.kernel.util.IllegalActionException; 046import ptolemy.kernel.util.NameDuplicationException; 047import ptolemy.kernel.util.Workspace; 048 049/** A stub actor that writes data into workflows for the Cross pattern. 050 * 051 * @author Daniel Crawl 052 * @version $Id: CrossInput.java 32663 2014-04-13 21:25:55Z crawl $ 053 */ 054public class CrossInput extends MapInput { 055 056 /** Construct a new CrossInput in a container with a given name. */ 057 public CrossInput(CompositeEntity container, String name) 058 throws IllegalActionException, NameDuplicationException { 059 060 super(container, name); 061 062 in2 = new TypedIOPort(this, "in2", true, false); 063 in2.setTypeAtMost(Types.keyValueArrayType); 064 065 key2 = new TypedIOPort(this, "key2", false, true); 066 key2.setMultiport(true); 067 068 value2 = new TypedIOPort(this, "value2", false, true); 069 value2.setMultiport(true); 070 } 071 072 /** Clone the actor into the specified workspace. */ 073 @Override 074 public Object clone(Workspace workspace) throws CloneNotSupportedException { 075 CrossInput newObject = (CrossInput) super.clone(workspace); 076 newObject._key2List = new LinkedBlockingQueue<Token>(); 077 newObject._value2List = new LinkedBlockingQueue<Token>(); 078 return newObject; 079 } 080 081 /** Write the key and value from both inputs to the workflow. */ 082 @Override 083 public void fire() throws IllegalActionException { 084 085 super.fire(); 086 087 Token key2Token; 088 Token value2Token; 089 090 try { 091 key2Token = _key2List.take(); 092 value2Token = _value2List.take(); 093 } catch(InterruptedException e) { 094 throw new IllegalActionException(this, e, "Error waiting for token."); 095 } 096 097 if(!_finish.get()) { 098 key2.broadcast(key2Token); 099 value2.broadcast(value2Token); 100 } 101 } 102 103 /** Stop execution of the workflow the next time postfire() is called. */ 104 @Override 105 public void finish() throws InterruptedException { 106 super.finish(); 107 // add a token to unblock the take() in fire() 108 _key2List.put(Token.NIL); 109 _value2List.put(Token.NIL); 110 } 111 112 /** Set the key and value from both inputs. */ 113 public void setInput(Token key1Token, Token value1Token, Token key2Token, Token value2Token) throws IllegalActionException { 114 115 super.setInput(key1Token, value1Token); 116 try { 117 118 if(key2Token == null) { 119 key2Token = Token.NIL; 120 } 121 _key2List.put(key2Token); 122 123 if(value2Token == null) { 124 value2Token = Token.NIL; 125 } 126 _value2List.put(value2Token); 127 } catch(InterruptedException e) { 128 throw new IllegalActionException(this, e, "Error waiting for token lists."); 129 } 130 } 131 132 /** Second input port for key-value pairs. */ 133 public TypedIOPort in2; 134 135 /** Port to write the key from the second input. */ 136 public TypedIOPort key2; 137 138 /** Port to write the value from the second input. */ 139 public TypedIOPort value2; 140 141 /////////////////////////////////////////////////////////////////// 142 //// protected methods //// 143 144 /** Create a function used for setting the type for an output port. */ 145 @Override 146 protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) { 147 return new CrossInputPortFunction(outputPort); 148 } 149 150 /////////////////////////////////////////////////////////////////// 151 //// inner classes //// 152 153 /** A MonotonicFunction for setting the output port types. */ 154 private class CrossInputPortFunction extends MonotonicFunction { 155 private CrossInputPortFunction(TypedIOPort outputPort) { 156 157 // if output port is key2 or value2, set the name 158 // to be "key" or "value2", respectively, since "key2" 159 // and "value2" are not fields in the key-values pair 160 // record type. 161 if(outputPort == key2) { 162 _outputPortName = "key"; 163 } else if(outputPort == value2) { 164 _outputPortName = "value"; 165 } else { 166 _outputPortName = outputPort.getName(); 167 } 168 169 if(outputPort == key || outputPort == value) { 170 _inputPort = in; 171 } else { 172 _inputPort = in2; 173 } 174 } 175 176 /////////////////////////////////////////////////////////////// 177 //// public inner methods //// 178 179 /** Return the function result. 180 * @return A Type. 181 */ 182 @Override 183 public Object getValue() throws IllegalActionException { 184 185 Type retval = null; 186 final Type inPortType = _inputPort.getType(); 187 188 if (inPortType == BaseType.UNKNOWN) { 189 retval = BaseType.UNKNOWN; 190 } else if(inPortType instanceof ArrayType) { 191 192 final Type elementType = ((ArrayType)inPortType).getElementType(); 193 if (elementType instanceof RecordType) { 194 final Type fieldType = ((RecordType)elementType).get(_outputPortName); 195 196 if (fieldType == null) { 197 retval = BaseType.UNKNOWN; 198 } else { 199 retval = fieldType; 200 } 201 } 202 } 203 204 if(retval == null) { 205 throw new IllegalActionException(CrossInput.this, 206 "Invalid type for input port " + _inputPort.getName() + " : " + inPortType); 207 } 208 209 return retval; 210 } 211 212 /** Return an additional string describing the current value 213 * of this function. 214 */ 215 @Override 216 public String getVerboseString() { 217 if (_inputPort.getType() instanceof ArrayType) { 218 if(((ArrayType)_inputPort.getType()).getElementType() instanceof RecordType) { 219 RecordType type = (RecordType) ((ArrayType)_inputPort.getType()).getElementType(); 220 Type fieldType = type.get(_outputPortName); 221 222 if (fieldType == null) { 223 return "Input Record in port " + _inputPort.getName() + 224 " doesn't have field named " + _outputPortName; 225 } 226 } 227 } 228 229 return null; 230 } 231 232 /** Return the type variable in this inequality term. If the 233 * type of the input port is not declared, return an one 234 * element array containing the inequality term representing 235 * the type of the port; otherwise, return an empty array. 236 * @return An array of InequalityTerm. 237 */ 238 @Override 239 public InequalityTerm[] getVariables() { 240 InequalityTerm portTerm = _inputPort.getTypeTerm(); 241 242 if (portTerm.isSettable()) { 243 InequalityTerm[] variable = new InequalityTerm[1]; 244 variable[0] = portTerm; 245 return variable; 246 } 247 248 return (new InequalityTerm[0]); 249 } 250 251 /////////////////////////////////////////////////////////////// 252 //// private inner variable //// 253 254 private TypedIOPort _inputPort; 255 private String _outputPortName; 256 257 } 258 259 /** A list containing tokens to be written by the key2 port. */ 260 private LinkedBlockingQueue<Token> _key2List = new LinkedBlockingQueue<Token>(); 261 262 /** A list containing tokens to be written by the value2 port. */ 263 private LinkedBlockingQueue<Token> _value2List = new LinkedBlockingQueue<Token>(); 264 265}