001/* A stub actor that writes data into workflows for the CoGroup pattern.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-04-12 00:09:39 +0000 (Sat, 12 Apr 2014) $' 
008 * '$Revision: 32657 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern.stub;
031
032import java.util.concurrent.LinkedBlockingQueue;
033
034import org.kepler.ddp.actor.pattern.Types;
035
036import ptolemy.actor.TypedIOPort;
037import ptolemy.data.Token;
038import ptolemy.data.type.ArrayType;
039import ptolemy.data.type.BaseType;
040import ptolemy.data.type.MonotonicFunction;
041import ptolemy.data.type.RecordType;
042import ptolemy.data.type.Type;
043import ptolemy.graph.InequalityTerm;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.util.IllegalActionException;
046import ptolemy.kernel.util.NameDuplicationException;
047import ptolemy.kernel.util.Workspace;
048
049/** A stub actor that writes data into workflows for the CoGroup pattern.
050 * 
051 *  @author Daniel Crawl
052 *  @version $Id: CoGroupInput.java 32657 2014-04-12 00:09:39Z crawl $
053 */
054public class CoGroupInput extends StubSourceActor {
055
056    /** Construct a new CoGroupInput in a container with a given name. */
057    public CoGroupInput(CompositeEntity container, String name)
058            throws IllegalActionException, NameDuplicationException {
059        
060        super(container, name);
061
062        in2 = new TypedIOPort(this, "in2", true, false);
063        in2.setTypeAtMost(Types.keyValueArrayType);        
064
065        values1 = new TypedIOPort(this, "values1", false, true);
066        values1.setMultiport(true);
067
068        values2 = new TypedIOPort(this, "values2", false, true);
069        values2.setMultiport(true);
070
071    }
072    
073    /** Clone the actor into the specified workspace. */
074    @Override
075    public Object clone(Workspace workspace) throws CloneNotSupportedException {
076        CoGroupInput newObject = (CoGroupInput)super.clone(workspace);
077        newObject._values1List = new LinkedBlockingQueue<Token>();
078        newObject._values2List = new LinkedBlockingQueue<Token>();
079        return newObject;
080    }
081    
082    /** Write the key and values from both inputs to the workflow. */
083    @Override
084    public void fire() throws IllegalActionException {
085        
086        Token keyToken;
087        Token values1Token;
088        Token values2Token;
089        
090        try {
091            keyToken = _keyList.take();          
092            values1Token = _values1List.take();                
093            values2Token = _values2List.take();
094        } catch(InterruptedException e) {
095            throw new IllegalActionException(this, e, "Error waiting for token lists.");
096        }
097        
098        if(!_finish.get()) {
099            key.broadcast(keyToken);
100            values1.broadcast(values1Token);
101            values2.broadcast(values2Token);
102        }
103
104    }
105    
106    /** Stop execution of the workflow the next time postfire() is called. */
107    @Override
108    public void finish() throws InterruptedException {
109        super.finish();
110        // add a token to unblock the take() in fire()
111        _values1List.put(Token.NIL);
112        _values2List.put(Token.NIL);
113    }
114
115    /** Set the key and values from both inputs. */
116    public void setInput(Token keyToken, Token values1Token, Token values2Token) throws IllegalActionException {
117        
118        try {
119            _keyList.put(keyToken);
120            _values1List.put(values1Token);
121            _values2List.put(values2Token);
122        } catch(InterruptedException e) {
123            throw new IllegalActionException(this, e, "Error waiting for token lists.");
124        }
125    }
126    
127    /** The input for key-value pairs. */
128    public TypedIOPort in2;
129    
130    /** Port to write the values from the first input. */
131    public TypedIOPort values1;
132    
133    /** Port to write the values from the second input. */
134    public TypedIOPort values2;
135
136    ///////////////////////////////////////////////////////////////////
137    ////                         protected methods                 ////
138
139    /** Create a function used for setting the type for an output port. */
140    @Override
141    protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) {
142        return new CoGroupInputPortFunction(outputPort);
143    }
144
145    ///////////////////////////////////////////////////////////////////
146    ////                         inner classes                     ////
147
148    /** A MonotonicFunction for setting the output port types. */
149    private class CoGroupInputPortFunction extends MonotonicFunction {
150        private CoGroupInputPortFunction(TypedIOPort outputPort) {
151            
152            _outputPort = outputPort;
153            
154            if(outputPort == key || outputPort == values1) {
155                _inputPort = in;
156            } else {
157                _inputPort = in2;
158            }
159        }
160
161        ///////////////////////////////////////////////////////////////
162        ////                       public inner methods            ////
163
164        /** Return the function result.
165         *  @return A Type.
166         */
167        @Override
168        public Object getValue() throws IllegalActionException {
169            
170            Type retval = null;
171            final Type inPortType = _inputPort.getType();
172            
173            if (inPortType == BaseType.UNKNOWN) {
174                retval = BaseType.UNKNOWN;
175            } else if (inPortType instanceof ArrayType) {
176                
177                final Type elementType = ((ArrayType)inPortType).getElementType();
178                
179                if(elementType instanceof RecordType) {
180                    Type fieldType;
181                    if(_outputPort == key) {
182                        fieldType = ((RecordType)elementType).get("key");
183                    } else {
184                        fieldType = ((RecordType)elementType).get("value");
185                    }
186    
187                    if (fieldType == null) {
188                        retval = BaseType.UNKNOWN;
189                    } else if(_outputPort == key) {
190                        retval = fieldType;
191                    } else {
192                        retval = new ArrayType(fieldType);
193                    }
194                }
195            } 
196            
197            if(retval == null) {
198                throw new IllegalActionException(CoGroupInput.this,
199                        "Invalid type for input port " + _inputPort.getName() + " : " + inPortType);
200            }
201            
202            return retval;
203        }
204
205        /** Return an additional string describing the current value
206         *  of this function.
207         */
208        @Override
209        public String getVerboseString() {
210            if (_inputPort.getType() instanceof ArrayType) {
211                if(((ArrayType)_inputPort.getType()).getElementType() instanceof RecordType) {
212                    RecordType type = (RecordType) ((ArrayType)_inputPort.getType()).getElementType();
213                    Type fieldType = type.get("value");
214
215                    if (fieldType == null) {
216                        return "Input Record in port " + _inputPort.getName() +
217                            " doesn't have field named \"value\".";
218                    }
219                }
220            }
221
222            return null;
223        }
224
225        /** Return the type variable in this inequality term. If the
226         *  type of the input port is not declared, return an one
227         *  element array containing the inequality term representing
228         *  the type of the port; otherwise, return an empty array.
229         *  @return An array of InequalityTerm.
230         */
231        @Override
232        public InequalityTerm[] getVariables() {
233            InequalityTerm portTerm = _inputPort.getTypeTerm();
234
235            if (portTerm.isSettable()) {
236                InequalityTerm[] variable = new InequalityTerm[1];
237                variable[0] = portTerm;
238                return variable;
239            }
240
241            return (new InequalityTerm[0]);
242        }
243
244        ///////////////////////////////////////////////////////////////
245        ////                       private inner variable          ////
246        
247        private TypedIOPort _inputPort;
248        private TypedIOPort _outputPort;
249        
250    }
251
252    /** Values from the first input.*/
253    private LinkedBlockingQueue<Token> _values1List = new LinkedBlockingQueue<Token>();
254    
255    /** Values from the second input. */
256    private LinkedBlockingQueue<Token> _values2List = new LinkedBlockingQueue<Token>();
257    
258}