001/* A base class for DDP pattern actor stubs that write data to the workflow. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-04-12 00:09:39 +0000 (Sat, 12 Apr 2014) $' 008 * '$Revision: 32657 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.ddp.actor.pattern.stub; 031 032import java.util.HashSet; 033import java.util.Iterator; 034import java.util.Set; 035import java.util.concurrent.LinkedBlockingQueue; 036import java.util.concurrent.atomic.AtomicBoolean; 037 038import org.kepler.ddp.actor.pattern.Types; 039 040import ptolemy.actor.TypedIOPort; 041import ptolemy.data.Token; 042import ptolemy.data.type.MonotonicFunction; 043import ptolemy.graph.Inequality; 044import ptolemy.kernel.CompositeEntity; 045import ptolemy.kernel.util.IllegalActionException; 046import ptolemy.kernel.util.NameDuplicationException; 047import ptolemy.kernel.util.Workspace; 048 049/** A base class for DDP pattern actor stubs that write data to the workflow. 050 * 051 * @author Daniel Crawl 052 * @version $Id: StubSourceActor.java 32657 2014-04-12 00:09:39Z crawl $ 053 */ 054public class StubSourceActor extends StubBaseActor { 055 056 /** Construct a new StubSourceActor in a container with a given name. */ 057 public StubSourceActor(CompositeEntity container, String name) 058 throws IllegalActionException, NameDuplicationException { 059 super(container, name); 060 061 in = new TypedIOPort(this, "in", true, false); 062 in.setTypeAtMost(Types.keyValueArrayType); 063 064 key = new TypedIOPort(this, "key", false, true); 065 key.setMultiport(true); 066 067 } 068 069 /** Clone the actor into the specified workspace. */ 070 @Override 071 public Object clone(Workspace workspace) throws CloneNotSupportedException { 072 StubSourceActor newObject = (StubSourceActor)super.clone(workspace); 073 newObject._finish = new AtomicBoolean(false); 074 newObject._keyList = new LinkedBlockingQueue<Token>(); 075 return newObject; 076 } 077 078 /** Stop execution of the workflow the next time postfire() is called. */ 079 public void finish() throws InterruptedException { 080 _finish.set(true); 081 // add a token to unblock the take() in fire() 082 _keyList.put(Token.NIL); 083 } 084 085 /** Returns true if the actor should execute again. */ 086 @Override 087 public boolean postfire() throws IllegalActionException { 088 089 if(_runWorkflowLifecyclePerInput) { 090 return false; 091 } else if(_finish.get()) { 092 getDirector().stop(); 093 return false; 094 } 095 return true; 096 } 097 098 /** Input port for key-value pairs. */ 099 public TypedIOPort in; 100 101 /** Output port of key to write to workflow. */ 102 public TypedIOPort key; 103 104 /////////////////////////////////////////////////////////////////// 105 //// protected methods //// 106 107 /** Create a function used for setting the type for an output port. In this class, 108 * returns null. */ 109 protected MonotonicFunction _createPortFunction(TypedIOPort outputPort) { 110 return null; 111 } 112 113 /** Return the type constraints of this actor. The type constraint is 114 * that the type of the output ports is no less than the type of the 115 * fields of the input RecordToken. 116 * @return a list of Inequality. 117 */ 118 @Override 119 protected Set<Inequality> _customTypeConstraints() { 120 121 // set the constraints between record fields and output ports 122 Set<Inequality> constraints = new HashSet<Inequality>(); 123 124 // since the input port has a clone of the above RecordType, need to 125 // get the type from the input port. 126 // RecordType inputTypes = (RecordType)input.getType(); 127 Iterator<?> outputPorts = outputPortList().iterator(); 128 129 while (outputPorts.hasNext()) { 130 TypedIOPort outputPort = (TypedIOPort) outputPorts.next(); 131 Inequality inequality = new Inequality(_createPortFunction(outputPort), 132 outputPort.getTypeTerm()); 133 constraints.add(inequality); 134 } 135 136 return constraints; 137 } 138 139 /** Do not establish the usual default type constraints. 140 * @return null 141 */ 142 @Override 143 protected Set<Inequality> _defaultTypeConstraints() { 144 return null; 145 } 146 147 /** If true, the actor will stop workflow execution. */ 148 protected AtomicBoolean _finish = new AtomicBoolean(false); 149 150 /** A list containing tokens to be written by the key port. */ 151 protected LinkedBlockingQueue<Token> _keyList = new LinkedBlockingQueue<Token>(); 152 153}