001/* A stub actor that writes data into workflows for the Match pattern. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-04-12 00:09:39 +0000 (Sat, 12 Apr 2014) $' 008 * '$Revision: 32657 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.ddp.actor.pattern.stub; 031 032import java.util.concurrent.LinkedBlockingQueue; 033 034import org.kepler.ddp.actor.pattern.Types; 035 036import ptolemy.actor.TypedIOPort; 037import ptolemy.data.Token; 038import ptolemy.data.type.ArrayType; 039import ptolemy.data.type.BaseType; 040import ptolemy.data.type.MonotonicFunction; 041import ptolemy.data.type.RecordType; 042import ptolemy.data.type.Type; 043import ptolemy.graph.InequalityTerm; 044import ptolemy.kernel.CompositeEntity; 045import ptolemy.kernel.util.IllegalActionException; 046import ptolemy.kernel.util.NameDuplicationException; 047import ptolemy.kernel.util.Workspace; 048 049/** A stub actor that writes data into workflows for the Match pattern. 050 * 051 * @author Daniel Crawl 052 * @version $Id: MatchInput.java 32657 2014-04-12 00:09:39Z crawl $ 053 */ 054public class MatchInput extends StubSourceActor { 055 056 /** Construct a new MatchInput in a container with a given name. */ 057 public MatchInput(CompositeEntity container, String name) 058 throws IllegalActionException, NameDuplicationException { 059 060 super(container, name); 061 062 in2 = new TypedIOPort(this, "in2", true, false); 063 in2.setTypeAtMost(Types.keyValueArrayType); 064 065 value1 = new TypedIOPort(this, "value1", false, true); 066 value1.setMultiport(true); 067 068 value2 = new TypedIOPort(this, "value2", false, true); 069 value2.setMultiport(true); 070 071 } 072 073 /** Clone the actor into the specified workspace. */ 074 @Override 075 public Object clone(Workspace workspace) throws CloneNotSupportedException { 076 MatchInput newObject = (MatchInput)super.clone(workspace); 077 newObject._value1List = new LinkedBlockingQueue<Token>(); 078 newObject._value2List = new LinkedBlockingQueue<Token>(); 079 return newObject; 080 } 081 082 /** Write the key and value from both inputs to the workflow. */ 083 @Override 084 public void fire() throws IllegalActionException { 085 086 Token keyToken; 087 Token value1Token; 088 Token value2Token; 089 try { 090 keyToken = _keyList.take(); 091 value1Token = _value1List.take(); 092 value2Token = _value2List.take(); 093 } catch(InterruptedException e) { 094 throw new IllegalActionException(this, e, "Error waiting for token lists."); 095 } 096 097 if(!_finish.get()) { 098 key.broadcast(keyToken); 099 value1.broadcast(value1Token); 100 value2.broadcast(value2Token); 101 } 102 } 103 104 /** Stop execution of the workflow the next time postfire() is called. */ 105 @Override 106 public void finish() throws InterruptedException { 107 super.finish(); 108 // add a token to unblock the take() in fire() 109 _value1List.put(Token.NIL); 110 _value2List.put(Token.NIL); 111 } 112 113 /** Set the key and value from both inputs. */ 114 public void setInput(Token keyToken, Token value1Token, Token value2Token) throws IllegalActionException { 115 116 try { 117 _keyList.put(keyToken); 118 _value1List.put(value1Token); 119 _value2List.put(value2Token); 120 } catch(InterruptedException e) { 121 throw new IllegalActionException(this, e, "Error waiting for token lists."); 122 } 123 } 124 125 /** The second input port of key-value pairs. */ 126 public TypedIOPort in2; 127 128 /** Port to write the value from the first input. */ 129 public TypedIOPort value1; 130 131 /** Port to write the value from the second input. */ 132 public TypedIOPort value2; 133 134 /////////////////////////////////////////////////////////////////// 135 //// protected methods //// 136 137 /** Create a function used for setting the type for an output port. */ 138 @Override 139 protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) { 140 return new MatchInputPortFunction(outputPort); 141 } 142 143 /////////////////////////////////////////////////////////////////// 144 //// inner classes //// 145 146 /** A MonotonicFunction for setting the output port types. */ 147 private class MatchInputPortFunction extends MonotonicFunction { 148 private MatchInputPortFunction(TypedIOPort outputPort) { 149 150 _outputPort = outputPort; 151 152 if(outputPort == key || outputPort == value1) { 153 _inputPort = in; 154 } else { 155 _inputPort = in2; 156 } 157 } 158 159 /////////////////////////////////////////////////////////////// 160 //// public inner methods //// 161 162 /** Return the function result. 163 * @return A Type. 164 */ 165 @Override 166 public Object getValue() throws IllegalActionException { 167 168 Type retval = null; 169 final Type inPortType = _inputPort.getType(); 170 171 if (inPortType == BaseType.UNKNOWN) { 172 retval = BaseType.UNKNOWN; 173 } else if (inPortType instanceof ArrayType) { 174 175 final Type elementType = ((ArrayType)inPortType).getElementType(); 176 if(elementType instanceof RecordType) { 177 178 Type fieldType; 179 if(_outputPort == key) { 180 fieldType = ((RecordType)elementType).get("key"); 181 } else { 182 fieldType = ((RecordType)elementType).get("value"); 183 } 184 185 if (fieldType == null) { 186 retval = BaseType.UNKNOWN; 187 } else { 188 retval = fieldType; 189 } 190 } 191 } 192 193 if(retval == null) { 194 throw new IllegalActionException(MatchInput.this, 195 "Invalid type for input port " + _inputPort.getName() + " : " + inPortType); 196 } 197 198 return retval; 199 } 200 201 /** Return an additional string describing the current value 202 * of this function. 203 */ 204 @Override 205 public String getVerboseString() { 206 if (_inputPort.getType() instanceof ArrayType) { 207 if(((ArrayType)_inputPort.getType()).getElementType() instanceof RecordType) { 208 RecordType type = (RecordType) ((ArrayType)_inputPort.getType()).getElementType(); 209 Type fieldType = type.get("value"); 210 211 if (fieldType == null) { 212 return "Input Record in port " + _inputPort.getName() + 213 " doesn't have field named \"value\"."; 214 } 215 } 216 } 217 218 return null; 219 } 220 221 /** Return the type variable in this inequality term. If the 222 * type of the input port is not declared, return an one 223 * element array containing the inequality term representing 224 * the type of the port; otherwise, return an empty array. 225 * @return An array of InequalityTerm. 226 */ 227 @Override 228 public InequalityTerm[] getVariables() { 229 InequalityTerm portTerm = _inputPort.getTypeTerm(); 230 231 if (portTerm.isSettable()) { 232 InequalityTerm[] variable = new InequalityTerm[1]; 233 variable[0] = portTerm; 234 return variable; 235 } 236 237 return (new InequalityTerm[0]); 238 } 239 240 /////////////////////////////////////////////////////////////// 241 //// private inner variable //// 242 243 private TypedIOPort _inputPort; 244 private TypedIOPort _outputPort; 245 246 } 247 248 249 /** A list containing tokens to be written by the values1 port. */ 250 private LinkedBlockingQueue<Token> _value1List = new LinkedBlockingQueue<Token>(); 251 252 /** A list containing tokens to be written by the values2 port. */ 253 private LinkedBlockingQueue<Token> _value2List = new LinkedBlockingQueue<Token>(); 254 255}