001/* An output port that publishes its data on a named channel.
002
003 Copyright (c) 1997-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026 */
027package ptolemy.actor;
028
029import java.util.HashSet;
030import java.util.Set;
031
032import ptolemy.actor.util.DFUtilities;
033import ptolemy.data.ArrayToken;
034import ptolemy.data.BooleanToken;
035import ptolemy.data.Token;
036import ptolemy.data.expr.Parameter;
037import ptolemy.data.expr.StringParameter;
038import ptolemy.data.type.BaseType;
039import ptolemy.kernel.ComponentEntity;
040import ptolemy.kernel.InstantiableNamedObj;
041import ptolemy.kernel.util.Attribute;
042import ptolemy.kernel.util.IllegalActionException;
043import ptolemy.kernel.util.InternalErrorException;
044import ptolemy.kernel.util.KernelException;
045import ptolemy.kernel.util.NameDuplicationException;
046import ptolemy.kernel.util.NamedObj;
047
048///////////////////////////////////////////////////////////////////
049//// PublisherPort
050
051/**
052 This is a specialized output port that publishes data sent through it on
053 the specified named channel.  The tokens are
054 "tunneled" to any instance of {@link SubscriberPort} that names the same channel.
055 If {@link #global} is false (the default), then this publisher
056 will only send to instances of SubscriberPort that are under the
057 control of the same director. That is, it can
058 be at a different level of the hierarchy, or in an entirely different
059 composite actor, as long as the relevant composite actors are
060 transparent (have no director). If {@link #global} is true,
061 then the subscriber may be anywhere in the model, as long as its
062 <i>global</i> parameter is also true.
063 <p>
064 It is an error to have two instances of PublisherPort using the same
065 channel under the control of the same director. When you create a
066 new PublisherPort, by default, it has no channel name. You have to
067 specify a channel name to use it.
068 <p>
069 <b>How it works:</b>
070 When the channel name
071 is specified, typically during model construction, this actor
072 causes a relation to be created in the least opaque composite
073 actor above it in the hierarchy and links to that relation.
074 In addition, if {@link #global} is set to true, it causes
075 a port to be created in that composite, and also links that
076 port to the relation on the inside.  The relation is recorded by the opaque
077 composite.  When a SubscriberPort is preinitialized that refers
078 to the same channel, that SubscriberPort finds the relation (by
079 finding the least opaque composite actor above it) and links
080 to the relation. Some of these links are "liberal links" in that
081 they cross levels of the hierarchy.
082 <p>
083 Since publishers are linked to subscribers,
084 any data dependencies that the director might assume on a regular
085 "wired" connection will also be assumed across publisher-subscriber
086 pairs. Similarly, type constraints will propagate across
087 publisher-subscriber pairs. That is, the type of the subscriber
088 output will match the type of the publisher input.
089
090 @author Edward A. Lee
091 @version $Id$
092 @since Ptolemy II 10.0
093 @Pt.ProposedRating Yellow (eal)
094 @Pt.AcceptedRating Red (eal)
095 */
096public class PublisherPort extends PubSubPort {
097
098    /** Construct a publisher port with the specified name and container.
099     *  @param container The container actor.
100     *  @param name The name of the port.
101     *  @exception IllegalActionException If the port is not of an acceptable
102     *   class for the container, or if the container does not implement the
103     *   Actor interface.
104     *  @exception NameDuplicationException If the name coincides with
105     *   a port already in the container.
106     */
107    public PublisherPort(ComponentEntity container, String name)
108            throws IllegalActionException, NameDuplicationException {
109        super(container, name);
110
111        propagateNameChanges = new Parameter(this, "propagateNameChanges");
112        propagateNameChanges.setExpression("false");
113        propagateNameChanges.setTypeEquals(BaseType.BOOLEAN);
114
115        setOutput(true);
116        setInput(false);
117
118        // In order for this to show up in the vergil library, it has to have
119        // an icon description.
120        _attachText("_smallIconDescription", "<svg>\n"
121                + "<polygon points=\"-8,9 -2,4 12,4 12,-4 -2,-4 -8,-9\" "
122                + "style=\"fill:cyan\"/>\n" + "</svg>\n");
123    }
124
125    ///////////////////////////////////////////////////////////////////
126    ////                         parameters                        ////
127
128    /** If true, then propagate channel name changes to any
129     *  Subscribers.  The default value is a BooleanToken with the
130     *  value false, indicating that if the channel name is changed,
131     *  then the channel names of the Subscribers are not changed.  If
132     *  the value is true, then if the channel name is changed, the
133     *  channel names of the connected Subscribers are updated.
134     *
135     *  <p>If the value is true, then SubscriptionAggregators that
136     *  have the same regular expression as the channel name of the
137     *  Publisher will be updated.  However, SubscriptionAggregators
138     *  usually have regular expressions as channel names, so usually
139     *  the channel name of the SubscriptionAggregator will <b>not</b>
140     *  be updated.</p>
141     *
142     *  <p>Note that if a Publisher is within an Actor Oriented Class
143     *  definition, then any Subscribers with the same channel name in
144     *  Actor Oriented Class definitions will <b>not</b> be updated.
145     *  This is because there is no connection between the Publisher
146     *  in the Actor Oriented Class definition and the Subscriber.
147     *  However, if the channel name in a Publisher in an instance of
148     *  an Actor Oriented Class is updated, then the
149     *  corresponding Subscribers in instances of Actor Oriented Class
150     *  will be updated.</p>
151     */
152    public Parameter propagateNameChanges;
153
154    ///////////////////////////////////////////////////////////////////
155    ////                         public methods                    ////
156
157    /** Throw an exception.
158     * Adding initializables to the container is not supported.
159     */
160    @Override
161    public void addInitializable(Initializable initializable) {
162        throw new InternalErrorException(
163                "Cannot add Initializables to PublisherPort.");
164    }
165
166    /** If a publish and subscribe channel is set, then set up the connections.
167     *  @param attribute The attribute that changed.
168     *  @exception IllegalActionException Thrown if the new color attribute cannot
169     *      be created.
170     */
171    @Override
172    public void attributeChanged(Attribute attribute)
173            throws IllegalActionException {
174        if (attribute == channel || attribute == global) {
175            // We only get the value if we are not in a class definition.
176            // Class definitions need not have connections.
177            // The connections will be made in the instances.
178            // The reason is that some of the Actor Oriented Classes
179            // that use Publishers do not have the parameter defined
180            // in the definition.  See
181            // ptolemy/actor/lib/test/auto/PublisherClassNoParameter.xml
182            NamedObj immediateContainer = getContainer();
183            if (immediateContainer != null) {
184                NamedObj container = immediateContainer.getContainer();
185                // NOTE: During cloning, the container reports that it is in a class definition!
186                // Hence, this PublisherPort has to do the registering when clone is
187                // set to no longer be a class definition.
188                if (container instanceof InstantiableNamedObj
189                        && !((InstantiableNamedObj) container)
190                                .isWithinClassDefinition()
191                //                        || (container == null
192                //                                && immediateContainer instanceof InstantiableNamedObj
193                //                                && !((InstantiableNamedObj)immediateContainer).isWithinClassDefinition())
194                ) {
195                    String newValue = channel.stringValue();
196                    boolean globalValue = ((BooleanToken) global.getToken())
197                            .booleanValue();
198                    if (!newValue.equals(_channel) || globalValue != _global) {
199                        //                        if (container == null
200                        //                                && immediateContainer instanceof InstantiableNamedObj
201                        //                                && !((InstantiableNamedObj)immediateContainer).isWithinClassDefinition()) {
202                        //                            // Port is in the toplevel.
203                        //                            container = immediateContainer;
204                        //                        }
205                        if (container instanceof CompositeActor) {
206                            // The vergil and config tests were failing because
207                            // moml.EntityLibrary sometimes contains Subscribers.
208                            try {
209                                if (attribute == global) {
210                                    if (_global && !globalValue) {
211                                        // Changing from global to non-global.
212                                        ((CompositeActor) container)
213                                                .unregisterPublisherPort(
214                                                        _channel, this, true);
215                                    }
216                                }
217
218                                if (attribute == channel && !(_channel == null
219                                        || _channel.trim().equals(""))) {
220                                    // Changing the channel from a previous channel name.
221                                    if (((BooleanToken) propagateNameChanges
222                                            .getToken()).booleanValue()) {
223                                        try {
224                                            _updateChannelNameOfConnectedSubscribers(
225                                                    _channel, newValue);
226                                        } catch (KernelException ex) {
227                                            throw new IllegalActionException(
228                                                    this, ex,
229                                                    "Failed to set channel to "
230                                                            + newValue);
231                                        }
232                                    }
233                                }
234
235                                if (attribute == channel && !(_channel == null
236                                        || _channel.trim().equals(""))) {
237                                    ((CompositeActor) container)
238                                            .unregisterPublisherPort(_channel,
239                                                    this, _global);
240                                }
241                                ((CompositeActor) container)
242                                        .registerPublisherPort(newValue, this,
243                                                globalValue);
244
245                            } catch (NameDuplicationException e) {
246                                throw new IllegalActionException(this, e,
247                                        "Can't add published port.");
248                            }
249                        }
250                        _channel = newValue;
251                        _global = globalValue;
252                    }
253                }
254            }
255        } else if (attribute == initialTokens) {
256            // Set the production rate parameter for the benefit of SDF.
257            // If this port is not opaque, SDF will not see it, so the
258            // corresponding SubscriberPorts become responsible for
259            // setting their tokenInitConsumption parameters.
260            Token initialOutputsValue = initialTokens.getToken();
261            if (initialOutputsValue != null) {
262                if (!(initialOutputsValue instanceof ArrayToken)) {
263                    throw new IllegalActionException(this,
264                            "initialOutputs value is required to be an array.");
265                }
266                int length = ((ArrayToken) initialOutputsValue).length();
267                DFUtilities.setOrCreate(this, "tokenInitProduction", length);
268            }
269        } else {
270            super.attributeChanged(attribute);
271        }
272    }
273
274    /** Notify this object that the containment hierarchy above it has
275     *  changed. This registers the port as a publisher with the
276     *  container of the container, if there is one.
277     *  @exception IllegalActionException If the change is not
278     *   acceptable.
279     */
280    @Override
281    public void hierarchyChanged() throws IllegalActionException {
282        // NOTE: It is not OK to access the cached variable _channel
283        // here instead of the channel parameter because the parameter
284        // may not have been validating (during instantiation of
285        // actor-oriented classes).
286        String channelValue = null;
287        try {
288            // The channel may refer to parameters via $
289            // but the parameters are not yet in scope.
290            channelValue = channel.stringValue();
291        } catch (Throwable throwable) {
292            // Ignore this on the assumption that we will
293            // get another chance when the channel is set.
294        }
295        if (channelValue != null && !channelValue.equals("")) {
296            NamedObj immediateContainer = getContainer();
297            if (immediateContainer != null) {
298                NamedObj container = immediateContainer.getContainer();
299                if (container instanceof CompositeActor) {
300                    try {
301                        ((CompositeActor) container).registerPublisherPort(
302                                channelValue, this, _global);
303                        // Need to make sure to record the channel name
304                        // so that it gets unregistered if it later changes.
305                        // In particular, _channel may be null if attributeChanged()
306                        // has not been called.
307                        // NOTE: Should we check for some value of _channel other
308                        // than null?  There shouldn't be any that doesn't match
309                        // the channelValue.
310                        _channel = channelValue;
311                    } catch (NameDuplicationException e) {
312                        throw new InternalErrorException(e);
313                    }
314                }
315            }
316        }
317        super.hierarchyChanged();
318    }
319
320    /** Notify this object that the containment hierarchy above it will be
321     *  changed, which results in publisher ports being unregistered.
322     *  @exception IllegalActionException If unlinking to a published port fails.
323     */
324    @Override
325    public void hierarchyWillChange() throws IllegalActionException {
326        if (channel != null) {
327            String channelValue = null;
328            try {
329                // The channel may refer to parameters via $
330                // but the parameters are not yet in scope.
331                channelValue = channel.stringValue();
332            } catch (Throwable throwable) {
333                channelValue = channel.getExpression();
334            }
335            if (channelValue != null) {
336                NamedObj immediateContainer = getContainer();
337                if (immediateContainer != null) {
338                    NamedObj container = immediateContainer.getContainer();
339                    if (container instanceof CompositeActor) {
340                        try {
341                            ((CompositeActor) container)
342                                    .unregisterPublisherPort(channelValue, this,
343                                            _global);
344                        } catch (NameDuplicationException e) {
345                            throw new InternalErrorException(e);
346                        }
347                    }
348                }
349            }
350        }
351        super.hierarchyWillChange();
352    }
353
354    /** If {@link #initialTokens} has been set, then produce the
355     *  outputs specified by its array value.
356     *  @exception IllegalActionException If initialTokens is invalid.
357     */
358    @Override
359    public void initialize() throws IllegalActionException {
360        if (((InstantiableNamedObj) getContainer()).isWithinClassDefinition()) {
361            // Don't initialize Class Definitions.
362            // See $PTII/ptolemy/actor/lib/test/auto/PublisherToplevelSubscriberPortAOC.xml
363            return;
364        }
365
366        Token initialOutputsValue = initialTokens.getToken();
367        if (initialOutputsValue instanceof ArrayToken) {
368            // If this port has inside receivers, then it is an opaque port
369            // for a composite actor, and the right way to send outputs is
370            // to populate the inside receivers.
371            Receiver[][] receivers = getInsideReceivers();
372            if (receivers != null && receivers.length > 0
373                    && receivers[0].length > 0) {
374                for (Receiver[] receiver : receivers) {
375                    for (int j = 0; j < receivers.length; j++) {
376                        for (Token token : ((ArrayToken) initialOutputsValue)
377                                .arrayValue()) {
378                            receiver[j].put(token);
379                        }
380                    }
381                }
382            } else {
383                // If this port is transparent or is contained by an atomic actor, then
384                // send initial tokens directly from it. It is not correct to send
385                // them from the source ports connected on the inside because those
386                // ports may also have other destinations.
387                for (Token token : ((ArrayToken) initialOutputsValue)
388                        .arrayValue()) {
389                    broadcast(token);
390                }
391            }
392        }
393    }
394
395    /** Override the base class to throw an exception if this port is at the top level.
396     *  @exception IllegalActionException If the port is in
397     *   the top level, or if the superclass throws it.
398     */
399    @Override
400    public void preinitialize() throws IllegalActionException {
401        NamedObj actor = getContainer();
402        if (actor != null && actor.getContainer() == null) {
403            throw new IllegalActionException(this,
404                    "PublisherPorts cannot be used at the top level, use a Publisher actor instead.");
405        }
406        super.preinitialize();
407    }
408
409    /** Override the base class to refuse to accept setting to be an input.
410     *  @param isInput Required to be false.
411     *  @exception IllegalActionException If the argument is true.
412     */
413    @Override
414    public void setInput(boolean isInput) throws IllegalActionException {
415        if (isInput) {
416            throw new IllegalActionException(this,
417                    "PublisherPort cannot be an input port.");
418        }
419        super.setInput(false);
420    }
421
422    /** Override the base class to require the port to be an output.
423     *  @param isOutput Required to be true.
424     *  @exception IllegalActionException If the argument is false.
425     */
426    @Override
427    public void setOutput(boolean isOutput) throws IllegalActionException {
428        if (!isOutput) {
429            throw new IllegalActionException(this,
430                    "PublisherPort is required to be an output port.");
431        }
432        super.setOutput(true);
433    }
434
435    /** Return a Set of SubscriberPort that are connected to this Publisher.
436     *  @return A Set of Subscribers that are connected to this Publisher
437     *  @exception KernelException If thrown when a Manager is added to
438     *  the top level or if preinitialize() fails.
439     */
440    public Set<SubscriberPort> subscribers() throws KernelException {
441        // preinitialize() creates connections between Publishers and
442        // Subscribers.
443        Manager.preinitializeThenWrapup((Actor) getContainer());
444        return _dependents(this);
445    }
446
447    ///////////////////////////////////////////////////////////////////
448    ////                         private methods                   ////
449
450    /** If the specified port is an instance of
451     *  {@link SubscriberPort}, then return a set containing it;
452     *  otherwise, return a set of SubscriberPort instances that downstream
453     *  of the specified port that subscribe to this publisher.
454     *  This method traverses opaque composites.
455     *  @param port The port to be checked
456     *  @return The Set of all AtomicActors connected to the port.
457     */
458    private Set<SubscriberPort> _dependents(IOPort port)
459            throws IllegalActionException {
460        //System.out.println("ActorDependencies._dependents: START" + remotePort.getFullName());
461        Set<SubscriberPort> results = new HashSet<SubscriberPort>();
462        if (port instanceof SubscriberPort) {
463            results.add((SubscriberPort) port);
464        } else {
465            if (port.isOutput() && port.isInput()) {
466                throw new IllegalActionException(port,
467                        "Can't handle port that is both input and output.");
468            }
469            Receiver[][] receivers = null;
470            if (port.isOutput()) {
471                receivers = port.getRemoteReceivers();
472            } else if (port.isInput()) {
473                receivers = port.deepGetReceivers();
474            } else {
475                throw new IllegalActionException(port,
476                        "Can't handle port that is neither input nor output.");
477            }
478            if (receivers != null) {
479                for (Receiver[] receiver : receivers) {
480                    if (receiver != null) {
481                        for (int j = 0; j < receiver.length; j++) {
482                            if (receiver[j] != null) {
483                                IOPort remotePort2 = receiver[j].getContainer();
484                                if (remotePort2 != null) {
485                                    results.addAll(_dependents(remotePort2));
486                                }
487                            }
488                        }
489                    }
490                }
491            }
492        }
493        return results;
494    }
495
496    /** Update the channel name of any connected SubscriberPorts.
497     *  Note that the channel name of connected SubscriptionAggregatorPorts
498     *  are not updated.
499     *  @param previousChannelName The previous name of the channel.
500     *  @param newChannelName The new name of the channel.
501     *  @exception KernelException If thrown when a Manager is added to
502     *  the top level, or when the channel name of a Subscriber is changed.
503     */
504    private void _updateChannelNameOfConnectedSubscribers(
505            String previousChannelName, String newChannelName)
506            throws KernelException {
507
508        // This will end up calling attributeChanged() (in order to
509        // defeat lazy composites), and we want to prevent a recursive
510        // call here.
511        if (_inUpdateCall) {
512            return;
513        }
514        _inUpdateCall = true;
515        try {
516            // We use subscribers() here so that we get any subscribers in
517            // Opaque TypedCompositeActors.
518            for (SubscriberPort port : subscribers()) {
519                if (port.channel.stringValue().equals(previousChannelName)) {
520                    // Avoid updating SubscriptionAggregators that have regular
521                    // expressions that are different than the Publisher channel name.
522
523                    // Handle the case where the channel name is an expression
524                    // that evaluates to the value of the enclosing Publisher actor's
525                    // channel parameter. In that case, we want to change the value
526                    // the channel in the Publisher actor, rather than here.
527                    if (port.channel.getExpression().equals("$channel")) {
528                        NamedObj container = port.getContainer();
529                        Attribute containerChannel = container
530                                .getAttribute("channel");
531                        if (containerChannel instanceof StringParameter) {
532                            ((StringParameter) containerChannel)
533                                    .setExpression(newChannelName);
534                            container.attributeChanged(containerChannel);
535                            port.attributeChanged(port.channel);
536                        } else {
537                            port.channel.setExpression(newChannelName);
538                            port.attributeChanged(port.channel);
539                        }
540                    } else {
541                        port.channel.setExpression(newChannelName);
542                        port.attributeChanged(port.channel);
543                    }
544                }
545            }
546        } finally {
547            _inUpdateCall = false;
548        }
549    }
550
551    ///////////////////////////////////////////////////////////////////
552    ////                         private variables                 ////
553
554    /** Flag to prevent recursive call of subscribers() method. */
555    private boolean _inUpdateCall = false;
556}