001/* A stub actor that writes data into workflows for the CoGroup pattern. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-04-12 00:09:39 +0000 (Sat, 12 Apr 2014) $' 008 * '$Revision: 32657 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.ddp.actor.pattern.stub; 031 032import java.util.concurrent.LinkedBlockingQueue; 033 034import org.kepler.ddp.actor.pattern.Types; 035 036import ptolemy.actor.TypedIOPort; 037import ptolemy.data.Token; 038import ptolemy.data.type.ArrayType; 039import ptolemy.data.type.BaseType; 040import ptolemy.data.type.MonotonicFunction; 041import ptolemy.data.type.RecordType; 042import ptolemy.data.type.Type; 043import ptolemy.graph.InequalityTerm; 044import ptolemy.kernel.CompositeEntity; 045import ptolemy.kernel.util.IllegalActionException; 046import ptolemy.kernel.util.NameDuplicationException; 047import ptolemy.kernel.util.Workspace; 048 049/** A stub actor that writes data into workflows for the CoGroup pattern. 050 * 051 * @author Daniel Crawl 052 * @version $Id: CoGroupInput.java 32657 2014-04-12 00:09:39Z crawl $ 053 */ 054public class CoGroupInput extends StubSourceActor { 055 056 /** Construct a new CoGroupInput in a container with a given name. */ 057 public CoGroupInput(CompositeEntity container, String name) 058 throws IllegalActionException, NameDuplicationException { 059 060 super(container, name); 061 062 in2 = new TypedIOPort(this, "in2", true, false); 063 in2.setTypeAtMost(Types.keyValueArrayType); 064 065 values1 = new TypedIOPort(this, "values1", false, true); 066 values1.setMultiport(true); 067 068 values2 = new TypedIOPort(this, "values2", false, true); 069 values2.setMultiport(true); 070 071 } 072 073 /** Clone the actor into the specified workspace. */ 074 @Override 075 public Object clone(Workspace workspace) throws CloneNotSupportedException { 076 CoGroupInput newObject = (CoGroupInput)super.clone(workspace); 077 newObject._values1List = new LinkedBlockingQueue<Token>(); 078 newObject._values2List = new LinkedBlockingQueue<Token>(); 079 return newObject; 080 } 081 082 /** Write the key and values from both inputs to the workflow. */ 083 @Override 084 public void fire() throws IllegalActionException { 085 086 Token keyToken; 087 Token values1Token; 088 Token values2Token; 089 090 try { 091 keyToken = _keyList.take(); 092 values1Token = _values1List.take(); 093 values2Token = _values2List.take(); 094 } catch(InterruptedException e) { 095 throw new IllegalActionException(this, e, "Error waiting for token lists."); 096 } 097 098 if(!_finish.get()) { 099 key.broadcast(keyToken); 100 values1.broadcast(values1Token); 101 values2.broadcast(values2Token); 102 } 103 104 } 105 106 /** Stop execution of the workflow the next time postfire() is called. */ 107 @Override 108 public void finish() throws InterruptedException { 109 super.finish(); 110 // add a token to unblock the take() in fire() 111 _values1List.put(Token.NIL); 112 _values2List.put(Token.NIL); 113 } 114 115 /** Set the key and values from both inputs. */ 116 public void setInput(Token keyToken, Token values1Token, Token values2Token) throws IllegalActionException { 117 118 try { 119 _keyList.put(keyToken); 120 _values1List.put(values1Token); 121 _values2List.put(values2Token); 122 } catch(InterruptedException e) { 123 throw new IllegalActionException(this, e, "Error waiting for token lists."); 124 } 125 } 126 127 /** The input for key-value pairs. */ 128 public TypedIOPort in2; 129 130 /** Port to write the values from the first input. */ 131 public TypedIOPort values1; 132 133 /** Port to write the values from the second input. */ 134 public TypedIOPort values2; 135 136 /////////////////////////////////////////////////////////////////// 137 //// protected methods //// 138 139 /** Create a function used for setting the type for an output port. */ 140 @Override 141 protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) { 142 return new CoGroupInputPortFunction(outputPort); 143 } 144 145 /////////////////////////////////////////////////////////////////// 146 //// inner classes //// 147 148 /** A MonotonicFunction for setting the output port types. */ 149 private class CoGroupInputPortFunction extends MonotonicFunction { 150 private CoGroupInputPortFunction(TypedIOPort outputPort) { 151 152 _outputPort = outputPort; 153 154 if(outputPort == key || outputPort == values1) { 155 _inputPort = in; 156 } else { 157 _inputPort = in2; 158 } 159 } 160 161 /////////////////////////////////////////////////////////////// 162 //// public inner methods //// 163 164 /** Return the function result. 165 * @return A Type. 166 */ 167 @Override 168 public Object getValue() throws IllegalActionException { 169 170 Type retval = null; 171 final Type inPortType = _inputPort.getType(); 172 173 if (inPortType == BaseType.UNKNOWN) { 174 retval = BaseType.UNKNOWN; 175 } else if (inPortType instanceof ArrayType) { 176 177 final Type elementType = ((ArrayType)inPortType).getElementType(); 178 179 if(elementType instanceof RecordType) { 180 Type fieldType; 181 if(_outputPort == key) { 182 fieldType = ((RecordType)elementType).get("key"); 183 } else { 184 fieldType = ((RecordType)elementType).get("value"); 185 } 186 187 if (fieldType == null) { 188 retval = BaseType.UNKNOWN; 189 } else if(_outputPort == key) { 190 retval = fieldType; 191 } else { 192 retval = new ArrayType(fieldType); 193 } 194 } 195 } 196 197 if(retval == null) { 198 throw new IllegalActionException(CoGroupInput.this, 199 "Invalid type for input port " + _inputPort.getName() + " : " + inPortType); 200 } 201 202 return retval; 203 } 204 205 /** Return an additional string describing the current value 206 * of this function. 207 */ 208 @Override 209 public String getVerboseString() { 210 if (_inputPort.getType() instanceof ArrayType) { 211 if(((ArrayType)_inputPort.getType()).getElementType() instanceof RecordType) { 212 RecordType type = (RecordType) ((ArrayType)_inputPort.getType()).getElementType(); 213 Type fieldType = type.get("value"); 214 215 if (fieldType == null) { 216 return "Input Record in port " + _inputPort.getName() + 217 " doesn't have field named \"value\"."; 218 } 219 } 220 } 221 222 return null; 223 } 224 225 /** Return the type variable in this inequality term. If the 226 * type of the input port is not declared, return an one 227 * element array containing the inequality term representing 228 * the type of the port; otherwise, return an empty array. 229 * @return An array of InequalityTerm. 230 */ 231 @Override 232 public InequalityTerm[] getVariables() { 233 InequalityTerm portTerm = _inputPort.getTypeTerm(); 234 235 if (portTerm.isSettable()) { 236 InequalityTerm[] variable = new InequalityTerm[1]; 237 variable[0] = portTerm; 238 return variable; 239 } 240 241 return (new InequalityTerm[0]); 242 } 243 244 /////////////////////////////////////////////////////////////// 245 //// private inner variable //// 246 247 private TypedIOPort _inputPort; 248 private TypedIOPort _outputPort; 249 250 } 251 252 /** Values from the first input.*/ 253 private LinkedBlockingQueue<Token> _values1List = new LinkedBlockingQueue<Token>(); 254 255 /** Values from the second input. */ 256 private LinkedBlockingQueue<Token> _values2List = new LinkedBlockingQueue<Token>(); 257 258}