001/* A subscriber that transparently receives tunneled messages from publishers.
002
003 Copyright (c) 2006-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.actor.lib;
029
030import java.util.Set;
031
032import ptolemy.actor.AtomicActor;
033import ptolemy.actor.SubscriberPort;
034import ptolemy.actor.TypedAtomicActor;
035import ptolemy.actor.TypedIOPort;
036import ptolemy.actor.util.ActorDependencies;
037import ptolemy.data.BooleanToken;
038import ptolemy.data.Token;
039import ptolemy.data.expr.Parameter;
040import ptolemy.data.expr.StringParameter;
041import ptolemy.data.type.BaseType;
042import ptolemy.kernel.CompositeEntity;
043import ptolemy.kernel.util.IllegalActionException;
044import ptolemy.kernel.util.KernelException;
045import ptolemy.kernel.util.NameDuplicationException;
046import ptolemy.kernel.util.Workspace;
047
048///////////////////////////////////////////////////////////////////
049//// Subscriber
050
051/**
052 This actor subscribes to tokens on a named channel. The tokens are
053 "tunneled" from an instance of Publisher that names the same channel.
054 If {@link #global} is false (the default), then this subscriber
055 will only see instances of Publisher that are under the
056 control of the same director. That is, it can
057 be at a different level of the hierarchy, or in an entirely different
058 composite actor, as long as the relevant composite actors are
059 transparent (have no director). If {@link #global} is true,
060 then the publisher may be anywhere in the model, as long as its
061 <i>global</i> parameter is also true.
062 <p>
063 Any number of instances of Subscriber can subscribe to the same
064 channel.
065 <p>
066 This actor actually has a hidden input port that is connected
067 to the publisher via hidden "liberal links" (links that are
068 allowed to cross levels of the hierarchy).  Consequently,
069 any data dependencies that the director might assume on a regular
070 "wired" connection will also be assumed across Publisher-Subscriber
071 pairs.  Similarly, type constraints will propagate across
072 Publisher-Subscriber pairs. That is, the type of the Subscriber
073 output will match the type of the Publisher input.
074
075 @author Edward A. Lee, Raymond A. Cardillo, Bert Rodiers
076 @version $Id$
077 @since Ptolemy II 5.2
078 @Pt.ProposedRating Green (cxh)
079 @Pt.AcceptedRating Red (cxh)
080 */
081public class Subscriber extends TypedAtomicActor {
082
083    /** Construct a subscriber with the specified container and name.
084     *  @param container The container actor.
085     *  @param name The name of the actor.
086     *  @exception IllegalActionException If the actor is not of an acceptable
087     *   class for the container.
088     *  @exception NameDuplicationException If the name coincides with
089     *   an actor already in the container.
090     */
091    public Subscriber(CompositeEntity container, String name)
092            throws IllegalActionException, NameDuplicationException {
093        // Set this up as input port.
094        super(container, name);
095
096        channel = new StringParameter(this, "channel");
097        channel.setExpression("channel1");
098
099        _createInputPort();
100        input.setMultiport(true);
101
102        output = new TypedIOPort(this, "output", false, true);
103        output.setMultiport(true);
104
105        // We only have constraints from the publisher on the subscriber
106        // and the output of the subscriber and not the other way around
107        // to not break any existing models.
108        output.setWidthEquals(input, false);
109
110        new Parameter(input, "_hide", BooleanToken.TRUE);
111
112        global = new Parameter(this, "global");
113        global.setExpression("false");
114        global.setTypeEquals(BaseType.BOOLEAN);
115
116        // Refer the parameters of the input port to those of
117        // this actor.
118        input.channel.setExpression("$channel");
119        input.global.setExpression("global");
120
121        output.setTypeAtLeast(input);
122    }
123
124    ///////////////////////////////////////////////////////////////////
125    ////                   ports and parameters                    ////
126
127    /** The name of the channel.  Subscribers that reference this same
128     *  channel will receive any transmissions to this port.
129     *  This is a string that defaults to "channel1".
130     */
131    public StringParameter channel;
132
133    /** Specification of whether the data is subscribed globally.
134     *  If this is set to true, then this subscriber will see values
135     *  published by a publisher anywhere in the model references the same
136     *  channel by name. If this is set to false (the default), then only
137     *  values published by the publisher that are fired by the same
138     *  director are seen by this subscriber.
139     */
140    public Parameter global;
141
142    /** The input port.  This port is hidden and should not be
143     *  directly used. This base class imposes no type constraints except
144     *  that the type of the input cannot be greater than the type of the
145     *  output.
146     */
147    public SubscriberPort input;
148
149    /** The output port. This is a multiport. If the corresponding
150     *  publisher has multiple input signals, then those multiple signals
151     *  will appear on this output port.
152     *  By default, the type of this output is constrained
153     *  to be at least that of the input. This port is hidden by default
154     *  and the actor handles creating connections to it.
155     */
156    public TypedIOPort output;
157
158    ///////////////////////////////////////////////////////////////////
159    ////                         public methods                    ////
160
161    /** Clone the actor into the specified workspace.
162     *  @param workspace The workspace for the new object.
163     *  @return A new actor.
164     *  @exception CloneNotSupportedException If a derived class contains
165     *   an attribute that cannot be cloned.
166     */
167    @Override
168    public Object clone(Workspace workspace) throws CloneNotSupportedException {
169        Subscriber newObject = (Subscriber) super.clone(workspace);
170
171        // We only have constraints from the publisher on the subscriber
172        // and the output of the subscriber and not the other way around
173        // to not break any existing models.
174        newObject.output.setWidthEquals(newObject.input, false);
175
176        newObject.output.setTypeAtLeast(newObject.input);
177
178        return newObject;
179    }
180
181    /** Read at most one input token from each input
182     *  channel and send it to the output.
183     *  @exception IllegalActionException If there is no director, or
184     *   if there is no input connection.
185     */
186    @Override
187    public void fire() throws IllegalActionException {
188        super.fire();
189        int width = input.getWidth();
190        if (width == 0) {
191            channel.validate();
192            throw new IllegalActionException(this,
193                    "Subscriber could not find a matching Publisher "
194                            + "with channel \"" + channel.stringValue() + "\"");
195
196        }
197        for (int i = 0; i < width; i++) {
198            if (input.hasToken(i)) {
199                Token token = input.get(i);
200                if (i < output.getWidth()) {
201                    output.send(i, token);
202                }
203            }
204        }
205    }
206
207    /** Return a Set of Publishers that are connected to this Subscriber.
208     *  @return A Set of Publishers that are connected to this Subscriber.
209     *  @exception KernelException If thrown when a Manager is added to
210     *  the top level or if preinitialize() fails.
211     */
212    public Set<AtomicActor> publishers() throws KernelException {
213        return ActorDependencies.prerequisites(this, Publisher.class);
214    }
215
216    ///////////////////////////////////////////////////////////////////
217    ////                         protected methods                 ////
218
219    /** Create an input port. This is a protected method so that
220     *  subclasses can create different input ports. This is called
221     *  in the constructor, so subclasses cannot reliably access
222     *  local variables.
223     *  @exception IllegalActionException If creating the input port fails.
224     *  @exception NameDuplicationException If there is already a port named "input".
225     */
226    protected void _createInputPort()
227            throws IllegalActionException, NameDuplicationException {
228        input = new SubscriberPort(this, "input");
229    }
230}