001/* An output port that publishes its data on a named channel. 002 003 Copyright (c) 1997-2014 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 */ 027package ptolemy.actor; 028 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.LinkedList; 032import java.util.List; 033import java.util.Map; 034 035import ptolemy.actor.util.DFUtilities; 036import ptolemy.data.ArrayToken; 037import ptolemy.data.BooleanToken; 038import ptolemy.data.IntToken; 039import ptolemy.data.Token; 040import ptolemy.data.expr.StringParameter; 041import ptolemy.data.expr.Variable; 042import ptolemy.kernel.ComponentEntity; 043import ptolemy.kernel.CompositeEntity; 044import ptolemy.kernel.Entity; 045import ptolemy.kernel.InstantiableNamedObj; 046import ptolemy.kernel.Port; 047import ptolemy.kernel.util.Attribute; 048import ptolemy.kernel.util.IllegalActionException; 049import ptolemy.kernel.util.InternalErrorException; 050import ptolemy.kernel.util.NameDuplicationException; 051import ptolemy.kernel.util.NamedObj; 052 053/////////////////////////////////////////////////////////////////// 054//// SubscriberPort 055 056/** 057 This is a specialized input port that subscribes to data sent 058 to it on the specified named channel. 059 The tokens are "tunneled" from an instance of 060 {@link PublisherPort} that names the same channel. 061 If {@link #global} is false (the default), then this subscriber 062 will only see instances of PublisherPort that are under the 063 control of the same director. That is, it can 064 be at a different level of the hierarchy, or in an entirely different 065 composite actor, as long as the relevant composite actors are 066 transparent (have no director). If {@link #global} is true, 067 then the publisher may be anywhere in the model, as long as its 068 <i>global</i> parameter is also true. 069 <p> 070 Any number of instances of SubscriberPort can subscribe to the same 071 channel. 072 <p> 073 This actor actually has a hidden input port that is connected 074 to the publisher via hidden "liberal links" (links that are 075 allowed to cross levels of the hierarchy). Consequently, 076 any data dependencies that the director might assume on a regular 077 "wired" connection will also be assumed across Publisher-Subscriber 078 pairs. Similarly, type constraints will propagate across 079 Publisher-Subscriber pairs. That is, the type of the Subscriber 080 output will match the type of the Publisher input. 081 082 @author Edward A. Lee, Contributor: Christopher Brooks 083 @version $Id$ 084 @since Ptolemy II 10.0 085 @Pt.ProposedRating Yellow (eal) 086 @Pt.AcceptedRating Red (eal) 087 */ 088public class SubscriberPort extends PubSubPort { 089 090 /** Construct a subscriber port with a containing actor and a name. 091 * This is always an input port. 092 * @param container The container actor. 093 * @param name The name of the port. 094 * @exception IllegalActionException If the port is not of an acceptable 095 * class for the container, or if the container does not implement the 096 * Actor interface. 097 * @exception NameDuplicationException If the name coincides with 098 * a port already in the container. 099 */ 100 public SubscriberPort(ComponentEntity container, String name) 101 throws IllegalActionException, NameDuplicationException { 102 super(container, name); 103 104 setOutput(false); 105 setInput(true); 106 107 // In order for this to show up in the vergil library, it has to have 108 // an icon description. 109 _attachText("_smallIconDescription", "<svg>\n" 110 + "<polygon points=\"0,4 0,9 12,0 0,-9 0,-4 -8,-4 -8,4\" " 111 + "style=\"fill:cyan\"/>\n" + "</svg>\n"); 112 } 113 114 /////////////////////////////////////////////////////////////////// 115 //// public methods //// 116 117 /** If a publish and subscribe channel is set, then set up the connections. 118 * @param attribute The attribute that changed. 119 * @exception IllegalActionException Thrown if the new color attribute cannot 120 * be created. 121 */ 122 @Override 123 public void attributeChanged(Attribute attribute) 124 throws IllegalActionException { 125 if (attribute == channel) { 126 String newValue = channel.stringValue(); 127 if (!newValue.equals(_channel)) { 128 NamedObj immediateContainer = getContainer(); 129 if (immediateContainer != null) { 130 NamedObj container = immediateContainer.getContainer(); 131 if (container instanceof CompositeActor 132 && !(_channel == null 133 || _channel.trim().equals(""))) { 134 ((CompositeActor) container) 135 .unlinkToPublishedPort(_channel, this, _global); 136 } 137 } 138 _channel = newValue; 139 } 140 } else if (attribute == global) { 141 boolean newValue = ((BooleanToken) global.getToken()) 142 .booleanValue(); 143 if (newValue == false && _global == true) { 144 NamedObj immediateContainer = getContainer(); 145 if (immediateContainer != null) { 146 NamedObj container = immediateContainer.getContainer(); 147 if (container instanceof CompositeActor 148 && !(_channel == null 149 || _channel.trim().equals(""))) { 150 ((CompositeActor) container) 151 .unlinkToPublishedPort(_channel, this, _global); 152 } 153 } 154 } 155 _global = newValue; 156 // Do not call SubscriptionAggregator.attributeChanged() 157 // because it will remove the published port name by _channel. 158 // If _channel is set to a real name (not a regex pattern), 159 // Then chaos ensues. See test 3.0 in SubscriptionAggregator.tcl 160 } else if (attribute == initialTokens) { 161 // Set the initial token parameter for the benefit of SDF. 162 // If this port is not opaque, SDF will not see it, so we 163 // will need in preinitialize() to set the init production 164 // of the inside ports. 165 Token initialOutputsValue = initialTokens.getToken(); 166 if (initialOutputsValue != null) { 167 if (!(initialOutputsValue instanceof ArrayToken)) { 168 throw new IllegalActionException(this, 169 "initialOutputs value is required to be an array."); 170 } 171 int length = ((ArrayToken) initialOutputsValue).length(); 172 DFUtilities.setOrCreate(this, "tokenInitProduction", length); 173 } 174 } else { 175 super.attributeChanged(attribute); 176 } 177 } 178 179 /** Notify this object that the containment hierarchy above it has 180 * changed. This restores the tokenInitConsumption parameters of 181 * any ports that had that parameter changed in a previous 182 * call to preinitialize(). 183 * @exception IllegalActionException If the change is not 184 * acceptable. 185 */ 186 @Override 187 public void hierarchyChanged() throws IllegalActionException { 188 // If we have previously set the tokenInitConsumption variable 189 // of some port, restore it now to its original value. 190 if (_tokenInitConsumptionSet != null) { 191 for (IOPort port : _tokenInitConsumptionSet.keySet()) { 192 String previousValue = _tokenInitConsumptionSet.get(port); 193 Variable variable = DFUtilities.getRateVariable(port, 194 "tokenInitConsumption"); 195 if (previousValue == null) { 196 try { 197 variable.setContainer(null); 198 } catch (NameDuplicationException e) { 199 // Should not occur. 200 throw new InternalErrorException(e); 201 } 202 } else { 203 variable.setExpression(previousValue); 204 } 205 } 206 } 207 super.hierarchyChanged(); 208 } 209 210 /** Notify this object that the containment hierarchy above it will be 211 * changed, which results in the channel being unlinked from the publisher. 212 * @exception IllegalActionException If unlinking to a published port fails. 213 */ 214 @Override 215 public void hierarchyWillChange() throws IllegalActionException { 216 if (channel != null) { 217 String channelValue = null; 218 try { 219 // The channel may refer to parameters via $ 220 // but the parameters are not yet in scope. 221 channelValue = channel.stringValue(); 222 } catch (Throwable throwable) { 223 channelValue = channel.getExpression(); 224 } 225 if (channelValue != null) { 226 NamedObj immediateContainer = getContainer(); 227 if (immediateContainer != null) { 228 NamedObj container = immediateContainer.getContainer(); 229 if (container instanceof CompositeActor) { 230 ((CompositeActor) container) 231 .unlinkToPublishedPort(channelValue, this); 232 } 233 } 234 } 235 } 236 super.hierarchyWillChange(); 237 } 238 239 /** If {@link #initialTokens} has been set, then make available the 240 * inputs specified by its array value. 241 */ 242 @Override 243 public void initialize() throws IllegalActionException { 244 if (((InstantiableNamedObj) getContainer()).isWithinClassDefinition()) { 245 // Don't initialize Class Definitions. 246 // FIXME: Probably shouldn't even be a registered Initializable. 247 // See $PTII/ptolemy/actor/lib/test/auto/PublisherToplevelSubscriberPortAOC.xml 248 return; 249 } 250 // If the publisher port is not opaque and is an instance of 251 // ConstantPublisherPort, then we have some work to do. If 252 // this port is opaque, we set it to return a constant value 253 // provided by the ConstantPublisherPort. If not, then we have 254 // set the inside destination ports to return constant values. 255 if (_publisherPort instanceof ConstantPublisherPort) { 256 Token constantToken = ((ConstantPublisherPort) _publisherPort).constantValue 257 .getToken(); 258 Token limitToken = ((ConstantPublisherPort) _publisherPort).numberOfTokens 259 .getToken(); 260 int limit = ((IntToken) limitToken).intValue(); 261 if (isOpaque()) { 262 _setConstant(constantToken, limit); 263 } else { 264 // NOTE: insideSinkPortList() doesn't work here if the 265 // port is transparent. The returned list is empty, 266 // unfortunately, so we have duplicate that functionality 267 // here. 268 Director dir = ((Actor) getContainer()).getDirector(); 269 int depthOfDirector = dir.depthInHierarchy(); 270 LinkedList<IOPort> insidePorts = new LinkedList<IOPort>(); 271 Iterator<?> ports = deepInsidePortList().iterator(); 272 273 while (ports.hasNext()) { 274 IOPort port = (IOPort) ports.next(); 275 int depth = port.getContainer().depthInHierarchy(); 276 277 if (port.isInput() && depth >= depthOfDirector) { 278 insidePorts.addLast(port); 279 } else if (port.isOutput() && depth < depthOfDirector) { 280 insidePorts.addLast(port); 281 } 282 } 283 for (IOPort insidePort : insidePorts) { 284 insidePort._setConstant(constantToken, limit); 285 } 286 } 287 } 288 289 Token initialOutputsValue = initialTokens.getToken(); 290 if (initialOutputsValue instanceof ArrayToken) { 291 // If this port is opaque, put the tokens into the receivers. 292 if (isOpaque()) { 293 Receiver[][] receivers = getReceivers(); 294 if (receivers != null) { 295 for (Receiver[] receiver : receivers) { 296 for (int j = 0; j < receivers.length; j++) { 297 for (Token token : ((ArrayToken) initialOutputsValue) 298 .arrayValue()) { 299 receiver[j].put(token); 300 } 301 } 302 } 303 } 304 } else { 305 // The port is not opaque. 306 for (Token token : ((ArrayToken) initialOutputsValue) 307 .arrayValue()) { 308 for (int i = 0; i < getWidth(); i++) { 309 sendInside(i, token); 310 } 311 } 312 } 313 } 314 } 315 316 /** Override the base class to ensure that there is a publisher. 317 * @exception IllegalActionException If there is no matching 318 * publisher, if the channel is not specified or if the port 319 * is in the top level. 320 */ 321 @Override 322 public void preinitialize() throws IllegalActionException { 323 if (_channel == null) { 324 throw new IllegalActionException(this, "No channel specified."); 325 } 326 NamedObj actor = getContainer(); 327 if (actor != null && actor.getContainer() == null) { 328 throw new IllegalActionException(this, 329 "SubscriberPorts cannot be used at the top level, use a Subscriber actor instead."); 330 } 331 if (((InstantiableNamedObj) getContainer()).isWithinClassDefinition()) { 332 // Don't preinitialize Class Definitions. 333 // See $PTII/ptolemy/actor/lib/test/auto/PublisherToplevelSubscriberPortAOC.xml 334 return; 335 } 336 _updateLinks(); 337 } 338 339 /** Override the base class to only accept setting to be an input. 340 * @param isInput True to make the port an input. 341 * @exception IllegalActionException If the argument is false. 342 */ 343 @Override 344 public void setInput(boolean isInput) throws IllegalActionException { 345 if (!isInput) { 346 throw new IllegalActionException(this, 347 "SubscriberPort is required to be an input port."); 348 } 349 super.setInput(true); 350 } 351 352 /** Override the base class to refuse to make the port an output. 353 * @param isOutput Required to be false. 354 * @exception IllegalActionException If the argument is true. 355 */ 356 @Override 357 public void setOutput(boolean isOutput) throws IllegalActionException { 358 if (isOutput) { 359 throw new IllegalActionException(this, 360 "SubscriberPort cannot be an output port."); 361 } 362 super.setOutput(false); 363 } 364 365 /////////////////////////////////////////////////////////////////// 366 //// protected methods //// 367 368 /** Update the connection to the publisher, if there is one. 369 * Note that this method is computationally intensive for large 370 * models as it traverses the model by searching 371 * up the hierarchy for the nearest opaque container 372 * or the top level and then traverses the contained entities. 373 * Thus, avoid calling this method except when the model 374 * is running. 375 * @exception IllegalActionException If creating the link 376 * triggers an exception. 377 */ 378 protected void _updateLinks() throws IllegalActionException { 379 // If the channel has not been set, then there is nothing 380 // to do. This is probably the first setContainer() call, 381 // before the object is fully constructed. 382 if (_channel == null) { 383 return; 384 } 385 386 NamedObj immediateContainer = getContainer(); 387 if (immediateContainer != null) { 388 NamedObj container = immediateContainer.getContainer(); 389 if (container instanceof CompositeActor) { 390 try { 391 IOPort publisherPort = null; 392 try { 393 publisherPort = ((CompositeActor) container) 394 .linkToPublishedPort(_channel, this, _global); 395 } catch (IllegalActionException ex) { 396 // If we have a LazyTypedCompositeActor that 397 // contains the Publisher, then populate() the 398 // model, expanding the LazyTypedCompositeActors 399 // and retry the link. This is computationally 400 // expensive. 401 // See $PTII/ptolemy/actor/lib/test/auto/LazyPubSub.xml 402 _updatePublisherPorts((CompositeEntity) toplevel()); 403 // Now try again. 404 try { 405 publisherPort = ((CompositeActor) container) 406 .linkToPublishedPort(_channel, this, 407 _global); 408 } catch (IllegalActionException ex2) { 409 // Rethrow with the "this" so that Go To Actor works. 410 throw new IllegalActionException(this, ex2, 411 "Failed to update link."); 412 } 413 } 414 // Set the init consumption parameter for this port, or if this 415 // port is not opaque, for the opaque ports connected to it on the inside. 416 // The init consumption will be the sum of the number of initial 417 // tokens this port has and the number of initial tokens produced 418 // by the publisher port if it is not opaque (if it is opaque, then 419 // its token init production parameter will be seen by the scheduler). 420 int length = 0; 421 422 Token initialOutputsValue = initialTokens.getToken(); 423 if (initialOutputsValue != null) { 424 length = ((ArrayToken) initialOutputsValue).length(); 425 } 426 427 // If the publisherPort has initial production and is not opaque, 428 // then for the benefit of SDF we need to set the tokenInitConsumption 429 // parameter here so that the SDF scheduler knows that initial tokens 430 // will be available. 431 if (!publisherPort.isOpaque()) { 432 length += DFUtilities 433 .getTokenInitProduction(publisherPort); 434 } 435 _publisherPort = publisherPort; 436 437 if (length > 0) { 438 if (isOpaque()) { 439 DFUtilities.setOrCreate(this, 440 "tokenInitConsumption", length); 441 } else { 442 // If this port is not opaque, then we have 443 // to set the parameter for inside ports that will 444 // actually receive the initial token. 445 if (_tokenInitConsumptionSet == null) { 446 _tokenInitConsumptionSet = new HashMap<IOPort, String>(); 447 } 448 List<IOPort> insidePorts = deepInsidePortList(); 449 for (IOPort port : insidePorts) { 450 Variable previousVariable = DFUtilities 451 .getRateVariable(port, 452 "tokenInitConsumption"); 453 if (previousVariable == null) { 454 _tokenInitConsumptionSet.put(port, null); 455 } else { 456 String previousValue = previousVariable 457 .getExpression(); 458 _tokenInitConsumptionSet.put(port, 459 previousValue); 460 } 461 DFUtilities.setOrCreate(port, 462 "tokenInitConsumption", length); 463 } 464 } 465 } 466 } catch (NameDuplicationException e) { 467 throw new IllegalActionException(this, e, 468 "Can't link SubscriptionAggregatorPort with a PublisherPort."); 469 } 470 } 471 } 472 } 473 474 /** Traverse the model, starting at the specified object 475 * and examining objects below it in the hierarchy, to find 476 * all instances of PublisherPort and make sure that they have 477 * registered their port. This method defeats lazy composites 478 * and is expensive to execute. 479 * @param root The root of the tree to search. 480 * @exception IllegalActionException If the port rejects its channel. 481 */ 482 protected void _updatePublisherPorts(Entity root) 483 throws IllegalActionException { 484 List<Port> ports = root.portList(); 485 for (Port port : ports) { 486 if (port instanceof PublisherPort) { 487 // FIXME: Not sure if this is necessary 488 StringParameter channel = ((PublisherPort) port).channel; 489 channel.validate(); 490 port.attributeChanged(channel); 491 } 492 } 493 if (root instanceof CompositeEntity) { 494 List<Entity> entities = ((CompositeEntity) root).entityList(); 495 for (Entity entity : entities) { 496 _updatePublisherPorts(entity); 497 } 498 } 499 } 500 501 /////////////////////////////////////////////////////////////////// 502 //// private variables //// 503 504 /** The associated publisherPort, found during preinitialize(). */ 505 private IOPort _publisherPort; 506 507 /** Set of ports whose tokenInitConsumption variable has been set 508 * in preinitialize to something other than 0. This is needed so 509 * that these variables can be unset if the hierarchy changes. 510 */ 511 private Map<IOPort, String> _tokenInitConsumptionSet; 512}