001/* A stub actor that writes data into workflows for the Reduce pattern.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-05-19 20:43:29 +0000 (Mon, 19 May 2014) $' 
008 * '$Revision: 32724 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern.stub;
031
032
033import java.util.concurrent.LinkedBlockingQueue;
034
035import ptolemy.actor.TypedIOPort;
036import ptolemy.data.Token;
037import ptolemy.data.type.ArrayType;
038import ptolemy.data.type.BaseType;
039import ptolemy.data.type.MonotonicFunction;
040import ptolemy.data.type.RecordType;
041import ptolemy.data.type.Type;
042import ptolemy.graph.InequalityTerm;
043import ptolemy.kernel.CompositeEntity;
044import ptolemy.kernel.util.IllegalActionException;
045import ptolemy.kernel.util.NameDuplicationException;
046import ptolemy.kernel.util.Workspace;
047
048/** A stub actor that writes data into workflows for the Reduce pattern.
049 * 
050 *  @author Daniel Crawl
051 *  @version $Id: ReduceInput.java 32724 2014-05-19 20:43:29Z crawl $
052 */
053public class ReduceInput extends StubSourceActor {
054
055    /** Construct a new ReduceInput in a container with a given name. */
056    public ReduceInput(CompositeEntity container, String name)
057            throws IllegalActionException, NameDuplicationException {
058
059        super(container, name);
060
061        values = new TypedIOPort(this, "values", false, true);
062        values.setMultiport(true);
063    }
064    
065    /** Clone the actor into the specified workspace. */
066    @Override
067    public Object clone(Workspace workspace) throws CloneNotSupportedException {
068        ReduceInput newObject = (ReduceInput)super.clone(workspace);
069        newObject._valuesList = new LinkedBlockingQueue<Token>();
070        return newObject;
071    }
072
073    /** Stop execution of the workflow the next time postfire() is called. */
074    @Override
075    public void finish() throws InterruptedException {
076        super.finish();
077        // add a token to unblock the take() in fire()
078        _valuesList.put(Token.NIL);
079    }
080
081    /** Write the key and values from the input to the workflow. */
082    @Override
083    public void fire() throws IllegalActionException {
084
085        Token keyToken = null;
086        Token valuesToken = null;
087        
088        try {
089            keyToken = _keyList.take();
090            valuesToken = _valuesList.take();
091        } catch(InterruptedException e) {
092            throw new IllegalActionException(this, e, "Error waiting for token.");
093        }
094
095        if(!_finish.get()) {
096            key.broadcast(keyToken);
097            values.broadcast(valuesToken);
098        }
099        
100    }
101    
102    /** Set the input and values. */
103    public void setInput(Token keyToken, Token valuesToken) throws IllegalActionException {
104        try {
105            _keyList.put(keyToken);
106            _valuesList.put(valuesToken);
107        } catch(InterruptedException e) {
108            // do not rethrow since this exception can occur when stopping a DDP
109            // job. instead just print to stderr.
110            //throw new IllegalActionException(this, e, "Error waiting for token lists.");
111            System.err.println("Got InterruptedException.");
112            return;
113        } 
114    }
115            
116    /** Port to write the values to the workflow. */
117    public TypedIOPort values;
118
119    
120    ///////////////////////////////////////////////////////////////////
121    ////                         protected methods                 ////
122    
123    /** Create a function used for setting the type for an output port. */
124    @Override
125    protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) {
126        return new ReduceInputPortFunction(outputPort);
127    }
128
129    ///////////////////////////////////////////////////////////////////
130    ////                         inner classes                     ////
131    
132    /** A MonotonicFunction for setting the output port types. */
133    private class ReduceInputPortFunction extends MonotonicFunction {
134        private ReduceInputPortFunction(TypedIOPort port) {
135            _port = port;
136        }
137
138        ///////////////////////////////////////////////////////////////
139        ////                       public inner methods            ////
140
141        /** Return the function result.
142         *  @return A Type.
143         */
144        @Override
145        public Object getValue() throws IllegalActionException {
146            
147            Type retval = null;
148            final Type inPortType = in.getType();
149            
150            if (inPortType == BaseType.UNKNOWN || inPortType == BaseType.ARRAY_BOTTOM) {
151                retval = BaseType.UNKNOWN;
152            } else if(inPortType instanceof ArrayType) {
153                
154                final Type elementType = ((ArrayType)inPortType).getElementType();
155                
156                if (elementType == BaseType.UNKNOWN) {
157                    retval = BaseType.UNKNOWN;
158                } else if (elementType instanceof RecordType) {
159
160                    Type fieldType;
161                    if(_port == values) {
162                        fieldType = ((RecordType)elementType).get("value");
163                    } else {
164                        fieldType = ((RecordType)elementType).get(_port.getName());
165                    }
166    
167                    if (fieldType == null) {
168                        retval = BaseType.UNKNOWN;
169                    } else if (_port == values) {
170                        retval = new ArrayType(fieldType);
171                    } else {
172                        retval = fieldType;
173                    }
174                }
175            } 
176            
177            if(retval == null) {
178                throw new IllegalActionException(ReduceInput.this,
179                        "Invalid type for input port: " + inPortType);
180            }
181            
182            return retval;
183            
184        }
185
186        /** Return an additional string describing the current value
187         *  of this function.
188         */
189        @Override
190        public String getVerboseString() {
191            if (in.getType() instanceof ArrayType) {
192                if(((ArrayType)in.getType()).getElementType() instanceof RecordType) {
193                    RecordType type = (RecordType) ((ArrayType)in.getType()).getElementType();
194                    if(_port == values) {
195                        if (type.get("value") == null) {
196                            return "Input Record doesn't have field named value";
197                        }
198                    } else if (type.get(_port.getName()) == null) {
199                            return "Input Record doesn't have field named value";
200                    }
201                }
202            }
203
204            return null;
205        }
206
207        /** Return the type variable in this inequality term. If the
208         *  type of the input port is not declared, return an one
209         *  element array containing the inequality term representing
210         *  the type of the port; otherwise, return an empty array.
211         *  @return An array of InequalityTerm.
212         */
213        @Override
214        public InequalityTerm[] getVariables() {
215            InequalityTerm portTerm = in.getTypeTerm();
216
217            if (portTerm.isSettable()) {
218                InequalityTerm[] variable = new InequalityTerm[1];
219                variable[0] = portTerm;
220                return variable;
221            }
222
223            return (new InequalityTerm[0]);
224        }
225
226        ///////////////////////////////////////////////////////////////
227        ////                       private inner variable          ////
228        private TypedIOPort _port;
229    }
230    
231    /** A list containing tokens to be written by the values port. */
232    private LinkedBlockingQueue<Token> _valuesList = new LinkedBlockingQueue<Token>();
233
234}