001/*
002 * Copyright (c) 2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: welker $'
006 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $' 
007 * '$Revision: 24234 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030/* CPES Actor for processing a stream of files and
031   bringing a specific element into the front.
032   Do not use in SDF!
033 */
034/**
035 *    '$RCSfile$'
036 *
037 *     '$Author: welker $'
038 *       '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $'
039 *   '$Revision: 24234 $'
040 *
041 *  For Details: http://www.kepler-project.org
042 *
043 * Copyright (c) 2004 The Regents of the University of California.
044 * All rights reserved.
045 *
046 * Permission is hereby granted, without written agreement and without
047 * license or royalty fees, to use, copy, modify, and distribute this
048 * software and its documentation for any purpose, provided that the
049 * above copyright notice and the following two paragraphs appear in
050 * all copies of this software.
051 *
052 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
053 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
054 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN
055 * IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY
056 * OF SUCH DAMAGE.
057 *
058 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
059 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
060 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
061 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY
062 * OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT,
063 * UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
064 */
065
066package org.sdm.spa;
067
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.RecordToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.RecordType;
import ptolemy.data.type.Type;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
086
087//////////////////////////////////////////////////////////////////////////
088//// DelayStreamToASpecificElement
089
090/**
091 * <p>
092 * Postpone a stream of files until a specific element is found and bring that
093 * element in front.<br/>
 * The input should be a stream of tokens of file information: a record of
 * {name=&lt;filename&gt;, size=&lt;size in bytes&gt;, date=&lt;date in
 * seconds&gt;}. Such tokens are produced by
 * org.kepler.actor.io.SshDirectoryListing.
097 * </p>
098 * 
099 * <p>
 * Input files are gathered until such an element is found. The
 * specificElement is then emitted first, followed by all other gathered
 * files. This works only for the first time; after that any input is
 * immediately emitted.
103 * </p>
104 * 
105 * <p>
106 * For the case when there is no specific element in the stream, a stopping
107 * element can also be defined. If such element appears, this actor will emit
108 * all stored elements immediately and will not wait for the specific element
109 * any more.
110 * </p>
111 * 
112 * <p>
113 * This actor is a CPES specific actor. When watching for .bp files of a
114 * simulation, the first set of files can have any order. For postprocessing,
115 * however, the mesh file should be transferred before processing any other
 * files. So this actor brings the mesh file in front. It can be the case,
 * however, that there are no .bp files at all. The stream is always
 * terminated by a final element (stopfile), which should be emitted anyway.
119 * </p>
120 * 
121 * <p>
122 * The actor outputs the stream of files
123 * </p>
124 * 
125 * <p>
126 * This actor does not produce any tokens for an unknown number of firings, then
127 * suddenly it produces several tokens. Thus, it cannot be used in SDF.
128 * </p>
129 * 
130 * @author Norbert Podhorszki
131 * @version $Id: DelayStreamToASpecificElement.java 13512 2007-04-13 00:02:17Z
132 *          podhorsz $
133 * @since Ptolemy II 5.0.1
134 */
135public class DelayStreamToASpecificElement extends TypedAtomicActor {
136        /**
137         * Construct an actor with the given container and name.
138         * 
139         * @param container
140         *            The container.
141         * @param name
142         *            The name of this actor.
143         * @exception IllegalActionException
144         *                If the actor cannot be contained by the proposed
145         *                container.
146         * @exception NameDuplicationException
147         *                If the container already has an actor with this name.
148         */
149        public DelayStreamToASpecificElement(CompositeEntity container, String name)
150                        throws NameDuplicationException, IllegalActionException {
151                super(container, name);
152
153                // File port type is a record:
154                String[] labels = { "name", "size", "date" };
155                Type[] ctypes = { BaseType.STRING, BaseType.LONG, BaseType.LONG };
156                _filetype = new RecordType(labels, ctypes);
157
158                /*
159                 * Input ports and port parameters
160                 */
161
162                // file name
163                infile = new TypedIOPort(this, "infile", true, false);
164                infile.setTypeEquals(_filetype);
165                new Parameter(infile, "_showName", BooleanToken.TRUE);
166
167                // The specific element to wait for
168                specificElement = new Parameter(this, "specificElement",
169                                new StringToken("pattern"));
170                specificElement.setTypeEquals(BaseType.STRING);
171
172                // The stopping element for the store
173                stopElement = new Parameter(this, "stopElement", new StringToken(
174                                "pattern"));
175                stopElement.setTypeEquals(BaseType.STRING);
176
177                /*
178                 * Output ports
179                 */
180
181                // file name
182                outfile = new TypedIOPort(this, "outfile", false, true);
183                outfile.setTypeEquals(_filetype);
184                new Parameter(outfile, "_showName", BooleanToken.FALSE);
185
186        }
187
188        /***********************************************************
189         * ports and parameters
190         */
191
192        /**
193         * File info record as outputted by org.kepler.actor.io.SshDirectoryList:
194         * {name=&lt;filename&gt;, size=&lt;size in bytes&gt;, date=&lt;date in UTC
195         * seconds&gt;}
196         */
197        public TypedIOPort infile;
198
199        /**
200         * The string pattern of the specific element to wait for. Files are
201         * gathered as long as such an element is found. Then first the
202         * specificElement is emitted and then all other files. This works only for
203         * the first time, after that any input is immediately emitted.
204         */
205        public Parameter specificElement;
206
207        /**
208         * The string pattern of the stopping element. Stored files are immediately
209         * emitted when such an element is found, even if there were no
210         * specificElement found. This works only once, after that any input is
211         * immediately emitted.
212         */
213        public Parameter stopElement;
214
215        /**
216         * The output file info record. Same type as input file infor record.
217         */
218        public TypedIOPort outfile;
219
220        /***********************************************************
221         * public methods
222         */
223
224        /**
225         * initialize() runs once before first exec
226         * 
227         * @exception IllegalActionException
228         *                If the parent class throws it.
229         */
230        public void initialize() throws IllegalActionException {
231                super.initialize();
232                _gatherMode = true;
233                _files = new ArrayList();
234        }
235
236        /**
237         * fire
238         * 
239         * @exception IllegalActionException
240         */
241        public void fire() throws IllegalActionException {
242                super.fire();
243
244                // get parameters
245                StringToken spec = (StringToken) specificElement.getToken();
246                String specpattern = spec.stringValue();
247
248                // get parameters
249                StringToken stop = (StringToken) stopElement.getToken();
250                String stoppattern = stop.stringValue();
251
252                // consume the tokens
253                RecordToken fileInfo = (RecordToken) infile.get(0);
254                String fn = null;
255                fn = ((StringToken) fileInfo.get("name")).stringValue();
256
257                // if (isDebugging) log.debug("Element = " + fn +
258                // "  specpattern = " + specpattern +
259                // "  stoppattern = " + stoppattern);
260
261                if (_gatherMode) {
262                        // still looking for the specific element
263                        if (fn.matches(specpattern)) {
264                                // found specific element:
265                                // emit this token and all the stored ones
266                                if (isDebugging)
267                                        log.debug("Send specific element " + fn);
268                                outfile.send(0, fileInfo);
269                                Iterator files = _files.iterator();
270                                while (files.hasNext()) {
271                                        if (isDebugging)
272                                                log.debug("Send stored element ");
273                                        outfile.send(0, (RecordToken) files.next());
274                                }
275                                _gatherMode = false;
276                        } else if (fn.matches(stoppattern)) {
277                                // found stop element:
278                                // emit all stored tokens and then the stop element
279                                Iterator files = _files.iterator();
280                                while (files.hasNext()) {
281                                        if (isDebugging)
282                                                log.debug("Send stored element ");
283                                        outfile.send(0, (RecordToken) files.next());
284                                }
285                                if (isDebugging)
286                                        log.debug("Send stop element " + fn);
287                                outfile.send(0, fileInfo);
288                                _gatherMode = false;
289
290                        } else {
291                                // just store this token for later emission
292                                _files.add(fileInfo);
293                                if (isDebugging)
294                                        log.debug("Store file " + fn);
295                        }
296                } else {
297                        // immediately emit this token
298                        if (isDebugging)
299                                log.debug("Pass on element " + fn);
300                        outfile.send(0, fileInfo);
301                }
302
303        }
304
305        private Type _filetype;
306
307        private boolean _gatherMode; // wait for the specific element?
308        private ArrayList _files; // of file info RecordTokens
309
310        private static final Log log = LogFactory
311                        .getLog(DelayStreamToASpecificElement.class.getName());
312        private static final boolean isDebugging = log.isDebugEnabled();
313
314}