001/*
002 * Copyright (c) 2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: welker $'
006 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $' 
007 * '$Revision: 24234 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
/* CPES actor for processing a stream of tokens: it waits for the
   Nth occurrence of a specific termination element and emits the
   termination element only at that time.
   Do not use in SDF!
 */
035/**
036 *    '$RCSfile$'
037 *
038 *     '$Author: welker $'
039 *       '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $'
040 *   '$Revision: 24234 $'
041 *
042 *  For Details: http://www.kepler-project.org
043 *
044 * Copyright (c) 2004 The Regents of the University of California.
045 * All rights reserved.
046 *
047 * Permission is hereby granted, without written agreement and without
048 * license or royalty fees, to use, copy, modify, and distribute this
049 * software and its documentation for any purpose, provided that the
050 * above copyright notice and the following two paragraphs appear in
051 * all copies of this software.
052 *
053 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
054 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
055 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN
056 * IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY
057 * OF SUCH DAMAGE.
058 *
059 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
060 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
061 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
062 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY
063 * OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT,
064 * UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
065 */
066
067package org.sdm.spa;
068
069import java.util.Iterator;
070import java.util.LinkedList;
071import java.util.List;
072import java.util.Set;
073
074import org.apache.commons.logging.Log;
075import org.apache.commons.logging.LogFactory;
076
077import ptolemy.actor.TypedAtomicActor;
078import ptolemy.actor.TypedIOPort;
079import ptolemy.data.BooleanToken;
080import ptolemy.data.IntToken;
081import ptolemy.data.RecordToken;
082import ptolemy.data.Token;
083import ptolemy.data.expr.Parameter;
084import ptolemy.data.type.BaseType;
085import ptolemy.data.type.MonotonicFunction;
086import ptolemy.data.type.RecordType;
087import ptolemy.data.type.Type;
088import ptolemy.graph.Inequality;
089import ptolemy.graph.InequalityTerm;
090import ptolemy.kernel.CompositeEntity;
091import ptolemy.kernel.util.Attribute;
092import ptolemy.kernel.util.IllegalActionException;
093import ptolemy.kernel.util.NameDuplicationException;
094
095//////////////////////////////////////////////////////////////////////////
096//// SyncOnTerminator
097
098/**
 * <p>
 * Pass on a stream of tokens except for a specific element (the terminator),
 * and emit the terminator only when it is found in the stream for the
 * <i>numberOfOccurences</i>th time.<br/>
 * The input should be a stream of tokens whose type matches the type of the
 * terminator parameter of this actor.
 * </p>
106 * 
 * <p>
 * Input tokens are passed on until the termination element has been found as
 * many times as specified. The termination element itself is not passed on
 * until its <i>numberOfOccurences</i>th occurrence. After the termination
 * element has been emitted, any further incoming token raises an exception.
 * </p>
113 * 
 * <p>
 * This actor can be used in the following stream processing scenario. A stream,
 * which has a termination token, is split, its elements are processed in
 * parallel, and the branches are then merged non-deterministically again. In
 * such a case, if the termination token is routed on only one branch, it can
 * overtake other tokens at the merge. So the termination token should be
 * routed on ALL branches; after the merge, this actor can wait for the last
 * termination token, thus ensuring that the termination token is the very
 * last token in the stream.
 * </p>
124 * 
 * <p>
 * Note that for record types, you do not need to specify all fields within the
 * terminator. E.g. if you have a stream with {name=string, value=int}, you can
 * define the terminator, for instance, as {name="T"}.
 * </p>
130 * 
 * <p>
 * If the flag <i>discardOthers</i> is set, the input tokens are NOT emitted on
 * the output. That is, there is only one output token: the termination token
 * at its <i>numberOfOccurences</i>th occurrence.
 * </p>
136 * 
137 * <p>
138 * The actor outputs the input tokens.
139 * </p>
140 * 
141 * <p>
142 * This actor is not always producing a token for an input (namely for the
143 * termination tokens), thus, it cannot be used in SDF. Since you do not have
144 * parallel stream processing under SDF, this can hardly be a problem.
145 * </p>
146 * 
147 * @author Norbert Podhorszki
148 * @version $Id: SyncOnTerminator.java 24234 2010-05-06 05:21:26Z welker $
149 * @since Ptolemy II 6.0.2
150 */
151public class SyncOnTerminator extends TypedAtomicActor {
152        /**
153         * Construct an actor with the given container and name.
154         * 
155         * @param container
156         *            The container.
157         * @param name
158         *            The name of this actor.
159         * @exception IllegalActionException
160         *                If the actor cannot be contained by the proposed
161         *                container.
162         * @exception NameDuplicationException
163         *                If the container already has an actor with this name.
164         */
165        public SyncOnTerminator(CompositeEntity container, String name)
166                        throws NameDuplicationException, IllegalActionException {
167                super(container, name);
168
169                /*
170                 * Input ports and port parameters
171                 */
172
173                // input
174                input = new TypedIOPort(this, "input", true, false);
175                new Parameter(input, "_showName", BooleanToken.FALSE);
176
177                // The terminator element to wait for
178                terminator = new Parameter(this, "terminator");
179
180                // Number of occurences of the terminator element to wait for
181                numberOfOccurences = new Parameter(this, "numberOfOccurences",
182                                new IntToken(1));
183                numberOfOccurences.setTypeEquals(BaseType.INT);
184
185                // Flag to indicate to discarding all non-terminator inputs
186                discardOthers = new Parameter(this, "discardOthers", BooleanToken.FALSE);
187                discardOthers.setTypeEquals(BaseType.BOOLEAN);
188
189                /*
190                 * Output ports
191                 */
192
193                // file name
194                output = new TypedIOPort(this, "output", false, true);
195                new Parameter(output, "_showName", BooleanToken.FALSE);
196
197        }
198
199        /***********************************************************
200         * ports and parameters
201         */
202
203        /**
204         * Input token. The type should match the terminator's type.
205         */
206        public TypedIOPort input;
207
208        /**
209         * The terminator element to wait for.
210         * 
211         */
212        public Parameter terminator;
213
214        /**
215         * The number of occurences of the terminator in the stream. The terminator
216         * will be emitted only at the last occurence.
217         */
218        public Parameter numberOfOccurences;
219
220        /**
221         * A flag to indicate whether non-terminator tokens should be passed on or
222         * discarded. If set, only one token will be ever emitted, name the
223         * terminator at the time of its last occurence.
224         */
225        public Parameter discardOthers;
226
227        /**
228         * The output token, which is always the input token.
229         */
230        public TypedIOPort output;
231
232        /***********************************************************
233         * public methods
234         */
235
236        /**
237         * initialize() runs once before first exec
238         * 
239         * @exception IllegalActionException
240         *                If the parent class throws it.
241         */
242        public void initialize() throws IllegalActionException {
243                super.initialize();
244                _occured = 0;
245        }
246
247        /**
248         * Override the base class to determine which function is being specified.
249         * 
250         * @param attribute
251         *            The attribute that changed.
252         * @exception IllegalActionException
253         *                If the function is not recognized.
254         */
255        public void attributeChanged(Attribute attribute)
256                        throws IllegalActionException {
257                if (attribute == discardOthers) {
258                        _discard = ((BooleanToken) discardOthers.getToken()).booleanValue();
259                        if (isDebugging)
260                                log.debug("Changed attribute discardOthers to: " + _discard);
261                } else if (attribute == numberOfOccurences) {
262                        _occurence = ((IntToken) numberOfOccurences.getToken()).intValue();
263                        if (isDebugging)
264                                log.debug("Changed attribute numberOfOccurences to: "
265                                                + _occurence);
266                } else if (attribute == terminator) {
267                        _terminator = (Token) terminator.getToken();
268                } else {
269                        super.attributeChanged(attribute);
270                }
271        }
272
273        /**
274         * fire
275         * 
276         * @exception IllegalActionException
277         */
278        public void fire() throws IllegalActionException {
279                super.fire();
280
281                Token _input = input.get(0);
282
283                // error check: what if already terminated?
284                if (_occured >= _occurence) {
285                        throw new IllegalActionException(
286                                        this.getName()
287                                                        + " : A token has arrived on input after the last occurence of the terminator. Max occurence = "
288                                                        + _occurence + "; occured = " + _occured);
289
290                }
291
292                if (_isTerminator(_input)) {
293                        _occured++;
294                        if (_occured >= _occurence)
295                                output.send(0, _input); // emit the terminator finally
296
297                } else if (!_discard) {
298                        output.send(0, _input); // pass-on the non-terminator
299                }
300
301        }
302
303        /**
304         * Return the type constraints of this actor. The type constraints are (a)
305         * the input port type is at most the type of the terminator parameter's
306         * type; (b) in case of record types, the type of the output port equals to
307         * the type of the input port; (c) for all other types, the output port type
308         * is at least the terminator parameter's type;
309         * 
310         * This allows a terminator specification of {name="T"} and still passing
311         * {name=string, date=long,...} records through the actor and the type of
312         * the output port will be the longer record.
313         * 
314         * @return a list of Inequality.
315         */
316
317        public List typeConstraintList() {
318                List constraints = new LinkedList();
319
320                _type = terminator.getType();
321
322                // I. non-record types
323                if (!(_type instanceof RecordType)) {
324                        input.setTypeAtMost(_type);
325                        output.setTypeEquals(_type);
326                        if (isDebugging)
327                                log.debug("typeConstraintList(). Type = " + _type
328                                                + " input type = " + input.getType()
329                                                + " output type = " + output.getType());
330                        return constraints;
331                }
332
333                // II. record types
334
335                // ensure that input has the labels of the parameter record
336                // i.e. can be a subtype, with more labels than the parameter
337                input.setTypeAtMost(_type);
338
339                // Declare that output has all the fields of the input port
340                Inequality inequality = new Inequality(new FunctionTerm(), output
341                                .getTypeTerm());
342                constraints.add(inequality);
343
344                if (isDebugging)
345                        log.debug("typeConstraintList(). Type = " + _type
346                                        + " input type = " + input.getType() + " output type = "
347                                        + output.getType());
348                return constraints;
349        }
350
351        /**
352         * Check if the incoming token is the terminator. For basic types, the
353         * tokens should be equal by the Token.equals() method. For record types,
354         * the terminator's all labels should exist in input, and their values
355         * should be equal (i.e. input record fully contains the terminator).
356         */
357        private boolean _isTerminator(Token token) {
358                // all types except records
359                if (!(_type instanceof RecordType))
360                        return _terminator.equals(token);
361
362                // check records
363                Set terminatorLabelSet = ((RecordToken) _terminator).labelSet();
364                Set tokenLabelSet = ((RecordToken) token).labelSet();
365                Iterator iterator = terminatorLabelSet.iterator();
366
367                while (iterator.hasNext()) {
368                        String label = (String) iterator.next();
369
370                        // check that the label is in input
371                        if (!tokenLabelSet.contains(label))
372                                return false;
373
374                        // check the values
375                        Token token1 = ((RecordToken) _terminator).get(label);
376                        Token token2 = ((RecordToken) token).get(label);
377
378                        if (!token1.equals(token2))
379                                return false;
380                }
381                return true; // _terminator record is fully contained in input
382        }
383
384        private int _occurence; // the max # of occurence of terminator
385        private int _occured = 0; // the # of occurence of terminator so far
386        private boolean _discard; // discard all non-terminator?
387        private Token _terminator; // the terminator
388        private Type _type; // the type of the terminator token
389
390        private static final Log log = LogFactory.getLog(SyncOnTerminator.class
391                        .getName());
392        private static final boolean isDebugging = log.isDebugEnabled();
393
394        // /////////////////////////////////////////////////////////////////
395        // // inner classes ////
396        // This class implements a monotonic function of the input port
397        // types. The value of the function is the record type of the
398        // input record. To ensure that this function is monotonic, the
399        // value of the function is bottom if the type of the port with
400        // name "input" is bottom. Otherwise (it must be a record), the
401        // value of the function is that record type.
402
403        private class FunctionTerm extends MonotonicFunction {
404                // /////////////////////////////////////////////////////////////
405                // // public inner methods ////
406
407                /**
408                 * Return the function result.
409                 * 
410                 * @return A Type.
411                 */
412                public Object getValue() {
413                        Type inputType = input.getType();
414
415                        if (isDebugging)
416                                log.debug("FunctionTerm.getValue called. input type="
417                                                + inputType);
418
419                        if (!(inputType instanceof RecordType)) {
420                                return BaseType.UNKNOWN;
421                        }
422
423                        return (RecordType) inputType;
424
425                }
426
427                /**
428                 * Return all the InequalityTerms for all input ports in an array.
429                 * 
430                 * @return An array of InequalityTerm.
431                 */
432                public InequalityTerm[] getVariables() {
433                        InequalityTerm[] variables = new InequalityTerm[1];
434                        variables[0] = input.getTypeTerm();
435                        return variables;
436                }
437        }
438
439}