/* An input port that subscribes to data published on a named channel.
002
003 Copyright (c) 1997-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026 */
027package ptolemy.actor;
028
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.LinkedList;
032import java.util.List;
033import java.util.Map;
034
035import ptolemy.actor.util.DFUtilities;
036import ptolemy.data.ArrayToken;
037import ptolemy.data.BooleanToken;
038import ptolemy.data.IntToken;
039import ptolemy.data.Token;
040import ptolemy.data.expr.StringParameter;
041import ptolemy.data.expr.Variable;
042import ptolemy.kernel.ComponentEntity;
043import ptolemy.kernel.CompositeEntity;
044import ptolemy.kernel.Entity;
045import ptolemy.kernel.InstantiableNamedObj;
046import ptolemy.kernel.Port;
047import ptolemy.kernel.util.Attribute;
048import ptolemy.kernel.util.IllegalActionException;
049import ptolemy.kernel.util.InternalErrorException;
050import ptolemy.kernel.util.NameDuplicationException;
051import ptolemy.kernel.util.NamedObj;
052
053///////////////////////////////////////////////////////////////////
054//// SubscriberPort
055
056/**
057 This is a specialized input port that subscribes to data sent
058 to it on the specified named channel.
059 The tokens are "tunneled" from an instance of
060 {@link PublisherPort} that names the same channel.
061 If {@link #global} is false (the default), then this subscriber
062 will only see instances of PublisherPort that are under the
063 control of the same director. That is, it can
064 be at a different level of the hierarchy, or in an entirely different
065 composite actor, as long as the relevant composite actors are
066 transparent (have no director). If {@link #global} is true,
067 then the publisher may be anywhere in the model, as long as its
068 <i>global</i> parameter is also true.
069 <p>
070 Any number of instances of SubscriberPort can subscribe to the same
071 channel.
072 <p>
073 This actor actually has a hidden input port that is connected
074 to the publisher via hidden "liberal links" (links that are
075 allowed to cross levels of the hierarchy).  Consequently,
076 any data dependencies that the director might assume on a regular
077 "wired" connection will also be assumed across Publisher-Subscriber
078 pairs.  Similarly, type constraints will propagate across
079 Publisher-Subscriber pairs. That is, the type of the Subscriber
080 output will match the type of the Publisher input.
081
082 @author Edward A. Lee, Contributor: Christopher Brooks
083 @version $Id$
084 @since Ptolemy II 10.0
085 @Pt.ProposedRating Yellow (eal)
086 @Pt.AcceptedRating Red (eal)
087 */
088public class SubscriberPort extends PubSubPort {
089
090    /** Construct a subscriber port with a containing actor and a name.
091     *  This is always an input port.
092     *  @param container The container actor.
093     *  @param name The name of the port.
094     *  @exception IllegalActionException If the port is not of an acceptable
095     *   class for the container, or if the container does not implement the
096     *   Actor interface.
097     *  @exception NameDuplicationException If the name coincides with
098     *   a port already in the container.
099     */
100    public SubscriberPort(ComponentEntity container, String name)
101            throws IllegalActionException, NameDuplicationException {
102        super(container, name);
103
104        setOutput(false);
105        setInput(true);
106
107        // In order for this to show up in the vergil library, it has to have
108        // an icon description.
109        _attachText("_smallIconDescription", "<svg>\n"
110                + "<polygon points=\"0,4 0,9 12,0 0,-9 0,-4 -8,-4 -8,4\" "
111                + "style=\"fill:cyan\"/>\n" + "</svg>\n");
112    }
113
114    ///////////////////////////////////////////////////////////////////
115    ////                         public methods                    ////
116
117    /** If a publish and subscribe channel is set, then set up the connections.
118     *  @param attribute The attribute that changed.
119     *  @exception IllegalActionException Thrown if the new color attribute cannot
120     *      be created.
121     */
122    @Override
123    public void attributeChanged(Attribute attribute)
124            throws IllegalActionException {
125        if (attribute == channel) {
126            String newValue = channel.stringValue();
127            if (!newValue.equals(_channel)) {
128                NamedObj immediateContainer = getContainer();
129                if (immediateContainer != null) {
130                    NamedObj container = immediateContainer.getContainer();
131                    if (container instanceof CompositeActor
132                            && !(_channel == null
133                                    || _channel.trim().equals(""))) {
134                        ((CompositeActor) container)
135                                .unlinkToPublishedPort(_channel, this, _global);
136                    }
137                }
138                _channel = newValue;
139            }
140        } else if (attribute == global) {
141            boolean newValue = ((BooleanToken) global.getToken())
142                    .booleanValue();
143            if (newValue == false && _global == true) {
144                NamedObj immediateContainer = getContainer();
145                if (immediateContainer != null) {
146                    NamedObj container = immediateContainer.getContainer();
147                    if (container instanceof CompositeActor
148                            && !(_channel == null
149                                    || _channel.trim().equals(""))) {
150                        ((CompositeActor) container)
151                                .unlinkToPublishedPort(_channel, this, _global);
152                    }
153                }
154            }
155            _global = newValue;
156            // Do not call SubscriptionAggregator.attributeChanged()
157            // because it will remove the published port name by _channel.
158            // If _channel is set to a real name (not a regex pattern),
159            // Then chaos ensues.  See test 3.0 in SubscriptionAggregator.tcl
160        } else if (attribute == initialTokens) {
161            // Set the initial token parameter for the benefit of SDF.
162            // If this port is not opaque, SDF will not see it, so we
163            // will need in preinitialize() to set the init production
164            // of the inside ports.
165            Token initialOutputsValue = initialTokens.getToken();
166            if (initialOutputsValue != null) {
167                if (!(initialOutputsValue instanceof ArrayToken)) {
168                    throw new IllegalActionException(this,
169                            "initialOutputs value is required to be an array.");
170                }
171                int length = ((ArrayToken) initialOutputsValue).length();
172                DFUtilities.setOrCreate(this, "tokenInitProduction", length);
173            }
174        } else {
175            super.attributeChanged(attribute);
176        }
177    }
178
179    /** Notify this object that the containment hierarchy above it has
180     *  changed. This restores the tokenInitConsumption parameters of
181     *  any ports that had that parameter changed in a previous
182     *  call to preinitialize().
183     *  @exception IllegalActionException If the change is not
184     *   acceptable.
185     */
186    @Override
187    public void hierarchyChanged() throws IllegalActionException {
188        // If we have previously set the tokenInitConsumption variable
189        // of some port, restore it now to its original value.
190        if (_tokenInitConsumptionSet != null) {
191            for (IOPort port : _tokenInitConsumptionSet.keySet()) {
192                String previousValue = _tokenInitConsumptionSet.get(port);
193                Variable variable = DFUtilities.getRateVariable(port,
194                        "tokenInitConsumption");
195                if (previousValue == null) {
196                    try {
197                        variable.setContainer(null);
198                    } catch (NameDuplicationException e) {
199                        // Should not occur.
200                        throw new InternalErrorException(e);
201                    }
202                } else {
203                    variable.setExpression(previousValue);
204                }
205            }
206        }
207        super.hierarchyChanged();
208    }
209
210    /** Notify this object that the containment hierarchy above it will be
211     *  changed, which results in the channel being unlinked from the publisher.
212     *  @exception IllegalActionException If unlinking to a published port fails.
213     */
214    @Override
215    public void hierarchyWillChange() throws IllegalActionException {
216        if (channel != null) {
217            String channelValue = null;
218            try {
219                // The channel may refer to parameters via $
220                // but the parameters are not yet in scope.
221                channelValue = channel.stringValue();
222            } catch (Throwable throwable) {
223                channelValue = channel.getExpression();
224            }
225            if (channelValue != null) {
226                NamedObj immediateContainer = getContainer();
227                if (immediateContainer != null) {
228                    NamedObj container = immediateContainer.getContainer();
229                    if (container instanceof CompositeActor) {
230                        ((CompositeActor) container)
231                                .unlinkToPublishedPort(channelValue, this);
232                    }
233                }
234            }
235        }
236        super.hierarchyWillChange();
237    }
238
239    /** If {@link #initialTokens} has been set, then make available the
240     *  inputs specified by its array value.
241     */
242    @Override
243    public void initialize() throws IllegalActionException {
244        if (((InstantiableNamedObj) getContainer()).isWithinClassDefinition()) {
245            // Don't initialize Class Definitions.
246            // FIXME: Probably shouldn't even be a registered Initializable.
247            // See $PTII/ptolemy/actor/lib/test/auto/PublisherToplevelSubscriberPortAOC.xml
248            return;
249        }
250        // If the publisher port is not opaque and is an instance of
251        // ConstantPublisherPort, then we have some work to do. If
252        // this port is opaque, we set it to return a constant value
253        // provided by the ConstantPublisherPort. If not, then we have
254        // set the inside destination ports to return constant values.
255        if (_publisherPort instanceof ConstantPublisherPort) {
256            Token constantToken = ((ConstantPublisherPort) _publisherPort).constantValue
257                    .getToken();
258            Token limitToken = ((ConstantPublisherPort) _publisherPort).numberOfTokens
259                    .getToken();
260            int limit = ((IntToken) limitToken).intValue();
261            if (isOpaque()) {
262                _setConstant(constantToken, limit);
263            } else {
264                // NOTE: insideSinkPortList() doesn't work here if the
265                // port is transparent. The returned list is empty,
266                // unfortunately, so we have duplicate that functionality
267                // here.
268                Director dir = ((Actor) getContainer()).getDirector();
269                int depthOfDirector = dir.depthInHierarchy();
270                LinkedList<IOPort> insidePorts = new LinkedList<IOPort>();
271                Iterator<?> ports = deepInsidePortList().iterator();
272
273                while (ports.hasNext()) {
274                    IOPort port = (IOPort) ports.next();
275                    int depth = port.getContainer().depthInHierarchy();
276
277                    if (port.isInput() && depth >= depthOfDirector) {
278                        insidePorts.addLast(port);
279                    } else if (port.isOutput() && depth < depthOfDirector) {
280                        insidePorts.addLast(port);
281                    }
282                }
283                for (IOPort insidePort : insidePorts) {
284                    insidePort._setConstant(constantToken, limit);
285                }
286            }
287        }
288
289        Token initialOutputsValue = initialTokens.getToken();
290        if (initialOutputsValue instanceof ArrayToken) {
291            // If this port is opaque, put the tokens into the receivers.
292            if (isOpaque()) {
293                Receiver[][] receivers = getReceivers();
294                if (receivers != null) {
295                    for (Receiver[] receiver : receivers) {
296                        for (int j = 0; j < receivers.length; j++) {
297                            for (Token token : ((ArrayToken) initialOutputsValue)
298                                    .arrayValue()) {
299                                receiver[j].put(token);
300                            }
301                        }
302                    }
303                }
304            } else {
305                // The port is not opaque.
306                for (Token token : ((ArrayToken) initialOutputsValue)
307                        .arrayValue()) {
308                    for (int i = 0; i < getWidth(); i++) {
309                        sendInside(i, token);
310                    }
311                }
312            }
313        }
314    }
315
316    /** Override the base class to ensure that there is a publisher.
317     *  @exception IllegalActionException If there is no matching
318     *   publisher, if the channel is not specified or if the port
319     *   is in the top level.
320     */
321    @Override
322    public void preinitialize() throws IllegalActionException {
323        if (_channel == null) {
324            throw new IllegalActionException(this, "No channel specified.");
325        }
326        NamedObj actor = getContainer();
327        if (actor != null && actor.getContainer() == null) {
328            throw new IllegalActionException(this,
329                    "SubscriberPorts cannot be used at the top level, use a Subscriber actor instead.");
330        }
331        if (((InstantiableNamedObj) getContainer()).isWithinClassDefinition()) {
332            // Don't preinitialize Class Definitions.
333            // See $PTII/ptolemy/actor/lib/test/auto/PublisherToplevelSubscriberPortAOC.xml
334            return;
335        }
336        _updateLinks();
337    }
338
339    /** Override the base class to only accept setting to be an input.
340     *  @param isInput True to make the port an input.
341     *  @exception IllegalActionException If the argument is false.
342     */
343    @Override
344    public void setInput(boolean isInput) throws IllegalActionException {
345        if (!isInput) {
346            throw new IllegalActionException(this,
347                    "SubscriberPort is required to be an input port.");
348        }
349        super.setInput(true);
350    }
351
352    /** Override the base class to refuse to make the port an output.
353     *  @param isOutput Required to be false.
354     *  @exception IllegalActionException If the argument is true.
355     */
356    @Override
357    public void setOutput(boolean isOutput) throws IllegalActionException {
358        if (isOutput) {
359            throw new IllegalActionException(this,
360                    "SubscriberPort cannot be an output port.");
361        }
362        super.setOutput(false);
363    }
364
365    ///////////////////////////////////////////////////////////////////
366    ////                         protected methods                 ////
367
368    /** Update the connection to the publisher, if there is one.
369     *  Note that this method is computationally intensive for large
370     *  models as it traverses the model by searching
371     *  up the hierarchy for the nearest opaque container
372     *  or the top level and then traverses the contained entities.
373     *  Thus, avoid calling this method except when the model
374     *  is running.
375     *  @exception IllegalActionException If creating the link
376     *   triggers an exception.
377     */
378    protected void _updateLinks() throws IllegalActionException {
379        // If the channel has not been set, then there is nothing
380        // to do.  This is probably the first setContainer() call,
381        // before the object is fully constructed.
382        if (_channel == null) {
383            return;
384        }
385
386        NamedObj immediateContainer = getContainer();
387        if (immediateContainer != null) {
388            NamedObj container = immediateContainer.getContainer();
389            if (container instanceof CompositeActor) {
390                try {
391                    IOPort publisherPort = null;
392                    try {
393                        publisherPort = ((CompositeActor) container)
394                                .linkToPublishedPort(_channel, this, _global);
395                    } catch (IllegalActionException ex) {
396                        // If we have a LazyTypedCompositeActor that
397                        // contains the Publisher, then populate() the
398                        // model, expanding the LazyTypedCompositeActors
399                        // and retry the link.  This is computationally
400                        // expensive.
401                        // See $PTII/ptolemy/actor/lib/test/auto/LazyPubSub.xml
402                        _updatePublisherPorts((CompositeEntity) toplevel());
403                        // Now try again.
404                        try {
405                            publisherPort = ((CompositeActor) container)
406                                    .linkToPublishedPort(_channel, this,
407                                            _global);
408                        } catch (IllegalActionException ex2) {
409                            // Rethrow with the "this" so that Go To Actor works.
410                            throw new IllegalActionException(this, ex2,
411                                    "Failed to update link.");
412                        }
413                    }
414                    // Set the init consumption parameter for this port, or if this
415                    // port is not opaque, for the opaque ports connected to it on the inside.
416                    // The init consumption will be the sum of the number of initial
417                    // tokens this port has and the number of initial tokens produced
418                    // by the publisher port if it is not opaque (if it is opaque, then
419                    // its token init production parameter will be seen by the scheduler).
420                    int length = 0;
421
422                    Token initialOutputsValue = initialTokens.getToken();
423                    if (initialOutputsValue != null) {
424                        length = ((ArrayToken) initialOutputsValue).length();
425                    }
426
427                    // If the publisherPort has initial production and is not opaque,
428                    // then for the benefit of SDF we need to set the tokenInitConsumption
429                    // parameter here so that the SDF scheduler knows that initial tokens
430                    // will be available.
431                    if (!publisherPort.isOpaque()) {
432                        length += DFUtilities
433                                .getTokenInitProduction(publisherPort);
434                    }
435                    _publisherPort = publisherPort;
436
437                    if (length > 0) {
438                        if (isOpaque()) {
439                            DFUtilities.setOrCreate(this,
440                                    "tokenInitConsumption", length);
441                        } else {
442                            // If this port is not opaque, then we have
443                            // to set the parameter for inside ports that will
444                            // actually receive the initial token.
445                            if (_tokenInitConsumptionSet == null) {
446                                _tokenInitConsumptionSet = new HashMap<IOPort, String>();
447                            }
448                            List<IOPort> insidePorts = deepInsidePortList();
449                            for (IOPort port : insidePorts) {
450                                Variable previousVariable = DFUtilities
451                                        .getRateVariable(port,
452                                                "tokenInitConsumption");
453                                if (previousVariable == null) {
454                                    _tokenInitConsumptionSet.put(port, null);
455                                } else {
456                                    String previousValue = previousVariable
457                                            .getExpression();
458                                    _tokenInitConsumptionSet.put(port,
459                                            previousValue);
460                                }
461                                DFUtilities.setOrCreate(port,
462                                        "tokenInitConsumption", length);
463                            }
464                        }
465                    }
466                } catch (NameDuplicationException e) {
467                    throw new IllegalActionException(this, e,
468                            "Can't link SubscriptionAggregatorPort with a PublisherPort.");
469                }
470            }
471        }
472    }
473
474    /** Traverse the model, starting at the specified object
475     *  and examining objects below it in the hierarchy, to find
476     *  all instances of PublisherPort and make sure that they have
477     *  registered their port. This method defeats lazy composites
478     *  and is expensive to execute.
479     *  @param root The root of the tree to search.
480     *  @exception IllegalActionException If the port rejects its channel.
481     */
482    protected void _updatePublisherPorts(Entity root)
483            throws IllegalActionException {
484        List<Port> ports = root.portList();
485        for (Port port : ports) {
486            if (port instanceof PublisherPort) {
487                // FIXME: Not sure if this is necessary
488                StringParameter channel = ((PublisherPort) port).channel;
489                channel.validate();
490                port.attributeChanged(channel);
491            }
492        }
493        if (root instanceof CompositeEntity) {
494            List<Entity> entities = ((CompositeEntity) root).entityList();
495            for (Entity entity : entities) {
496                _updatePublisherPorts(entity);
497            }
498        }
499    }
500
501    ///////////////////////////////////////////////////////////////////
502    ////                         private variables                 ////
503
504    /** The associated publisherPort, found during preinitialize(). */
505    private IOPort _publisherPort;
506
507    /** Set of ports whose tokenInitConsumption variable has been set
508     *  in preinitialize to something other than 0. This is needed so
509     *  that these variables can be unset if the hierarchy changes.
510     */
511    private Map<IOPort, String> _tokenInitConsumptionSet;
512}