001/* An output port that publishes its data on a named channel. 002 003 Copyright (c) 1997-2014 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 */ 027package ptolemy.actor; 028 029import java.util.HashSet; 030import java.util.Set; 031 032import ptolemy.actor.util.DFUtilities; 033import ptolemy.data.ArrayToken; 034import ptolemy.data.BooleanToken; 035import ptolemy.data.Token; 036import ptolemy.data.expr.Parameter; 037import ptolemy.data.expr.StringParameter; 038import ptolemy.data.type.BaseType; 039import ptolemy.kernel.ComponentEntity; 040import ptolemy.kernel.InstantiableNamedObj; 041import ptolemy.kernel.util.Attribute; 042import ptolemy.kernel.util.IllegalActionException; 043import ptolemy.kernel.util.InternalErrorException; 044import ptolemy.kernel.util.KernelException; 045import ptolemy.kernel.util.NameDuplicationException; 046import ptolemy.kernel.util.NamedObj; 047 048/////////////////////////////////////////////////////////////////// 049//// PublisherPort 050 051/** 052 This is a specialized output port that publishes data sent through it on 053 the specified named channel. The tokens are 054 "tunneled" to any instance of {@link SubscriberPort} that names the same channel. 055 If {@link #global} is false (the default), then this publisher 056 will only send to instances of SubscriberPort that are under the 057 control of the same director. That is, it can 058 be at a different level of the hierarchy, or in an entirely different 059 composite actor, as long as the relevant composite actors are 060 transparent (have no director). If {@link #global} is true, 061 then the subscriber may be anywhere in the model, as long as its 062 <i>global</i> parameter is also true. 063 <p> 064 It is an error to have two instances of PublisherPort using the same 065 channel under the control of the same director. When you create a 066 new PublisherPort, by default, it has no channel name. You have to 067 specify a channel name to use it. 068 <p> 069 <b>How it works:</b> 070 When the channel name 071 is specified, typically during model construction, this actor 072 causes a relation to be created in the least opaque composite 073 actor above it in the hierarchy and links to that relation. 074 In addition, if {@link #global} is set to true, it causes 075 a port to be created in that composite, and also links that 076 port to the relation on the inside. The relation is recorded by the opaque 077 composite. When a SubscriberPort is preinitialized that refers 078 to the same channel, that SubscriberPort finds the relation (by 079 finding the least opaque composite actor above it) and links 080 to the relation. Some of these links are "liberal links" in that 081 they cross levels of the hierarchy. 082 <p> 083 Since publishers are linked to subscribers, 084 any data dependencies that the director might assume on a regular 085 "wired" connection will also be assumed across publisher-subscriber 086 pairs. Similarly, type constraints will propagate across 087 publisher-subscriber pairs. That is, the type of the subscriber 088 output will match the type of the publisher input. 089 090 @author Edward A. Lee 091 @version $Id$ 092 @since Ptolemy II 10.0 093 @Pt.ProposedRating Yellow (eal) 094 @Pt.AcceptedRating Red (eal) 095 */ 096public class PublisherPort extends PubSubPort { 097 098 /** Construct a publisher port with the specified name and container. 099 * @param container The container actor. 100 * @param name The name of the port. 101 * @exception IllegalActionException If the port is not of an acceptable 102 * class for the container, or if the container does not implement the 103 * Actor interface. 104 * @exception NameDuplicationException If the name coincides with 105 * a port already in the container. 106 */ 107 public PublisherPort(ComponentEntity container, String name) 108 throws IllegalActionException, NameDuplicationException { 109 super(container, name); 110 111 propagateNameChanges = new Parameter(this, "propagateNameChanges"); 112 propagateNameChanges.setExpression("false"); 113 propagateNameChanges.setTypeEquals(BaseType.BOOLEAN); 114 115 setOutput(true); 116 setInput(false); 117 118 // In order for this to show up in the vergil library, it has to have 119 // an icon description. 120 _attachText("_smallIconDescription", "<svg>\n" 121 + "<polygon points=\"-8,9 -2,4 12,4 12,-4 -2,-4 -8,-9\" " 122 + "style=\"fill:cyan\"/>\n" + "</svg>\n"); 123 } 124 125 /////////////////////////////////////////////////////////////////// 126 //// parameters //// 127 128 /** If true, then propagate channel name changes to any 129 * Subscribers. The default value is a BooleanToken with the 130 * value false, indicating that if the channel name is changed, 131 * then the channel names of the Subscribers are not changed. If 132 * the value is true, then if the channel name is changed, the 133 * channel names of the connected Subscribers are updated. 134 * 135 * <p>If the value is true, then SubscriptionAggregators that 136 * have the same regular expression as the channel name of the 137 * Publisher will be updated. However, SubscriptionAggregators 138 * usually have regular expressions as channel names, so usually 139 * the channel name of the SubscriptionAggregator will <b>not</b> 140 * be updated.</p> 141 * 142 * <p>Note that if a Publisher is within an Actor Oriented Class 143 * definition, then any Subscribers with the same channel name in 144 * Actor Oriented Class definitions will <b>not</b> be updated. 145 * This is because there is no connection between the Publisher 146 * in the Actor Oriented Class definition and the Subscriber. 147 * However, if the channel name in a Publisher in an instance of 148 * an Actor Oriented Class is updated, then the 149 * corresponding Subscribers in instances of Actor Oriented Class 150 * will be updated.</p> 151 */ 152 public Parameter propagateNameChanges; 153 154 /////////////////////////////////////////////////////////////////// 155 //// public methods //// 156 157 /** Throw an exception. 158 * Adding initializables to the container is not supported. 159 */ 160 @Override 161 public void addInitializable(Initializable initializable) { 162 throw new InternalErrorException( 163 "Cannot add Initializables to PublisherPort."); 164 } 165 166 /** If a publish and subscribe channel is set, then set up the connections. 167 * @param attribute The attribute that changed. 168 * @exception IllegalActionException Thrown if the new color attribute cannot 169 * be created. 170 */ 171 @Override 172 public void attributeChanged(Attribute attribute) 173 throws IllegalActionException { 174 if (attribute == channel || attribute == global) { 175 // We only get the value if we are not in a class definition. 176 // Class definitions need not have connections. 177 // The connections will be made in the instances. 178 // The reason is that some of the Actor Oriented Classes 179 // that use Publishers do not have the parameter defined 180 // in the definition. See 181 // ptolemy/actor/lib/test/auto/PublisherClassNoParameter.xml 182 NamedObj immediateContainer = getContainer(); 183 if (immediateContainer != null) { 184 NamedObj container = immediateContainer.getContainer(); 185 // NOTE: During cloning, the container reports that it is in a class definition! 186 // Hence, this PublisherPort has to do the registering when clone is 187 // set to no longer be a class definition. 188 if (container instanceof InstantiableNamedObj 189 && !((InstantiableNamedObj) container) 190 .isWithinClassDefinition() 191 // || (container == null 192 // && immediateContainer instanceof InstantiableNamedObj 193 // && !((InstantiableNamedObj)immediateContainer).isWithinClassDefinition()) 194 ) { 195 String newValue = channel.stringValue(); 196 boolean globalValue = ((BooleanToken) global.getToken()) 197 .booleanValue(); 198 if (!newValue.equals(_channel) || globalValue != _global) { 199 // if (container == null 200 // && immediateContainer instanceof InstantiableNamedObj 201 // && !((InstantiableNamedObj)immediateContainer).isWithinClassDefinition()) { 202 // // Port is in the toplevel. 203 // container = immediateContainer; 204 // } 205 if (container instanceof CompositeActor) { 206 // The vergil and config tests were failing because 207 // moml.EntityLibrary sometimes contains Subscribers. 208 try { 209 if (attribute == global) { 210 if (_global && !globalValue) { 211 // Changing from global to non-global. 212 ((CompositeActor) container) 213 .unregisterPublisherPort( 214 _channel, this, true); 215 } 216 } 217 218 if (attribute == channel && !(_channel == null 219 || _channel.trim().equals(""))) { 220 // Changing the channel from a previous channel name. 221 if (((BooleanToken) propagateNameChanges 222 .getToken()).booleanValue()) { 223 try { 224 _updateChannelNameOfConnectedSubscribers( 225 _channel, newValue); 226 } catch (KernelException ex) { 227 throw new IllegalActionException( 228 this, ex, 229 "Failed to set channel to " 230 + newValue); 231 } 232 } 233 } 234 235 if (attribute == channel && !(_channel == null 236 || _channel.trim().equals(""))) { 237 ((CompositeActor) container) 238 .unregisterPublisherPort(_channel, 239 this, _global); 240 } 241 ((CompositeActor) container) 242 .registerPublisherPort(newValue, this, 243 globalValue); 244 245 } catch (NameDuplicationException e) { 246 throw new IllegalActionException(this, e, 247 "Can't add published port."); 248 } 249 } 250 _channel = newValue; 251 _global = globalValue; 252 } 253 } 254 } 255 } else if (attribute == initialTokens) { 256 // Set the production rate parameter for the benefit of SDF. 257 // If this port is not opaque, SDF will not see it, so the 258 // corresponding SubscriberPorts become responsible for 259 // setting their tokenInitConsumption parameters. 260 Token initialOutputsValue = initialTokens.getToken(); 261 if (initialOutputsValue != null) { 262 if (!(initialOutputsValue instanceof ArrayToken)) { 263 throw new IllegalActionException(this, 264 "initialOutputs value is required to be an array."); 265 } 266 int length = ((ArrayToken) initialOutputsValue).length(); 267 DFUtilities.setOrCreate(this, "tokenInitProduction", length); 268 } 269 } else { 270 super.attributeChanged(attribute); 271 } 272 } 273 274 /** Notify this object that the containment hierarchy above it has 275 * changed. This registers the port as a publisher with the 276 * container of the container, if there is one. 277 * @exception IllegalActionException If the change is not 278 * acceptable. 279 */ 280 @Override 281 public void hierarchyChanged() throws IllegalActionException { 282 // NOTE: It is not OK to access the cached variable _channel 283 // here instead of the channel parameter because the parameter 284 // may not have been validating (during instantiation of 285 // actor-oriented classes). 286 String channelValue = null; 287 try { 288 // The channel may refer to parameters via $ 289 // but the parameters are not yet in scope. 290 channelValue = channel.stringValue(); 291 } catch (Throwable throwable) { 292 // Ignore this on the assumption that we will 293 // get another chance when the channel is set. 294 } 295 if (channelValue != null && !channelValue.equals("")) { 296 NamedObj immediateContainer = getContainer(); 297 if (immediateContainer != null) { 298 NamedObj container = immediateContainer.getContainer(); 299 if (container instanceof CompositeActor) { 300 try { 301 ((CompositeActor) container).registerPublisherPort( 302 channelValue, this, _global); 303 // Need to make sure to record the channel name 304 // so that it gets unregistered if it later changes. 305 // In particular, _channel may be null if attributeChanged() 306 // has not been called. 307 // NOTE: Should we check for some value of _channel other 308 // than null? There shouldn't be any that doesn't match 309 // the channelValue. 310 _channel = channelValue; 311 } catch (NameDuplicationException e) { 312 throw new InternalErrorException(e); 313 } 314 } 315 } 316 } 317 super.hierarchyChanged(); 318 } 319 320 /** Notify this object that the containment hierarchy above it will be 321 * changed, which results in publisher ports being unregistered. 322 * @exception IllegalActionException If unlinking to a published port fails. 323 */ 324 @Override 325 public void hierarchyWillChange() throws IllegalActionException { 326 if (channel != null) { 327 String channelValue = null; 328 try { 329 // The channel may refer to parameters via $ 330 // but the parameters are not yet in scope. 331 channelValue = channel.stringValue(); 332 } catch (Throwable throwable) { 333 channelValue = channel.getExpression(); 334 } 335 if (channelValue != null) { 336 NamedObj immediateContainer = getContainer(); 337 if (immediateContainer != null) { 338 NamedObj container = immediateContainer.getContainer(); 339 if (container instanceof CompositeActor) { 340 try { 341 ((CompositeActor) container) 342 .unregisterPublisherPort(channelValue, this, 343 _global); 344 } catch (NameDuplicationException e) { 345 throw new InternalErrorException(e); 346 } 347 } 348 } 349 } 350 } 351 super.hierarchyWillChange(); 352 } 353 354 /** If {@link #initialTokens} has been set, then produce the 355 * outputs specified by its array value. 356 * @exception IllegalActionException If initialTokens is invalid. 357 */ 358 @Override 359 public void initialize() throws IllegalActionException { 360 if (((InstantiableNamedObj) getContainer()).isWithinClassDefinition()) { 361 // Don't initialize Class Definitions. 362 // See $PTII/ptolemy/actor/lib/test/auto/PublisherToplevelSubscriberPortAOC.xml 363 return; 364 } 365 366 Token initialOutputsValue = initialTokens.getToken(); 367 if (initialOutputsValue instanceof ArrayToken) { 368 // If this port has inside receivers, then it is an opaque port 369 // for a composite actor, and the right way to send outputs is 370 // to populate the inside receivers. 371 Receiver[][] receivers = getInsideReceivers(); 372 if (receivers != null && receivers.length > 0 373 && receivers[0].length > 0) { 374 for (Receiver[] receiver : receivers) { 375 for (int j = 0; j < receivers.length; j++) { 376 for (Token token : ((ArrayToken) initialOutputsValue) 377 .arrayValue()) { 378 receiver[j].put(token); 379 } 380 } 381 } 382 } else { 383 // If this port is transparent or is contained by an atomic actor, then 384 // send initial tokens directly from it. It is not correct to send 385 // them from the source ports connected on the inside because those 386 // ports may also have other destinations. 387 for (Token token : ((ArrayToken) initialOutputsValue) 388 .arrayValue()) { 389 broadcast(token); 390 } 391 } 392 } 393 } 394 395 /** Override the base class to throw an exception if this port is at the top level. 396 * @exception IllegalActionException If the port is in 397 * the top level, or if the superclass throws it. 398 */ 399 @Override 400 public void preinitialize() throws IllegalActionException { 401 NamedObj actor = getContainer(); 402 if (actor != null && actor.getContainer() == null) { 403 throw new IllegalActionException(this, 404 "PublisherPorts cannot be used at the top level, use a Publisher actor instead."); 405 } 406 super.preinitialize(); 407 } 408 409 /** Override the base class to refuse to accept setting to be an input. 410 * @param isInput Required to be false. 411 * @exception IllegalActionException If the argument is true. 412 */ 413 @Override 414 public void setInput(boolean isInput) throws IllegalActionException { 415 if (isInput) { 416 throw new IllegalActionException(this, 417 "PublisherPort cannot be an input port."); 418 } 419 super.setInput(false); 420 } 421 422 /** Override the base class to require the port to be an output. 423 * @param isOutput Required to be true. 424 * @exception IllegalActionException If the argument is false. 425 */ 426 @Override 427 public void setOutput(boolean isOutput) throws IllegalActionException { 428 if (!isOutput) { 429 throw new IllegalActionException(this, 430 "PublisherPort is required to be an output port."); 431 } 432 super.setOutput(true); 433 } 434 435 /** Return a Set of SubscriberPort that are connected to this Publisher. 436 * @return A Set of Subscribers that are connected to this Publisher 437 * @exception KernelException If thrown when a Manager is added to 438 * the top level or if preinitialize() fails. 439 */ 440 public Set<SubscriberPort> subscribers() throws KernelException { 441 // preinitialize() creates connections between Publishers and 442 // Subscribers. 443 Manager.preinitializeThenWrapup((Actor) getContainer()); 444 return _dependents(this); 445 } 446 447 /////////////////////////////////////////////////////////////////// 448 //// private methods //// 449 450 /** If the specified port is an instance of 451 * {@link SubscriberPort}, then return a set containing it; 452 * otherwise, return a set of SubscriberPort instances that downstream 453 * of the specified port that subscribe to this publisher. 454 * This method traverses opaque composites. 455 * @param port The port to be checked 456 * @return The Set of all AtomicActors connected to the port. 457 */ 458 private Set<SubscriberPort> _dependents(IOPort port) 459 throws IllegalActionException { 460 //System.out.println("ActorDependencies._dependents: START" + remotePort.getFullName()); 461 Set<SubscriberPort> results = new HashSet<SubscriberPort>(); 462 if (port instanceof SubscriberPort) { 463 results.add((SubscriberPort) port); 464 } else { 465 if (port.isOutput() && port.isInput()) { 466 throw new IllegalActionException(port, 467 "Can't handle port that is both input and output."); 468 } 469 Receiver[][] receivers = null; 470 if (port.isOutput()) { 471 receivers = port.getRemoteReceivers(); 472 } else if (port.isInput()) { 473 receivers = port.deepGetReceivers(); 474 } else { 475 throw new IllegalActionException(port, 476 "Can't handle port that is neither input nor output."); 477 } 478 if (receivers != null) { 479 for (Receiver[] receiver : receivers) { 480 if (receiver != null) { 481 for (int j = 0; j < receiver.length; j++) { 482 if (receiver[j] != null) { 483 IOPort remotePort2 = receiver[j].getContainer(); 484 if (remotePort2 != null) { 485 results.addAll(_dependents(remotePort2)); 486 } 487 } 488 } 489 } 490 } 491 } 492 } 493 return results; 494 } 495 496 /** Update the channel name of any connected SubscriberPorts. 497 * Note that the channel name of connected SubscriptionAggregatorPorts 498 * are not updated. 499 * @param previousChannelName The previous name of the channel. 500 * @param newChannelName The new name of the channel. 501 * @exception KernelException If thrown when a Manager is added to 502 * the top level, or when the channel name of a Subscriber is changed. 503 */ 504 private void _updateChannelNameOfConnectedSubscribers( 505 String previousChannelName, String newChannelName) 506 throws KernelException { 507 508 // This will end up calling attributeChanged() (in order to 509 // defeat lazy composites), and we want to prevent a recursive 510 // call here. 511 if (_inUpdateCall) { 512 return; 513 } 514 _inUpdateCall = true; 515 try { 516 // We use subscribers() here so that we get any subscribers in 517 // Opaque TypedCompositeActors. 518 for (SubscriberPort port : subscribers()) { 519 if (port.channel.stringValue().equals(previousChannelName)) { 520 // Avoid updating SubscriptionAggregators that have regular 521 // expressions that are different than the Publisher channel name. 522 523 // Handle the case where the channel name is an expression 524 // that evaluates to the value of the enclosing Publisher actor's 525 // channel parameter. In that case, we want to change the value 526 // the channel in the Publisher actor, rather than here. 527 if (port.channel.getExpression().equals("$channel")) { 528 NamedObj container = port.getContainer(); 529 Attribute containerChannel = container 530 .getAttribute("channel"); 531 if (containerChannel instanceof StringParameter) { 532 ((StringParameter) containerChannel) 533 .setExpression(newChannelName); 534 container.attributeChanged(containerChannel); 535 port.attributeChanged(port.channel); 536 } else { 537 port.channel.setExpression(newChannelName); 538 port.attributeChanged(port.channel); 539 } 540 } else { 541 port.channel.setExpression(newChannelName); 542 port.attributeChanged(port.channel); 543 } 544 } 545 } 546 } finally { 547 _inUpdateCall = false; 548 } 549 } 550 551 /////////////////////////////////////////////////////////////////// 552 //// private variables //// 553 554 /** Flag to prevent recursive call of subscribers() method. */ 555 private boolean _inUpdateCall = false; 556}