001/* A stub actor that writes data into workflows for the Match pattern.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-04-12 00:09:39 +0000 (Sat, 12 Apr 2014) $' 
008 * '$Revision: 32657 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern.stub;
031
032import java.util.concurrent.LinkedBlockingQueue;
033
034import org.kepler.ddp.actor.pattern.Types;
035
036import ptolemy.actor.TypedIOPort;
037import ptolemy.data.Token;
038import ptolemy.data.type.ArrayType;
039import ptolemy.data.type.BaseType;
040import ptolemy.data.type.MonotonicFunction;
041import ptolemy.data.type.RecordType;
042import ptolemy.data.type.Type;
043import ptolemy.graph.InequalityTerm;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.util.IllegalActionException;
046import ptolemy.kernel.util.NameDuplicationException;
047import ptolemy.kernel.util.Workspace;
048
049/** A stub actor that writes data into workflows for the Match pattern.
050 * 
051 *  @author Daniel Crawl
052 *  @version $Id: MatchInput.java 32657 2014-04-12 00:09:39Z crawl $
053 */
054public class MatchInput extends StubSourceActor {
055
056    /** Construct a new MatchInput in a container with a given name. */
057    public MatchInput(CompositeEntity container, String name)
058            throws IllegalActionException, NameDuplicationException {
059        
060        super(container, name);
061    
062        in2 = new TypedIOPort(this, "in2", true, false);
063        in2.setTypeAtMost(Types.keyValueArrayType);        
064
065        value1 = new TypedIOPort(this, "value1", false, true);
066        value1.setMultiport(true);
067
068        value2 = new TypedIOPort(this, "value2", false, true);
069        value2.setMultiport(true);
070
071    }
072    
073    /** Clone the actor into the specified workspace. */
074    @Override
075    public Object clone(Workspace workspace) throws CloneNotSupportedException {
076        MatchInput newObject = (MatchInput)super.clone(workspace);
077        newObject._value1List = new LinkedBlockingQueue<Token>();
078        newObject._value2List = new LinkedBlockingQueue<Token>();
079        return newObject;
080    }
081    
082    /** Write the key and value from both inputs to the workflow. */
083    @Override
084    public void fire() throws IllegalActionException {
085        
086        Token keyToken;
087        Token value1Token;
088        Token value2Token;
089        try {
090            keyToken = _keyList.take();
091            value1Token = _value1List.take();
092            value2Token = _value2List.take();
093        } catch(InterruptedException e) {
094            throw new IllegalActionException(this, e, "Error waiting for token lists.");
095        }
096
097        if(!_finish.get()) {
098            key.broadcast(keyToken);
099            value1.broadcast(value1Token);
100            value2.broadcast(value2Token);
101        }
102    }
103    
104    /** Stop execution of the workflow the next time postfire() is called. */
105    @Override
106    public void finish() throws InterruptedException {
107        super.finish();
108        // add a token to unblock the take() in fire()
109        _value1List.put(Token.NIL);
110        _value2List.put(Token.NIL);
111    }
112
113    /** Set the key and value from both inputs. */
114    public void setInput(Token keyToken, Token value1Token, Token value2Token) throws IllegalActionException {
115        
116        try {
117            _keyList.put(keyToken);
118            _value1List.put(value1Token);
119            _value2List.put(value2Token);
120        } catch(InterruptedException e) {
121            throw new IllegalActionException(this, e, "Error waiting for token lists.");
122        }
123    }
124    
125    /** The second input port of key-value pairs. */
126    public TypedIOPort in2;
127    
128    /** Port to write the value from the first input. */
129    public TypedIOPort value1;
130    
131    /** Port to write the value from the second input. */
132    public TypedIOPort value2;
133
134    ///////////////////////////////////////////////////////////////////
135    ////                         protected methods                 ////
136
137    /** Create a function used for setting the type for an output port. */
138    @Override
139    protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) {
140        return new MatchInputPortFunction(outputPort);
141    }
142
143    ///////////////////////////////////////////////////////////////////
144    ////                         inner classes                     ////
145
146    /** A MonotonicFunction for setting the output port types. */
147    private class MatchInputPortFunction extends MonotonicFunction {
148        private MatchInputPortFunction(TypedIOPort outputPort) {
149            
150            _outputPort = outputPort;
151            
152            if(outputPort == key || outputPort == value1) {
153                _inputPort = in;
154            } else {
155                _inputPort = in2;
156            }
157        }
158
159        ///////////////////////////////////////////////////////////////
160        ////                       public inner methods            ////
161
162        /** Return the function result.
163         *  @return A Type.
164         */
165        @Override
166        public Object getValue() throws IllegalActionException {
167            
168            Type retval = null;
169            final Type inPortType = _inputPort.getType();
170            
171            if (inPortType == BaseType.UNKNOWN) {
172                retval = BaseType.UNKNOWN;
173            } else if (inPortType instanceof ArrayType) {
174                
175                final Type elementType = ((ArrayType)inPortType).getElementType();
176                if(elementType instanceof RecordType) {
177                    
178                    Type fieldType;
179                    if(_outputPort == key) {
180                        fieldType = ((RecordType)elementType).get("key");
181                    } else {
182                        fieldType = ((RecordType)elementType).get("value");
183                    }
184    
185                    if (fieldType == null) {
186                        retval = BaseType.UNKNOWN;
187                    } else {
188                        retval = fieldType;
189                    }
190                }
191            }
192            
193            if(retval == null) {
194                throw new IllegalActionException(MatchInput.this,
195                        "Invalid type for input port " + _inputPort.getName() + " : " + inPortType);
196            }
197            
198            return retval;
199        }
200
201        /** Return an additional string describing the current value
202         *  of this function.
203         */
204        @Override
205        public String getVerboseString() {
206            if (_inputPort.getType() instanceof ArrayType) {
207                if(((ArrayType)_inputPort.getType()).getElementType() instanceof RecordType) {
208                    RecordType type = (RecordType) ((ArrayType)_inputPort.getType()).getElementType();
209                    Type fieldType = type.get("value");
210
211                    if (fieldType == null) {
212                        return "Input Record in port " + _inputPort.getName() +
213                            " doesn't have field named \"value\".";
214                    }
215                }
216            }
217
218            return null;
219        }
220
221        /** Return the type variable in this inequality term. If the
222         *  type of the input port is not declared, return an one
223         *  element array containing the inequality term representing
224         *  the type of the port; otherwise, return an empty array.
225         *  @return An array of InequalityTerm.
226         */
227        @Override
228        public InequalityTerm[] getVariables() {
229            InequalityTerm portTerm = _inputPort.getTypeTerm();
230
231            if (portTerm.isSettable()) {
232                InequalityTerm[] variable = new InequalityTerm[1];
233                variable[0] = portTerm;
234                return variable;
235            }
236
237            return (new InequalityTerm[0]);
238        }
239
240        ///////////////////////////////////////////////////////////////
241        ////                       private inner variable          ////
242        
243        private TypedIOPort _inputPort;
244        private TypedIOPort _outputPort;
245        
246    }
247
248
249    /** A list containing tokens to be written by the values1 port. */
250    private LinkedBlockingQueue<Token> _value1List = new LinkedBlockingQueue<Token>();
251    
252    /** A list containing tokens to be written by the values2 port. */
253    private LinkedBlockingQueue<Token> _value2List = new LinkedBlockingQueue<Token>();
254
255}