001/* A base class for DDP pattern actor stubs that write data to the workflow.
002 *  
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-04-12 00:09:39 +0000 (Sat, 12 Apr 2014) $' 
008 * '$Revision: 32657 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern.stub;
031
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.Set;
035import java.util.concurrent.LinkedBlockingQueue;
036import java.util.concurrent.atomic.AtomicBoolean;
037
038import org.kepler.ddp.actor.pattern.Types;
039
040import ptolemy.actor.TypedIOPort;
041import ptolemy.data.Token;
042import ptolemy.data.type.MonotonicFunction;
043import ptolemy.graph.Inequality;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.util.IllegalActionException;
046import ptolemy.kernel.util.NameDuplicationException;
047import ptolemy.kernel.util.Workspace;
048
049/** A base class for DDP pattern actor stubs that write data to the workflow.
050 * 
051 *  @author Daniel Crawl
052 *  @version $Id: StubSourceActor.java 32657 2014-04-12 00:09:39Z crawl $
053 */
054public class StubSourceActor extends StubBaseActor {
055
056    /** Construct a new StubSourceActor in a container with a given name. */
057    public StubSourceActor(CompositeEntity container, String name)
058            throws IllegalActionException, NameDuplicationException {
059        super(container, name);
060
061        in = new TypedIOPort(this, "in", true, false);
062        in.setTypeAtMost(Types.keyValueArrayType);
063        
064        key = new TypedIOPort(this, "key", false, true);
065        key.setMultiport(true);
066        
067    }
068    
069    /** Clone the actor into the specified workspace. */
070    @Override
071    public Object clone(Workspace workspace) throws CloneNotSupportedException {
072        StubSourceActor newObject = (StubSourceActor)super.clone(workspace);
073        newObject._finish = new AtomicBoolean(false);
074        newObject._keyList = new LinkedBlockingQueue<Token>();
075        return newObject;
076    }
077    
078    /** Stop execution of the workflow the next time postfire() is called. */
079    public void finish() throws InterruptedException {
080        _finish.set(true);
081        // add a token to unblock the take() in fire()
082        _keyList.put(Token.NIL);
083    }
084    
085    /** Returns true if the actor should execute again. */
086    @Override
087    public boolean postfire() throws IllegalActionException {
088
089        if(_runWorkflowLifecyclePerInput) {
090            return false;
091        } else if(_finish.get()) {
092            getDirector().stop();
093            return false;
094        }
095        return true;
096    }
097        
098    /** Input port for key-value pairs. */
099    public TypedIOPort in;
100    
101    /** Output port of key to write to workflow. */
102    public TypedIOPort key;
103
104    ///////////////////////////////////////////////////////////////////
105    ////                         protected methods                 ////
106    
107    /** Create a function used for setting the type for an output port. In this class,
108     *  returns null. */
109    protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) {
110        return null;
111    }
112
113    /** Return the type constraints of this actor. The type constraint is
114     *  that the type of the output ports is no less than the type of the
115     *  fields of the input RecordToken.
116     *  @return a list of Inequality.
117     */
118    @Override
119    protected Set<Inequality> _customTypeConstraints() {
120
121        // set the constraints between record fields and output ports
122        Set<Inequality> constraints = new HashSet<Inequality>();
123
124        // since the input port has a clone of the above RecordType, need to
125        // get the type from the input port.
126        //   RecordType inputTypes = (RecordType)input.getType();
127        Iterator<?> outputPorts = outputPortList().iterator();
128
129        while (outputPorts.hasNext()) {
130            TypedIOPort outputPort = (TypedIOPort) outputPorts.next();
131            Inequality inequality = new Inequality(_createPortFunction(outputPort),
132                    outputPort.getTypeTerm());
133            constraints.add(inequality);
134        }
135
136        return constraints;
137    }
138
139    /** Do not establish the usual default type constraints.
140     *  @return null
141     */
142    @Override
143    protected Set<Inequality> _defaultTypeConstraints() {
144        return null;
145    }
146        
147    /** If true, the actor will stop workflow execution. */
148    protected AtomicBoolean _finish = new AtomicBoolean(false);
149        
150    /** A list containing tokens to be written by the key port. */
151    protected LinkedBlockingQueue<Token> _keyList = new LinkedBlockingQueue<Token>();
152
153}