/* A stub actor that writes data into workflows for the Reduce pattern.
 *
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-05-19 20:43:29 +0000 (Mon, 19 May 2014) $'
 * '$Revision: 32724 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.ddp.actor.pattern.stub;


import java.util.concurrent.LinkedBlockingQueue;

import ptolemy.actor.TypedIOPort;
import ptolemy.data.Token;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.MonotonicFunction;
import ptolemy.data.type.RecordType;
import ptolemy.data.type.Type;
import ptolemy.graph.InequalityTerm;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;

/** A stub actor that writes data into workflows for the Reduce pattern.
 *
 *  <p>Key/values pairs are handed to this actor through
 *  {@link #setInput(Token, Token)}, which queues them; each call to
 *  {@link #fire()} removes one pair from the queues and broadcasts it on
 *  the key port and the {@link #values} port.</p>
 *
 *  <p>NOTE(review): the members <code>in</code>, <code>key</code>,
 *  <code>_keyList</code> and <code>_finish</code> used below are not
 *  declared in this file; they are presumably inherited from
 *  StubSourceActor -- confirm against that class.</p>
 *
 *  @author Daniel Crawl
 *  @version $Id: ReduceInput.java 32724 2014-05-19 20:43:29Z crawl $
 */
public class ReduceInput extends StubSourceActor {

    /** Construct a new ReduceInput in a container with a given name.
     *  Creates the <i>values</i> output port and makes it a multiport.
     *
     *  @param container The container.
     *  @param name The name of this actor.
     *  @exception IllegalActionException Propagated from the superclass
     *   constructor or from port creation.
     *  @exception NameDuplicationException Propagated from the superclass
     *   constructor or from port creation.
     */
    public ReduceInput(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {

        super(container, name);

        // output-only port (isInput = false, isOutput = true)
        values = new TypedIOPort(this, "values", false, true);
        values.setMultiport(true);
    }

    /** Clone the actor into the specified workspace. The clone is given
     *  its own, empty queue of values tokens.
     *
     *  @param workspace The workspace for the new object.
     *  @return A new ReduceInput.
     *  @exception CloneNotSupportedException Propagated from the
     *   superclass.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        ReduceInput newObject = (ReduceInput) super.clone(workspace);
        newObject._valuesList = new LinkedBlockingQueue<Token>();
        return newObject;
    }

    /** Stop execution of the workflow the next time postfire() is called.
     *
     *  @exception InterruptedException If interrupted while adding the
     *   unblocking token, or propagated from the superclass.
     */
    @Override
    public void finish() throws InterruptedException {
        super.finish();
        // add a token to unblock the take() in fire()
        _valuesList.put(Token.NIL);
    }

    /** Write the key and values from the input to the workflow.
     *  Takes one token from the key queue and one from the values queue,
     *  waiting on each take() until a token is available, and broadcasts
     *  them unless the finish flag has been set.
     *
     *  @exception IllegalActionException If the thread is interrupted
     *   while waiting for a token, or if broadcasting fails.
     */
    @Override
    public void fire() throws IllegalActionException {

        Token keyToken = null;
        Token valuesToken = null;

        try {
            keyToken = _keyList.take();
            valuesToken = _valuesList.take();
        } catch (InterruptedException e) {
            throw new IllegalActionException(this, e, "Error waiting for token.");
        }

        // once finishing, the tokens taken above may only be the
        // placeholders added to unblock take(), so do not send them.
        if (!_finish.get()) {
            key.broadcast(keyToken);
            values.broadcast(valuesToken);
        }

    }

    /** Queue a key and its values so that a later call to fire() writes
     *  them to the workflow.
     *
     *  @param keyToken The key token.
     *  @param valuesToken The token holding the values for the key.
     *  @exception IllegalActionException Declared for callers; not thrown
     *   by the code in this method.
     */
    public void setInput(Token keyToken, Token valuesToken) throws IllegalActionException {
        try {
            _keyList.put(keyToken);
            _valuesList.put(valuesToken);
        } catch (InterruptedException e) {
            // do not rethrow since this exception can occur when stopping a DDP
            // job. instead just print to stderr.
            System.err.println("Got InterruptedException.");
            return;
        }
    }

    /** Port to write the values to the workflow. */
    public TypedIOPort values;


    ///////////////////////////////////////////////////////////////////
    ////                      protected methods                    ////

    /** Create a function used for setting the type for an output port.
     *
     *  @param outputPort The output port whose type the function computes.
     *  @return A new ReduceInputPortFunction for the port.
     */
    @Override
    protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) {
        return new ReduceInputPortFunction(outputPort);
    }

    ///////////////////////////////////////////////////////////////////
    ////                        inner classes                      ////

    /** A MonotonicFunction for setting the output port types. The type is
     *  derived from the type of the <code>in</code> port, which is
     *  expected to be an array of records: the <i>values</i> port gets an
     *  array of the record's "value" field type, and any other port gets
     *  the type of the record field with the same name as the port.
     */
    private class ReduceInputPortFunction extends MonotonicFunction {
        private ReduceInputPortFunction(TypedIOPort port) {
            _port = port;
        }

        ///////////////////////////////////////////////////////////////
        ////                  public inner methods                 ////

        /** Return the function result.
         *
         *  @return A Type: UNKNOWN while the input type, its element type
         *   or the needed record field is not known yet; otherwise the
         *   type for the output port.
         *  @exception IllegalActionException If the input port type is
         *   known but is not an array of records.
         */
        @Override
        public Object getValue() throws IllegalActionException {

            Type retval = null;
            final Type inPortType = in.getType();

            if (inPortType == BaseType.UNKNOWN || inPortType == BaseType.ARRAY_BOTTOM) {
                retval = BaseType.UNKNOWN;
            } else if (inPortType instanceof ArrayType) {

                final Type elementType = ((ArrayType) inPortType).getElementType();

                if (elementType == BaseType.UNKNOWN) {
                    retval = BaseType.UNKNOWN;
                } else if (elementType instanceof RecordType) {

                    final Type fieldType = ((RecordType) elementType).get(_fieldName());

                    if (fieldType == null) {
                        retval = BaseType.UNKNOWN;
                    } else if (_port == values) {
                        // the values port carries an array of the values
                        retval = new ArrayType(fieldType);
                    } else {
                        retval = fieldType;
                    }
                }
            }

            // retval is still null when the input is neither unknown nor
            // an array whose elements are unknown or records.
            if (retval == null) {
                throw new IllegalActionException(ReduceInput.this,
                        "Invalid type for input port: " + inPortType);
            }

            return retval;

        }

        /** Return an additional string describing the current value
         *  of this function.
         *
         *  @return A message naming the record field that is missing from
         *   the input record type, or null if the input type is not an
         *   array of records or the field is present.
         */
        @Override
        public String getVerboseString() {
            final Type inPortType = in.getType();
            if (inPortType instanceof ArrayType) {
                final Type elementType = ((ArrayType) inPortType).getElementType();
                if (elementType instanceof RecordType) {
                    final String fieldName = _fieldName();
                    if (((RecordType) elementType).get(fieldName) == null) {
                        // report the field that was actually looked up
                        return "Input Record doesn't have field named " + fieldName;
                    }
                }
            }

            return null;
        }

        /** Return the type variable in this inequality term. If the
         *  type of the input port is not declared, return an one
         *  element array containing the inequality term representing
         *  the type of the port; otherwise, return an empty array.
         *
         *  @return An array of InequalityTerm.
         */
        @Override
        public InequalityTerm[] getVariables() {
            InequalityTerm portTerm = in.getTypeTerm();

            if (portTerm.isSettable()) {
                InequalityTerm[] variable = new InequalityTerm[1];
                variable[0] = portTerm;
                return variable;
            }

            return (new InequalityTerm[0]);
        }

        ///////////////////////////////////////////////////////////////
        ////                  private inner methods                ////

        /** Return the name of the input record field that supplies the
         *  type for this function's port: "value" for the values port,
         *  otherwise the name of the port itself.
         */
        private String _fieldName() {
            return (_port == values) ? "value" : _port.getName();
        }

        ///////////////////////////////////////////////////////////////
        ////                  private inner variable               ////

        /** The output port whose type this function computes. */
        private final TypedIOPort _port;
    }

    /** A list containing tokens to be written by the values port. */
    private LinkedBlockingQueue<Token> _valuesList = new LinkedBlockingQueue<Token>();

}