001/* A stub actor that writes data into workflows for the Cross pattern.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-04-13 21:25:55 +0000 (Sun, 13 Apr 2014) $' 
008 * '$Revision: 32663 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern.stub;
031
032import java.util.concurrent.LinkedBlockingQueue;
033
034import org.kepler.ddp.actor.pattern.Types;
035
036import ptolemy.actor.TypedIOPort;
037import ptolemy.data.Token;
038import ptolemy.data.type.ArrayType;
039import ptolemy.data.type.BaseType;
040import ptolemy.data.type.MonotonicFunction;
041import ptolemy.data.type.RecordType;
042import ptolemy.data.type.Type;
043import ptolemy.graph.InequalityTerm;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.util.IllegalActionException;
046import ptolemy.kernel.util.NameDuplicationException;
047import ptolemy.kernel.util.Workspace;
048
049/** A stub actor that writes data into workflows for the Cross pattern.
050 * 
051 *  @author Daniel Crawl
052 *  @version $Id: CrossInput.java 32663 2014-04-13 21:25:55Z crawl $
053 */
054public class CrossInput extends MapInput {
055
056    /** Construct a new CrossInput in a container with a given name. */
057    public CrossInput(CompositeEntity container, String name)
058            throws IllegalActionException, NameDuplicationException {
059        
060        super(container, name);
061        
062        in2 = new TypedIOPort(this, "in2", true, false);
063        in2.setTypeAtMost(Types.keyValueArrayType);        
064            
065        key2 = new TypedIOPort(this, "key2", false, true);
066        key2.setMultiport(true);
067
068        value2 = new TypedIOPort(this, "value2", false, true);
069        value2.setMultiport(true);
070    }
071
072    /** Clone the actor into the specified workspace. */
073    @Override
074    public Object clone(Workspace workspace) throws CloneNotSupportedException {
075        CrossInput newObject = (CrossInput) super.clone(workspace);
076        newObject._key2List = new LinkedBlockingQueue<Token>();
077        newObject._value2List = new LinkedBlockingQueue<Token>();
078        return newObject;
079    }
080    
081    /** Write the key and value from both inputs to the workflow. */
082    @Override
083    public void fire() throws IllegalActionException {
084
085        super.fire();
086        
087        Token key2Token;
088        Token value2Token;
089            
090        try {
091            key2Token = _key2List.take();
092            value2Token = _value2List.take();
093        } catch(InterruptedException e) {
094            throw new IllegalActionException(this, e, "Error waiting for token.");
095        }
096            
097        if(!_finish.get()) {
098            key2.broadcast(key2Token);
099            value2.broadcast(value2Token);
100        }
101    }
102    
103    /** Stop execution of the workflow the next time postfire() is called. */
104    @Override
105    public void finish() throws InterruptedException {
106        super.finish();
107        // add a token to unblock the take() in fire()
108        _key2List.put(Token.NIL);
109        _value2List.put(Token.NIL);
110    }
111
112    /** Set the key and value from both inputs. */
113    public void setInput(Token key1Token, Token value1Token, Token key2Token, Token value2Token) throws IllegalActionException {
114
115        super.setInput(key1Token, value1Token);
116        try {
117            
118            if(key2Token == null) {
119                key2Token = Token.NIL;
120            }
121            _key2List.put(key2Token);
122            
123            if(value2Token == null) {
124                value2Token = Token.NIL;
125            }
126            _value2List.put(value2Token);
127        } catch(InterruptedException e) {
128            throw new IllegalActionException(this, e, "Error waiting for token lists.");
129        }
130    }
131    
132    /** Second input port for key-value pairs. */
133    public TypedIOPort in2;
134    
135    /** Port to write the key from the second input. */
136    public TypedIOPort key2;
137    
138    /** Port to write the value from the second input. */
139    public TypedIOPort value2;
140
141    ///////////////////////////////////////////////////////////////////
142    ////                         protected methods                 ////
143    
144    /** Create a function used for setting the type for an output port. */
145    @Override
146    protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) {
147        return new CrossInputPortFunction(outputPort);
148    }
149
150    ///////////////////////////////////////////////////////////////////
151    ////                         inner classes                     ////
152
153    /** A MonotonicFunction for setting the output port types. */
154    private class CrossInputPortFunction extends MonotonicFunction {
155        private CrossInputPortFunction(TypedIOPort outputPort) {
156
157            // if output port is key2 or value2, set the name
158            // to be "key" or "value2", respectively, since "key2"
159            // and "value2" are not fields in the key-values pair
160            // record type.
161            if(outputPort == key2) {
162                _outputPortName = "key";
163            } else if(outputPort == value2) {
164                _outputPortName = "value";
165            } else {
166                _outputPortName = outputPort.getName();
167            }
168            
169            if(outputPort == key || outputPort == value) {
170                _inputPort = in;
171            } else {
172                _inputPort = in2;
173            }
174        }
175
176        ///////////////////////////////////////////////////////////////
177        ////                       public inner methods            ////
178
179        /** Return the function result.
180         *  @return A Type.
181         */
182        @Override
183        public Object getValue() throws IllegalActionException {
184            
185            Type retval = null;
186            final Type inPortType = _inputPort.getType();
187            
188            if (inPortType == BaseType.UNKNOWN) {
189                retval = BaseType.UNKNOWN;
190            } else if(inPortType instanceof ArrayType) {
191                
192                final Type elementType = ((ArrayType)inPortType).getElementType();
193                if (elementType instanceof RecordType) {
194                    final Type fieldType = ((RecordType)elementType).get(_outputPortName);
195
196                    if (fieldType == null) {
197                        retval = BaseType.UNKNOWN;
198                    } else {
199                        retval = fieldType;
200                    }
201                }
202            } 
203
204            if(retval == null) {
205                throw new IllegalActionException(CrossInput.this,
206                        "Invalid type for input port " + _inputPort.getName() + " : " + inPortType);
207            }
208            
209            return retval;
210        }
211
212        /** Return an additional string describing the current value
213         *  of this function.
214         */
215        @Override
216        public String getVerboseString() {
217            if (_inputPort.getType() instanceof ArrayType) {
218                if(((ArrayType)_inputPort.getType()).getElementType() instanceof RecordType) {
219                    RecordType type = (RecordType) ((ArrayType)_inputPort.getType()).getElementType();
220                    Type fieldType = type.get(_outputPortName);
221
222                    if (fieldType == null) {
223                        return "Input Record in port " + _inputPort.getName() +
224                            " doesn't have field named " + _outputPortName;
225                    }
226                }
227            }
228
229            return null;
230        }
231
232        /** Return the type variable in this inequality term. If the
233         *  type of the input port is not declared, return an one
234         *  element array containing the inequality term representing
235         *  the type of the port; otherwise, return an empty array.
236         *  @return An array of InequalityTerm.
237         */
238        @Override
239        public InequalityTerm[] getVariables() {
240            InequalityTerm portTerm = _inputPort.getTypeTerm();
241
242            if (portTerm.isSettable()) {
243                InequalityTerm[] variable = new InequalityTerm[1];
244                variable[0] = portTerm;
245                return variable;
246            }
247
248            return (new InequalityTerm[0]);
249        }
250
251        ///////////////////////////////////////////////////////////////
252        ////                       private inner variable          ////
253        
254        private TypedIOPort _inputPort;
255        private String _outputPortName;
256        
257    }
258        
259    /** A list containing tokens to be written by the key2 port. */
260    private LinkedBlockingQueue<Token> _key2List = new LinkedBlockingQueue<Token>();
261
262    /** A list containing tokens to be written by the value2 port. */
263    private LinkedBlockingQueue<Token> _value2List = new LinkedBlockingQueue<Token>();
264
265}