001/* A subscriber that transparently receives tunneled messages from publishers. 002 003 Copyright (c) 2006-2014 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 */ 028package ptolemy.actor.lib; 029 030import java.util.Set; 031 032import ptolemy.actor.AtomicActor; 033import ptolemy.actor.SubscriberPort; 034import ptolemy.actor.TypedAtomicActor; 035import ptolemy.actor.TypedIOPort; 036import ptolemy.actor.util.ActorDependencies; 037import ptolemy.data.BooleanToken; 038import ptolemy.data.Token; 039import ptolemy.data.expr.Parameter; 040import ptolemy.data.expr.StringParameter; 041import ptolemy.data.type.BaseType; 042import ptolemy.kernel.CompositeEntity; 043import ptolemy.kernel.util.IllegalActionException; 044import ptolemy.kernel.util.KernelException; 045import ptolemy.kernel.util.NameDuplicationException; 046import ptolemy.kernel.util.Workspace; 047 048/////////////////////////////////////////////////////////////////// 049//// Subscriber 050 051/** 052 This actor subscribes to tokens on a named channel. The tokens are 053 "tunneled" from an instance of Publisher that names the same channel. 054 If {@link #global} is false (the default), then this subscriber 055 will only see instances of Publisher that are under the 056 control of the same director. That is, it can 057 be at a different level of the hierarchy, or in an entirely different 058 composite actor, as long as the relevant composite actors are 059 transparent (have no director). If {@link #global} is true, 060 then the publisher may be anywhere in the model, as long as its 061 <i>global</i> parameter is also true. 062 <p> 063 Any number of instances of Subscriber can subscribe to the same 064 channel. 065 <p> 066 This actor actually has a hidden input port that is connected 067 to the publisher via hidden "liberal links" (links that are 068 allowed to cross levels of the hierarchy). Consequently, 069 any data dependencies that the director might assume on a regular 070 "wired" connection will also be assumed across Publisher-Subscriber 071 pairs. Similarly, type constraints will propagate across 072 Publisher-Subscriber pairs. That is, the type of the Subscriber 073 output will match the type of the Publisher input. 074 075 @author Edward A. Lee, Raymond A. Cardillo, Bert Rodiers 076 @version $Id$ 077 @since Ptolemy II 5.2 078 @Pt.ProposedRating Green (cxh) 079 @Pt.AcceptedRating Red (cxh) 080 */ 081public class Subscriber extends TypedAtomicActor { 082 083 /** Construct a subscriber with the specified container and name. 084 * @param container The container actor. 085 * @param name The name of the actor. 086 * @exception IllegalActionException If the actor is not of an acceptable 087 * class for the container. 088 * @exception NameDuplicationException If the name coincides with 089 * an actor already in the container. 090 */ 091 public Subscriber(CompositeEntity container, String name) 092 throws IllegalActionException, NameDuplicationException { 093 // Set this up as input port. 094 super(container, name); 095 096 channel = new StringParameter(this, "channel"); 097 channel.setExpression("channel1"); 098 099 _createInputPort(); 100 input.setMultiport(true); 101 102 output = new TypedIOPort(this, "output", false, true); 103 output.setMultiport(true); 104 105 // We only have constraints from the publisher on the subscriber 106 // and the output of the subscriber and not the other way around 107 // to not break any existing models. 108 output.setWidthEquals(input, false); 109 110 new Parameter(input, "_hide", BooleanToken.TRUE); 111 112 global = new Parameter(this, "global"); 113 global.setExpression("false"); 114 global.setTypeEquals(BaseType.BOOLEAN); 115 116 // Refer the parameters of the input port to those of 117 // this actor. 118 input.channel.setExpression("$channel"); 119 input.global.setExpression("global"); 120 121 output.setTypeAtLeast(input); 122 } 123 124 /////////////////////////////////////////////////////////////////// 125 //// ports and parameters //// 126 127 /** The name of the channel. Subscribers that reference this same 128 * channel will receive any transmissions to this port. 129 * This is a string that defaults to "channel1". 130 */ 131 public StringParameter channel; 132 133 /** Specification of whether the data is subscribed globally. 134 * If this is set to true, then this subscriber will see values 135 * published by a publisher anywhere in the model references the same 136 * channel by name. If this is set to false (the default), then only 137 * values published by the publisher that are fired by the same 138 * director are seen by this subscriber. 139 */ 140 public Parameter global; 141 142 /** The input port. This port is hidden and should not be 143 * directly used. This base class imposes no type constraints except 144 * that the type of the input cannot be greater than the type of the 145 * output. 146 */ 147 public SubscriberPort input; 148 149 /** The output port. This is a multiport. If the corresponding 150 * publisher has multiple input signals, then those multiple signals 151 * will appear on this output port. 152 * By default, the type of this output is constrained 153 * to be at least that of the input. This port is hidden by default 154 * and the actor handles creating connections to it. 155 */ 156 public TypedIOPort output; 157 158 /////////////////////////////////////////////////////////////////// 159 //// public methods //// 160 161 /** Clone the actor into the specified workspace. 162 * @param workspace The workspace for the new object. 163 * @return A new actor. 164 * @exception CloneNotSupportedException If a derived class contains 165 * an attribute that cannot be cloned. 166 */ 167 @Override 168 public Object clone(Workspace workspace) throws CloneNotSupportedException { 169 Subscriber newObject = (Subscriber) super.clone(workspace); 170 171 // We only have constraints from the publisher on the subscriber 172 // and the output of the subscriber and not the other way around 173 // to not break any existing models. 174 newObject.output.setWidthEquals(newObject.input, false); 175 176 newObject.output.setTypeAtLeast(newObject.input); 177 178 return newObject; 179 } 180 181 /** Read at most one input token from each input 182 * channel and send it to the output. 183 * @exception IllegalActionException If there is no director, or 184 * if there is no input connection. 185 */ 186 @Override 187 public void fire() throws IllegalActionException { 188 super.fire(); 189 int width = input.getWidth(); 190 if (width == 0) { 191 channel.validate(); 192 throw new IllegalActionException(this, 193 "Subscriber could not find a matching Publisher " 194 + "with channel \"" + channel.stringValue() + "\""); 195 196 } 197 for (int i = 0; i < width; i++) { 198 if (input.hasToken(i)) { 199 Token token = input.get(i); 200 if (i < output.getWidth()) { 201 output.send(i, token); 202 } 203 } 204 } 205 } 206 207 /** Return a Set of Publishers that are connected to this Subscriber. 208 * @return A Set of Publishers that are connected to this Subscriber. 209 * @exception KernelException If thrown when a Manager is added to 210 * the top level or if preinitialize() fails. 211 */ 212 public Set<AtomicActor> publishers() throws KernelException { 213 return ActorDependencies.prerequisites(this, Publisher.class); 214 } 215 216 /////////////////////////////////////////////////////////////////// 217 //// protected methods //// 218 219 /** Create an input port. This is a protected method so that 220 * subclasses can create different input ports. This is called 221 * in the constructor, so subclasses cannot reliably access 222 * local variables. 223 * @exception IllegalActionException If creating the input port fails. 224 * @exception NameDuplicationException If there is already a port named "input". 225 */ 226 protected void _createInputPort() 227 throws IllegalActionException, NameDuplicationException { 228 input = new SubscriberPort(this, "input"); 229 } 230}