001/* A stub actor that writes data into workflows for the Map pattern.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-05-19 20:43:29 +0000 (Mon, 19 May 2014) $' 
008 * '$Revision: 32724 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern.stub;
031
032
033import java.util.concurrent.LinkedBlockingQueue;
034
035import ptolemy.actor.TypedIOPort;
036import ptolemy.data.Token;
037import ptolemy.data.type.ArrayType;
038import ptolemy.data.type.BaseType;
039import ptolemy.data.type.MonotonicFunction;
040import ptolemy.data.type.RecordType;
041import ptolemy.data.type.Type;
042import ptolemy.graph.InequalityTerm;
043import ptolemy.kernel.CompositeEntity;
044import ptolemy.kernel.util.IllegalActionException;
045import ptolemy.kernel.util.NameDuplicationException;
046import ptolemy.kernel.util.Workspace;
047
048/** A stub actor that writes data into workflows for the Map pattern.
049 * 
050 *  @author Daniel Crawl
051 *  @version $Id: MapInput.java 32724 2014-05-19 20:43:29Z crawl $
052 */
053public class MapInput extends StubSourceActor {
054
055    /** Construct a new MapInput in a container with a given name. */
056    public MapInput(CompositeEntity container, String name)
057            throws IllegalActionException, NameDuplicationException {
058        
059        super(container, name);
060
061        value = new TypedIOPort(this, "value", false, true);
062        value.setMultiport(true);
063    }
064        
065    /** Clone the actor into the specified workspace. */
066    @Override
067    public Object clone(Workspace workspace) throws CloneNotSupportedException {
068        MapInput newObject = (MapInput)super.clone(workspace);
069        newObject._valueList = new LinkedBlockingQueue<Token>();
070        return newObject;
071    }
072
073    /** Write key and value to the workflow. */
074    @Override
075    public void fire() throws IllegalActionException {
076
077        super.fire();
078                
079        Token keyToken = null;
080        Token valueToken = null;
081        
082        try {
083            keyToken = _keyList.take();
084            valueToken = _valueList.take();
085        } catch(InterruptedException e) {
086            throw new IllegalActionException(this, e, "Error waiting for token.");
087        }                    
088        
089        if(!_finish.get()) {
090            key.broadcast(keyToken);            
091            value.broadcast(valueToken);
092        }
093    }    
094        
095    /** Stop execution of the workflow the next time postfire() is called. */
096    @Override
097    public void finish() throws InterruptedException {
098        super.finish();
099        // add a token to unblock the take() in fire()
100        _valueList.put(Token.NIL);
101    }
102    
103    /** Set the key and value to write to workflow. */
104    public void setInput(Token keyToken, Token valueToken) throws IllegalActionException {
105       
106        try {
107            if(keyToken == null) {
108                keyToken = Token.NIL;
109            }
110            _keyList.put(keyToken);
111            
112            if(valueToken == null) {
113                valueToken = Token.NIL;
114            }
115            _valueList.put(valueToken);
116
117        } catch(InterruptedException e) {
118            // do not rethrow since this exception can occur when stopping a DDP
119            // job. instead just print to stderr.
120            //throw new IllegalActionException(this, e, "Error waiting for token lists.");
121            System.err.println("Got InterruptedException.");
122            return;
123        }
124
125    }
126                
127    /** Output port to write the value to the workflow. */
128    public TypedIOPort value;
129
130    ///////////////////////////////////////////////////////////////////
131    ////                         protected methods                 ////
132
133    /** Create a function used for setting the type for an output port. */
134    @Override
135    protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) {
136        return new MapInputPortFunction(outputPort.getName());
137    }
138        
139
140    ///////////////////////////////////////////////////////////////////
141    ////                         inner classes                     ////
142
143    /** A MonotonicFunction for setting the output port types. */
144    private class MapInputPortFunction extends MonotonicFunction {
145        private MapInputPortFunction(String name) {
146            _name = name;
147        }
148
149        ///////////////////////////////////////////////////////////////
150        ////                       public inner methods            ////
151
152        /** Return the function result.
153         *  @return A Type.
154         */
155        @Override
156        public Object getValue() throws IllegalActionException {
157            
158            final Type inPortType = in.getType();
159            Type retval = null;
160            
161            if (inPortType == BaseType.UNKNOWN) {
162                retval = BaseType.UNKNOWN;
163            } else if (inPortType instanceof ArrayType) {
164                final Type elementType = ((ArrayType)inPortType).getElementType();
165                
166                if (elementType instanceof RecordType) {
167                    final Type fieldType = ((RecordType)elementType).get(_name);
168                    if (fieldType == null) {
169                        retval = BaseType.UNKNOWN;
170                    } else {
171                        retval = fieldType;
172                    }
173                }
174            }
175            
176            if(retval == null) {
177                throw new IllegalActionException(MapInput.this,
178                        "Invalid type for input port: " + inPortType);
179            }
180            return retval;
181        }
182
183        /** Return an additional string describing the current value
184         *  of this function.
185         */
186        @Override
187        public String getVerboseString() {
188            if (in.getType() instanceof ArrayType) {
189                if(((ArrayType)in.getType()).getElementType() instanceof RecordType) {
190                    RecordType type = (RecordType) ((ArrayType)in.getType()).getElementType();
191                    Type fieldType = type.get(_name);
192
193                    if (fieldType == null) {
194                        return "Input Record doesn't have field named " + _name;
195                    }
196                }
197            }
198
199            return null;
200        }
201
202        /** Return the type variable in this inequality term. If the
203         *  type of the input port is not declared, return an one
204         *  element array containing the inequality term representing
205         *  the type of the port; otherwise, return an empty array.
206         *  @return An array of InequalityTerm.
207         */
208        @Override
209        public InequalityTerm[] getVariables() {
210            InequalityTerm portTerm = in.getTypeTerm();
211
212            if (portTerm.isSettable()) {
213                InequalityTerm[] variable = new InequalityTerm[1];
214                variable[0] = portTerm;
215                return variable;
216            }
217
218            return (new InequalityTerm[0]);
219        }
220
221        ///////////////////////////////////////////////////////////////
222        ////                       private inner variable          ////
223        private String _name;
224    }
225
226    
227    /** A list containing tokens to be written by the value port. */
228    private LinkedBlockingQueue<Token> _valueList = new LinkedBlockingQueue<Token>();
229}