001/* An input port that aggregates data published on channels matching a regular expression.
002
003 Copyright (c) 1997-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 Review vectorized methods.
028 Review broadcast/get/send/hasRoom/hasToken.
029 Review setInput/setOutput/setMultiport.
030 Review isKnown/broadcastClear/sendClear.
031 createReceivers creates inside receivers based solely on insideWidth, and
032 outsideReceivers based solely on outside width.
033 connectionsChanged: no longer validates the attributes of this port.  This is
034 now done in Manager.initialize().
035 Review sendInside, getInside, getWidthInside, transferInputs/Outputs, etc.
036 */
037package ptolemy.actor;
038
039import java.util.regex.Pattern;
040
041import ptolemy.data.BooleanToken;
042import ptolemy.data.Token;
043import ptolemy.data.expr.StringParameter;
044import ptolemy.kernel.ComponentEntity;
045import ptolemy.kernel.CompositeEntity;
046import ptolemy.kernel.util.Attribute;
047import ptolemy.kernel.util.IllegalActionException;
048import ptolemy.kernel.util.NameDuplicationException;
049import ptolemy.kernel.util.NamedObj;
050
051///////////////////////////////////////////////////////////////////
052//// SubscriptionAggregatorPort
053
054/**
055 Aggregate data produced by multiple publishers.
056
057 <p>This is a generalization of the {@link
058 ptolemy.actor.SubscriberPort} (the base class) where the channel name
059 is interpreted as a regular expression.  Data produced by all
060 publishers that publish on a channel name that matches the regular
061 expression are aggregated using the operation given by the {@link
062 #operation} parameter.</p>
063
064 <p>Note that the {@link ptolemy.actor.SubscriberPort#channel <i>channel</i>}
065 parameter of the superclass is now a regular expression in this class.
066 One thing to watch out for is using <code>.</code> instead of <code>\.</code>.
067 For example, <code>channel.foo</code> does not mean the same thing as
068 <code>channel\.foo</code>. The latter requires a dot between channel and
069 foo, where the former does not.
070
071 <p>Note that although this is a multiport, calls to get() should only reference
072 channel 0. An exception will be thrown otherwise. The result of the get will
073 be the aggregate of what is received on all the input channels.
074
075 @author Edward A. Lee
076 @version $Id$
077 @since Ptolemy II 10.0
078 @Pt.ProposedRating Yellow (eal)
079 @Pt.AcceptedRating Red (eal)
080 */
081public class SubscriptionAggregatorPort extends SubscriberPort {
082
083    /** Construct a subscriber port with a containing actor and a name.
084     *  This is always an input port.
085     *  @param container The container actor.
086     *  @param name The name of the port.
087     *  @exception IllegalActionException If the port is not of an acceptable
088     *   class for the container, or if the container does not implement the
089     *   Actor interface.
090     *  @exception NameDuplicationException If the name coincides with
091     *   a port already in the container.
092     */
093    public SubscriptionAggregatorPort(ComponentEntity container, String name)
094            throws IllegalActionException, NameDuplicationException {
095        super(container, name);
096
097        operation = new StringParameter(this, "operation");
098        operation.addChoice("add");
099        operation.addChoice("multiply");
100        operation.setExpression("add");
101
102        setMultiport(true);
103    }
104
105    ///////////////////////////////////////////////////////////////////
106    ////                         parameters                        ////
107
108    /** The operation used to aggregate the data produced by
109     *  matching publishers. The choices are "add" and "multiply".
110     *  Note that "multiply" is a poor choice if the data type
111     *  has a non-commutative multiplication operation (e.g.
112     *  matrix types) because the result will be nondeterministic.
113     *  This is a string that defaults to "add".
114     */
115    public StringParameter operation;
116
117    ///////////////////////////////////////////////////////////////////
118    ////                         public methods                    ////
119
120    /** If a publish and subscribe channel is set, then set up the connections.
121     *  If an aspect is added, removed or modified update the list of
122     *  aspects.
123     *  @param attribute The attribute that changed.
124     *  @exception IllegalActionException Thrown if the new color attribute cannot
125     *      be created.
126     */
127    @Override
128    public void attributeChanged(Attribute attribute)
129            throws IllegalActionException {
130        if (attribute == operation) {
131            String newValue = operation.stringValue();
132            if (newValue.equals("add")) {
133                _addOperation = true;
134            } else {
135                _addOperation = false;
136            }
137        } else if (attribute == channel) {
138            // Override the base class to use the version
139            // of unlinkToPublishedPort() that takes a Pattern
140            // argument rather than a String.
141            String newValue = channel.stringValue();
142            if (!newValue.equals(_channel)) {
143                NamedObj immediateContainer = getContainer();
144                if (immediateContainer != null) {
145                    NamedObj container = immediateContainer.getContainer();
146                    if (container instanceof CompositeActor
147                            && !(_channel == null
148                                    || _channel.trim().equals(""))) {
149                        ((CompositeActor) container).unlinkToPublishedPort(
150                                _channelPattern, this, _global);
151                    }
152                    _channel = newValue;
153                    // Don't call super here because super.attributeChanged() tries to unlink _channel
154                    // as a non-regular expression string, which seems wrong.
155                    // super.attributeChanged(attribute);
156                    _channelPattern = Pattern.compile(_channel);
157                }
158            }
159        } else if (attribute == global) {
160            boolean newValue = ((BooleanToken) global.getToken())
161                    .booleanValue();
162            if (newValue == false && _global == true) {
163                NamedObj immediateContainer = getContainer();
164                if (immediateContainer != null) {
165                    NamedObj container = immediateContainer.getContainer();
166                    if (container instanceof CompositeActor
167                            && !(_channel == null
168                                    || _channel.trim().equals(""))) {
169                        ((CompositeActor) container).unlinkToPublishedPort(
170                                _channelPattern, this, _global);
171                    }
172                }
173            }
174            _global = newValue;
175            // Do not call SubscriptionAggregator.attributeChanged()
176            // because it will remove the published port name by _channel.
177            // If _channel is set to a real name (not a regex pattern),
178            // Then chaos ensues.  See test 3.0 in SubscriptionAggregator.tcl
179        } else {
180            super.attributeChanged(attribute);
181        }
182    }
183
184    /** Get a token from the specified channel.
185     *  This overrides the base class to first ensure that
186     *  the <i>channelIndex</i> is 0 (or an exception is
187     *  thrown), and then to aggregate the tokens from all
188     *  of the input channels according to the
189     *  {@link #operation} parameter and return the
190     *  single token result.
191     *  Specifically, it reads one token from each input channel
192     *  that has a token, aggregates these, and returns the aggregate.
193     *  @param channelIndex The channel index. This is required to be 0.
194     *  @return An aggregation of the tokens from all input channels.
195     *  @exception NoTokenException If there is no token.
196     *  @exception IllegalActionException If there is no director, and hence
197     *   no receivers have been created, if the port is not an input port, or
198     *   if the channel index is not 0.
199     */
200    @Override
201    public Token get(int channelIndex)
202            throws NoTokenException, IllegalActionException {
203        if (channelIndex != 0) {
204            throw new IllegalActionException(this,
205                    "Although it is a multiport, you can only read"
206                            + " from channel 0 of a SubscriptionAggregatorPort.");
207        }
208        Token result = null;
209        for (int i = 0; i < getWidth(); i++) {
210            if (super.hasToken(i)) {
211                Token input = super.get(i);
212                if (result == null) {
213                    result = input;
214                } else {
215                    if (_addOperation) {
216                        result = result.add(input);
217                    } else {
218                        result = result.multiply(input);
219                    }
220                }
221            }
222        }
223        if (result == null) {
224            throw new NoTokenException(this, "No input tokens");
225        }
226        return result;
227    }
228
229    /** Get an array of tokens from the specified channel.
230     *  This overrides the base class to first ensure that
231     *  the <i>channelIndex</i> is 0 (or an exception is
232     *  thrown), and then to aggregate the tokens from all
233     *  of the input channels according to the
234     *  {@link #operation} parameter and return the
235     *  single token result.
236     *  Specifically, it reads one token from each input channel
237     *  that has a token, aggregates these, and returns the aggregate.
238     *  @param channelIndex The channel index. This is required to be 0.
239     *  @param vectorLength The number of valid tokens to get in the
240     *   returned array.
241     *  @return A token array with length
242     *   <i>vectorLength</i> aggregating the inputs.
243     *  @exception NoTokenException If there is not enough tokens.
244     *  @exception IllegalActionException If there is no director, and hence
245     *   no receivers have been created, if the port is not an input port, or
246     *   if the channel index is not 0.
247     */
248    @Override
249    public Token[] get(int channelIndex, int vectorLength)
250            throws NoTokenException, IllegalActionException {
251        if (channelIndex != 0) {
252            throw new IllegalActionException(this,
253                    "Although it is a multiport, you can only read"
254                            + " from channel 0 of a SubscriptionAggregatorPort.");
255        }
256        Token[] result = null;
257        for (int i = 0; i < getWidth(); i++) {
258            if (super.hasToken(i, vectorLength)) {
259                Token[] input = super.get(i, vectorLength);
260                if (result == null) {
261                    result = input;
262                } else {
263                    if (_addOperation) {
264                        for (int j = 0; j < vectorLength; j++) {
265                            result[j] = result[j].add(input[j]);
266                        }
267                    } else {
268                        for (int j = 0; j < vectorLength; j++) {
269                            result[j] = result[j].multiply(input[j]);
270                        }
271                    }
272                }
273            }
274        }
275        if (result == null) {
276            throw new NoTokenException(this, "Not engouh input tokens");
277        }
278        return result;
279    }
280
281    /** Return the inside width of this port, which in this class is
282     *  always 1.
283     *  @return The width of the inside of the port.
284     */
285    @Override
286    public int getWidthInside() {
287        return 1;
288    }
289
290    /** Return true if any input channel has a token.
291     *  @param channelIndex The channel index. This is required to be 0.
292     *  @return True if any input channel has a token.
293     *  @exception IllegalActionException If the channel index is not 0
294     *   or if the superclass throws it.
295     */
296    @Override
297    public boolean hasToken(int channelIndex) throws IllegalActionException {
298        /* Allow asking about other channels.
299        if (channelIndex != 0) {
300            throw new IllegalActionException(
301                    this,
302                    "Although it is a multiport, you can only read"
303                            + " from channel 0 of a SubscriptionAggregatorPort.");
304        }
305         */
306        for (int i = 0; i < getWidth(); i++) {
307            if (super.hasToken(i)) {
308                return true;
309            }
310        }
311        return false;
312    }
313
314    /** Return true if every input channel that has tokens has enough tokens.
315     *  @param channelIndex The channel index. This is required to be 0.
316     *  @param vectorLength The number of tokens to query the channel for.
317     *  @return True if every input channel that has tokens has enough tokens.
318     *  @exception IllegalActionException If the channel index is not 0
319     *   or if the superclass throws it.
320     */
321    @Override
322    public boolean hasToken(int channelIndex, int vectorLength)
323            throws IllegalActionException {
324        /* Allow asking about other channels.
325        if (channelIndex != 0) {
326            throw new IllegalActionException(
327                    this,
328                    "Although it is a multiport, you can only read"
329                            + " from channel 0 of a SubscriptionAggregatorPort.");
330        }
331         */
332        boolean foundOne = false;
333        for (int i = 0; i < getWidth(); i++) {
334            if (super.hasToken(i)) {
335                foundOne = true;
336                if (!super.hasToken(i, vectorLength)) {
337                    return false;
338                }
339            }
340        }
341        return foundOne;
342    }
343
344    /** Check that the port is not in the top level, then
345     *  call preinitialize() in the super class.
346     *  @exception IllegalActionException If the port is in
347     *  the top level.
348     */
349    @Override
350    public void preinitialize() throws IllegalActionException {
351        NamedObj actor = getContainer();
352        if (actor != null && actor.getContainer() == null) {
353            throw new IllegalActionException(this,
354                    "SubscriptionAggregatorPorts cannot be used at the top level, use a SubscriptionAggregator actor instead.");
355        }
356        super.preinitialize();
357    }
358
359    ///////////////////////////////////////////////////////////////////
360    ////                         protected methods                 ////
361
362    /** Override the base class to always return 1.
363     *  @param except The relation to exclude.
364     *  @return The sums of the width of the relations linked on the inside,
365     *  except for the specified port.
366     */
367    @Override
368    protected int _getInsideWidth(IORelation except) {
369        return 1;
370    }
371
372    /** Update the connection to the publishers, if there are any.
373     *  @exception IllegalActionException If creating the link
374     *   triggers an exception.
375     */
376    @Override
377    protected void _updateLinks() throws IllegalActionException {
378        // This overrides the base class to Pattern version
379        // rather than the String version of linkToPublishedPort().
380
381        // If the channel has not been set, then there is nothing
382        // to do.  This is probably the first setContainer() call,
383        // before the object is fully constructed.
384        if (_channelPattern == null) {
385            return;
386        }
387
388        NamedObj immediateContainer = getContainer();
389        if (immediateContainer != null) {
390            NamedObj container = immediateContainer.getContainer();
391            if (container instanceof CompositeActor) {
392                try {
393                    try {
394                        ((CompositeActor) container).linkToPublishedPort(
395                                _channelPattern, this, _global);
396                    } catch (IllegalActionException ex) {
397                        // If we have a LazyTypedCompositeActor that
398                        // contains the Publisher, then populate() the
399                        // model, expanding the LazyTypedCompositeActors
400                        // and retry the link.  This is computationally
401                        // expensive.
402                        // See $PTII/ptolemy/actor/lib/test/auto/LazyPubSub.xml
403                        _updatePublisherPorts((CompositeEntity) toplevel());
404                        // Now try again.
405                        ((CompositeActor) container).linkToPublishedPort(
406                                _channelPattern, this, _global);
407                    }
408                } catch (NameDuplicationException e) {
409                    throw new IllegalActionException(this, e,
410                            "Can't link SubscriptionAggregatorPort with a PublisherPort.");
411                }
412            }
413        }
414    }
415
416    ///////////////////////////////////////////////////////////////////
417    ////                         private variables                 ////
418
419    /** Indicator that the operation is "add" rather than "multiply". */
420    private boolean _addOperation = true;
421
422    /** Regex Pattern for _channelName. */
423    private Pattern _channelPattern;
424}