001/* A nondeterministic merge actor for PN.
002
003 Copyright (c) 2004-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.domains.pn.kernel;
029
030import java.io.Writer;
031import java.util.Iterator;
032import java.util.List;
033
034import ptolemy.actor.Actor;
035import ptolemy.actor.Director;
036import ptolemy.actor.Manager;
037import ptolemy.actor.TypedAtomicActor;
038import ptolemy.actor.TypedCompositeActor;
039import ptolemy.actor.TypedIOPort;
040import ptolemy.actor.process.ProcessReceiver;
041import ptolemy.data.IntToken;
042import ptolemy.data.Token;
043import ptolemy.data.type.BaseType;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.Port;
046import ptolemy.kernel.util.IllegalActionException;
047import ptolemy.kernel.util.InternalErrorException;
048import ptolemy.kernel.util.KernelException;
049import ptolemy.kernel.util.NameDuplicationException;
050import ptolemy.kernel.util.StringAttribute;
051import ptolemy.kernel.util.Workspace;
052
053///////////////////////////////////////////////////////////////////
054//// NondeterministicMerge
055
056/**
057 This actor takes any number of input streams and merges them
058 nondeterministically.  This actor is intended for use in the
059 PN domain. It is a composite actor that
060 creates its own contents.  It contains an instance of PNDirector and one
061 actor for each input channel (it creates these actors automatically
062 when a connection is created to the input multiport).  The contained
063 actors are special actors (implemented as an instance of an inner class)
064 that read from the port of this actor and write to the port of
065 this actor. They have no ports of their own.  The lifecycle of the
066 contained actors (when they are started or stopped) is handled by
067 the PNDirector in the usual way.
068
069 @author Edward A. Lee, Haibo Zeng
070 @version $Id$
071 @since Ptolemy II 4.1
072 @Pt.ProposedRating Yellow (eal)
073 @Pt.AcceptedRating Red (eal)
074 */
075public class NondeterministicMerge extends TypedCompositeActor {
076    /** Construct an actor in the specified container with the specified
077     *  name. Create ports and make the input port a multiport.
078     *  @param container The container.
079     *  @param name The name.
080     *  @exception NameDuplicationException If an actor
081     *   with an identical name already exists in the container.
082     *  @exception IllegalActionException If the actor cannot be contained
083     *   by the proposed container.
084     */
085    public NondeterministicMerge(CompositeEntity container, String name)
086            throws NameDuplicationException, IllegalActionException {
087        super(container, name);
088        _constructor();
089    }
090
091    /** Construct a TypedCompositeActor in the specified workspace with
092     *  no container and an empty string as a name. You can then change
093     *  the name with setName(). If the workspace argument is null, then
094     *  use the default workspace.  You should set the local director or
095     *  executive director before attempting to send data to the actor
096     *  or to execute it. Add the actor to the workspace directory.
097     *  Increment the version number of the workspace.
098     *  @param workspace The workspace that will list the actor.
099     *  @exception NameDuplicationException If an actor
100     *   with an identical name already exists in the container.
101     *  @exception IllegalActionException If the actor cannot be contained
102     *   by the proposed container.
103     */
104    public NondeterministicMerge(Workspace workspace)
105            throws NameDuplicationException, IllegalActionException {
106        // Added for the sake of Kepler's KAR handling, which needs this
107        // constructor to instantiate composite actors.
108        super(workspace);
109        _constructor();
110    }
111
112    ///////////////////////////////////////////////////////////////////
113    ////                     ports and parameters                  ////
114
115    /** The input port.  This base class imposes no type constraints except
116     *  that the type of the input cannot be greater than the type of the
117     *  output.
118     */
119    public TypedIOPort input;
120
121    /** The output port. By default, the type of this output is constrained
122     *  to be at least that of the input.
123     */
124    public TypedIOPort output;
125
126    /** Output port used to indicate which input channel the current
127     *  output came from. This has type int.
128     */
129    public TypedIOPort channel;
130
131    ///////////////////////////////////////////////////////////////////
132    ////                         public methods                    ////
133
134    /** Override the base class to adjust the number of contained
135     *  actors, if the number is no longer correct.
136     *  @param port The port that has connection changes.
137     */
138    @Override
139    public void connectionsChanged(Port port) {
140        super.connectionsChanged(port);
141
142        if (port == input) {
143            /* The reason why we delay the execution of _reinitializeInnerActors:
144             *          What happens is that the NondeterministicMerge will call
145             *          getWidth in its connectionsChanged method, this will cause
146             *          IORelation to request a token from the width Parameter,
147             *          which will trigger IORelation.attributeChanged (expressions
148             *          are always lazy, which gives unexpected behavior with the
149             *          attributeChanged mechanism).
150             *          (Before the cached version of the width was being used at
151             *          this moment, which resulted in the wrong value being used
152             *          (this was a very old bug).)
153             *          The call of IORelation.attributeChanged will set the
154             *          cached width, which results in IOPort.attributeChanged
155             *          being called, which calls NondeterministicMerge.connectionsChanged
156             *          (again!). This one sees that the component has not added yet
157             *          and does so. Finally the functions all return and we end up
158             *          in the first NondeterministicMerge.connectionsChanged
159             *          again. At the time it called getWidth it knew it had to add
160             *          the component and does so, but in between this was already
161             *          done by the second NondeterministicMerge.connectionsChanged,
162             *          which results in the exception.
163             *          When I move the code that triggers the width and
164             *          adds to new actors to the initialize method of the
165             *          NondeterministicMerge the model
166             *          (ptolemy/domains/pn/demo/BrockAckerman/BrockAckerman.xml) runs
167             *          again, but I'm reluctant to do so, since it might mess up the
168             *          initialization process.
169             *          Moving the code to preinitialize has the disadvantage however
170             *          that width inference might happen multiple times (and definitely
171             *          will for certain type of models).
172             */
173            // If the model is running, create new internal actors if needed.
174            Manager manager = getManager();
175            if (manager != null) {
176                Manager.State managerState = manager.getState();
177                if (managerState == Manager.ITERATING
178                        || managerState == Manager.PAUSED
179                        || managerState == Manager.PAUSED_ON_BREAKPOINT) {
180                    _reinitializeInnerActors();
181                }
182            }
183        }
184    }
185
186    /** Initialize this actor.
187     *  @exception IllegalActionException If the parent class throws it.
188     */
189    @Override
190    public void initialize() throws IllegalActionException {
191        _reinitializeInnerActors();
192
193        // super.initialize(); will initialize the director of this
194        // composite actor (the MergeDirector), which will initialize the
195        // newly created actors
196        super.initialize();
197    }
198
199    ///////////////////////////////////////////////////////////////////
200    ////                         public methods                    ////
201
202    /** Clone the object into the specified workspace. This overrides
203     *  the base class to set instantiate a new MergeDirector,
204     *  @param workspace The workspace for the new object.
205     *  @return A new NamedObj.
206     *  @exception CloneNotSupportedException If any of the attributes
207     *   cannot be cloned.
208     *  @see #exportMoML(Writer, int, String)
209     */
210    @Override
211    public Object clone(Workspace workspace) throws CloneNotSupportedException {
212        NondeterministicMerge result = (NondeterministicMerge) super.clone(
213                workspace);
214        try {
215            // Remove the old inner MergeDirector(s) that is(are) in the wrong workspace.
216            String mergeDirectorName = null;
217            Iterator mergeDirectors = result.attributeList(MergeDirector.class)
218                    .iterator();
219            while (mergeDirectors.hasNext()) {
220                MergeDirector oldMergeDirector = (MergeDirector) mergeDirectors
221                        .next();
222                if (mergeDirectorName == null) {
223                    mergeDirectorName = oldMergeDirector.getName();
224                }
225                oldMergeDirector.setContainer(null);
226            }
227
228            // Create a new MergeDirector that is in the right workspace.
229            MergeDirector mergeDirector = result.new MergeDirector(workspace);
230            mergeDirector.setContainer(result);
231            if (mergeDirectorName != null) {
232                mergeDirector.setName(mergeDirectorName);
233            }
234        } catch (Throwable throwable) {
235            throw new CloneNotSupportedException(
236                    "Could not clone: " + throwable);
237        }
238        return result;
239    }
240
241    ///////////////////////////////////////////////////////////////////
242    ////                         private methods                   ////
243
244    /** Construct a NondeterministicMerge. */
245    private void _constructor()
246            throws NameDuplicationException, IllegalActionException {
247
248        input = new TypedIOPort(this, "input", true, false);
249        output = new TypedIOPort(this, "output", false, true);
250
251        input.setMultiport(true);
252        output.setTypeAtLeast(input);
253
254        channel = new TypedIOPort(this, "channel");
255        channel.setOutput(true);
256        channel.setTypeEquals(BaseType.INT);
257
258        // Add an attribute to get the port placed on the bottom.
259        StringAttribute channelCardinal = new StringAttribute(channel,
260                "_cardinal");
261        channelCardinal.setExpression("SOUTH");
262
263        _attachText("_iconDescription",
264                "<svg>\n" + "<polygon points=\"-10,20 10,10 10,-10, -10,-20\" "
265                        + "style=\"fill:red\"/>\n" + "</svg>\n");
266
267        PNDirector director = new MergeDirector(workspace());
268        director.setContainer(this);
269        director.setName("director");
270    }
271
272    /** Create the contained actors to handle the inputs.
273     */
274    private void _reinitializeInnerActors() {
275        List<?> containedActors = entityList();
276        int numberOfContainedActors = containedActors.size();
277
278        // Create the contained actors to handle the inputs.
279        int inputWidth;
280        try {
281            inputWidth = input.getWidth();
282        } catch (IllegalActionException ex) {
283            throw new InternalErrorException(this, ex,
284                    "At this time IllegalActionExceptions are not allowed to happen.\n"
285                            + "Width inference should already have been done.");
286        }
287
288        for (int i = 0; i < inputWidth; i++) {
289            if (i < numberOfContainedActors) {
290                // Local actor already exists for this channel.
291                // Just wake it up.
292                Object localActor = containedActors.get(i);
293
294                synchronized (localActor) {
295                    localActor.notifyAll();
296                }
297
298                // ProcessThread associated with the actor might
299                // be blocked on a wait on the director.
300                // So we need to notify on the director also.
301                Director director = getExecutiveDirector();
302
303                // If there is no director, then the model cannot be running,
304                // so there is no need to notify.
305                if (director != null) {
306                    synchronized (director) {
307                        director.notifyAll();
308                    }
309                }
310            } else {
311                try {
312                    /*Actor localActor =*/new ChannelActor(i, this);
313
314                    // NOTE: Probably don't want this overhead.
315                    // ((NamedObj)localActor).addDebugListener(this);
316                } catch (KernelException e) {
317                    throw new InternalErrorException(e);
318                }
319            }
320        }
321    }
322
323    ///////////////////////////////////////////////////////////////////
324    ////                         inner classes                     ////
325
326    /** Actor to handle an input channel. It has no ports. It uses the
327     *  ports of the container.
328     */
329    private class ChannelActor extends TypedAtomicActor {
330        public ChannelActor(int index, NondeterministicMerge container)
331                throws IllegalActionException, NameDuplicationException {
332            super(container, "ChannelActor" + index);
333            _channelIndex = index;
334            _channelValue = new IntToken(_channelIndex);
335        }
336
337        // Override the base class to not export anything.
338        @Override
339        public void exportMoML(Writer output, int depth, String name) {
340        }
341
342        @Override
343        public void fire() throws IllegalActionException {
344            // If there is no connection, do nothing.
345            if (input.getWidth() > _channelIndex) {
346                // NOTE: Reading from the input port of the host actor.
347                if (!NondeterministicMerge.this._stopRequested
348                        && input.hasToken(_channelIndex)) {
349                    if (_debugging) {
350                        NondeterministicMerge.this
351                                ._debug("Waiting for input from channel "
352                                        + _channelIndex);
353                    }
354
355                    // NOTE: Writing to the port of the host actor.
356                    Token result = input.get(_channelIndex);
357
358                    // We require that the send to the two output ports be
359                    // atomic so that the channel port gets tokens
360                    // in the same order as the output port.
361                    // We synchronize on the director because the send()
362                    // may call wait() on the director of the container,
363                    // so synchronizing on anything else could cause deadlock.
364                    synchronized (((NondeterministicMerge) getContainer())
365                            .getExecutiveDirector()) {
366                        output.send(0, result);
367                        channel.send(0, _channelValue);
368                    }
369
370                    if (_debugging) {
371                        NondeterministicMerge.this
372                                ._debug("Sent " + result + " from channel "
373                                        + _channelIndex + " to the output.");
374                    }
375                }
376            } else {
377                // Input channel is no longer connected.
378                // We don't want to spin lock here, so we
379                // wait.
380                // NOTE: synchronizing is neither allowed
381                // nor necessary here. See workspace().wait(Object).
382                // synchronized (this) {
383                try {
384                    workspace().wait(this);
385                } catch (InterruptedException ex) {
386                    // Ignore and continue executing.
387                }
388                // }
389            }
390        }
391
392        // Override to return the manager associate with the host.
393        @Override
394        public Manager getManager() {
395            return NondeterministicMerge.this.getManager();
396        }
397
398        private int _channelIndex;
399
400        private IntToken _channelValue;
401    }
402
403    /** Variant of the PNDirector for the NondeterministicMerge actor.
404     */
405    private class MergeDirector extends PNDirector {
406        /** Construct an MergeDirector in the specified workspace with
407         *  no container and an empty string as a name. You can then change
408         *  the name with setName(). If the workspace argument is null, then
409         *  use the default workspace.  You should set the local director or
410         *  executive director before attempting to send data to the actor
411         *  or to execute it. Add the actor to the workspace directory.
412         *  Increment the version number of the workspace.
413         *  @param workspace The workspace that will list the actor.
414         *  @exception IllegalActionException If the container is incompatible
415         *   with this actor.
416         *  @exception NameDuplicationException If the name coincides with
417         *   an actor already in the container.
418         */
419        public MergeDirector(Workspace workspace)
420                throws IllegalActionException, NameDuplicationException {
421            super(workspace);
422            setPersistent(false);
423        }
424
425        /** Queue an initialization request with the manager.
426         *  The specified actor will be initialized at an appropriate time,
427         *  between iterations, by calling its preinitialize() and initialize()
428         *  methods. This method is called by CompositeActor when an actor
429         *  sets its container to that composite actor.  Typically, that
430         *  will occur when a model is first constructed, and during the
431         *  execute() method of a ChangeRequest.
432         *  We do nothing here in this implementation:
433         *  When these actors are added during the initialization phase
434         *  setContainer results in the call of this method, which will
435         *  requestInitialization, which will normally delegate the action
436         *  to the Manager.
437         *  super.initialize() in NondeterministicMerge will however
438         *  initialize the director of this
439         *  composite actor (the MergeDirector), which will initialize the
440         *  newly created actors. Hence we don't need to do it again here.
441         *  @param actor The actor to initialize.
442         */
443        @Override
444        public void requestInitialization(Actor actor) {
445        }
446
447        /** Override the base class to delegate to the executive director.
448         *  This director does not keep track of threads.
449         *  @param thread The thread.
450         */
451        @Override
452        public synchronized void addThread(Thread thread) {
453            Director director = getExecutiveDirector();
454
455            if (director instanceof PNDirector) {
456                ((PNDirector) director).addThread(thread);
457            } else {
458                throw new InternalErrorException(
459                        "NondeterministicMerge actor can only execute"
460                                + " under the control of a PNDirector!");
461            }
462        }
463
464        /** Do nothing.
465         */
466        @Override
467        public void fire() {
468            // Do not call super.fire() here because ProcessDirector.fire()
469            // waits until a deadlock is detected, which we don't want to do.
470        }
471
472        /** Return false since this director has nothing to do.
473         *  @return False.
474         */
475        @Override
476        public boolean postfire() {
477            return false;
478        }
479
480        /** Override the base class to delegate to the executive director.
481         *  This director does not keep track of threads.
482         *  @param thread The thread.
483         */
484        @Override
485        public synchronized void removeThread(Thread thread) {
486            Director director = getExecutiveDirector();
487
488            if (director instanceof PNDirector) {
489                ((PNDirector) director).removeThread(thread);
490            } else {
491                throw new InternalErrorException(
492                        "NondeterministicMerge actor can only execute"
493                                + " under the control of a PNDirector!");
494            }
495        }
496
497        /** Override the base class to delegate to the executive director.
498         *  This director does not keep track of threads.
499         *  @param thread The thread.
500         *  @param receiver The receiver handling the I/O operation,
501         *   or null if it is not a specific receiver.
502         *  @see #threadBlocked(Thread, ProcessReceiver, boolean)
503         */
504        @Override
505        public synchronized void threadBlocked(Thread thread,
506                ProcessReceiver receiver) {
507            Director director = getExecutiveDirector();
508
509            if (director instanceof PNDirector) {
510                ((PNDirector) director).threadBlocked(thread, receiver);
511            } else {
512                throw new InternalErrorException(
513                        "NondeterministicMerge actor can only execute"
514                                + " under the control of a PNDirector!");
515            }
516        }
517
518        /** Override the base class to delegate to the executive director.
519         *  This director does not keep track of threads.
520         *  @param thread The thread.
521         *  @param receiver The receiver handling the I/O operation,
522         *   or null if it is not a specific receiver.
523         *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
524         *   to indicate whether the thread is blocked on read or write.
525         *  @see #threadBlocked(Thread, ProcessReceiver)
526         */
527        @Override
528        public synchronized void threadBlocked(Thread thread,
529                ProcessReceiver receiver, boolean readOrWrite) {
530            Director director = getExecutiveDirector();
531
532            if (director instanceof PNDirector) {
533                ((PNDirector) director).threadBlocked(thread, receiver,
534                        readOrWrite);
535            } else {
536                throw new InternalErrorException(
537                        "NondeterministicMerge actor can only execute"
538                                + " under the control of a PNDirector!");
539            }
540        }
541
542        /** Override the base class to delegate to the executive director.
543         *  This director does not keep track of threads.
544         *  @param thread The thread.
545         */
546        @Override
547        public synchronized void threadHasPaused(Thread thread) {
548            Director director = getExecutiveDirector();
549
550            if (director instanceof PNDirector) {
551                ((PNDirector) director).threadHasPaused(thread);
552            } else {
553                throw new InternalErrorException(
554                        "NondeterministicMerge actor can only execute"
555                                + " under the control of a PNDirector!");
556            }
557        }
558
559        /** Override the base class to delegate to the executive director.
560         *  This director does not keep track of threads.
561         *  @param thread The thread.
562         */
563        @Override
564        public synchronized void threadHasResumed(Thread thread) {
565            Director director = getExecutiveDirector();
566
567            if (director instanceof PNDirector) {
568                ((PNDirector) director).threadHasResumed(thread);
569            } else {
570                throw new InternalErrorException(
571                        "NondeterministicMerge actor can only execute"
572                                + " under the control of a PNDirector!");
573            }
574        }
575
576        /** Override the base class to delegate to the executive director.
577         *  This director does not keep track of threads.
578         *  @param thread The thread.
579         *  @param receiver The receiver handling the I/O operation,
580         *   or null if it is not a specific receiver.
581         *  @see #threadBlocked(Thread, ProcessReceiver)
582         */
583        @Override
584        public synchronized void threadUnblocked(Thread thread,
585                ProcessReceiver receiver) {
586            Director director = getExecutiveDirector();
587
588            if (director instanceof PNDirector) {
589                ((PNDirector) director).threadUnblocked(thread, receiver);
590            } else {
591                throw new InternalErrorException(
592                        "NondeterministicMerge actor can only execute"
593                                + " under the control of a PNDirector!");
594            }
595        }
596
597        /** Override the base class to delegate to the executive director.
598         *  This director does not keep track of threads.
599         *  @param thread The thread.
600         *  @param receiver The receiver handling the I/O operation,
601         *   or null if it is not a specific receiver.
602         *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
603         *   to indicate whether the thread is blocked on read or write.
604         *  @see #threadBlocked(Thread, ProcessReceiver, boolean)
605         */
606        @Override
607        public synchronized void threadUnblocked(Thread thread,
608                ProcessReceiver receiver, boolean readOrWrite) {
609            Director director = getExecutiveDirector();
610
611            if (director instanceof PNDirector) {
612                ((PNDirector) director).threadUnblocked(thread, receiver,
613                        readOrWrite);
614            } else {
615                throw new InternalErrorException(
616                        "NondeterministicMerge actor can only execute"
617                                + " under the control of a PNDirector!");
618            }
619        }
620
621        /** Do nothing.
622         */
623        @Override
624        public void wrapup() {
625        }
626
627        // Override since deadlock cannot ever occur internally.
628        @Override
629        protected boolean _resolveDeadlock() {
630            if (_debugging) {
631                _debug("Deadlock is not real as "
632                        + "NondeterministicMerge can't deadlock.");
633            }
634
635            return true;
636        }
637    }
638}