001/*
002 * Copyright (c) 2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: welker $'
006 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $' 
007 * '$Revision: 24234 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030/* CPES Actor for chopping up a stream of files into 
031   archive lists of minimum size
032   Numbering in the file names can denote timesteps of
033   a simulation, and this actor keeps files of the same timestep
034   together.
035   Do not use in SDF!
036 */
037/**
038 *    '$RCSfile$'
039 *
040 *     '$Author: welker $'
041 *       '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $'
042 *   '$Revision: 24234 $'
043 *
044 *  For Details: http://www.kepler-project.org
045 *
046 * Copyright (c) 2004 The Regents of the University of California.
047 * All rights reserved.
048 *
049 * Permission is hereby granted, without written agreement and without
050 * license or royalty fees, to use, copy, modify, and distribute this
051 * software and its documentation for any purpose, provided that the
052 * above copyright notice and the following two paragraphs appear in
053 * all copies of this software.
054 *
055 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
056 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
057 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN
058 * IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY
059 * OF SUCH DAMAGE.
060 *
061 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
062 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
063 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
064 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY
065 * OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT,
066 * UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
067 */
068
069package org.sdm.spa;
070
071import java.util.ArrayList;
072import java.util.Iterator;
073
074import org.apache.commons.logging.Log;
075import org.apache.commons.logging.LogFactory;
076
077import ptolemy.actor.TypedAtomicActor;
078import ptolemy.actor.TypedIOPort;
079import ptolemy.data.BooleanToken;
080import ptolemy.data.IntToken;
081import ptolemy.data.LongToken;
082import ptolemy.data.RecordToken;
083import ptolemy.data.StringToken;
084import ptolemy.data.Token;
085import ptolemy.data.expr.Parameter;
086import ptolemy.data.type.BaseType;
087import ptolemy.data.type.RecordType;
088import ptolemy.data.type.Type;
089import ptolemy.kernel.CompositeEntity;
090import ptolemy.kernel.util.IllegalActionException;
091import ptolemy.kernel.util.NameDuplicationException;
092
093//////////////////////////////////////////////////////////////////////////
094//// ArchiveCounter
095
096/**
097 * <p>
098 * Chop up list of files to archive lists with a minimum size.<br/>
099 * The input should be a stream of tokens of file information: a record of
100 * {name=<filename>, size=<size in bytes>, date=<date in seconds>} Such tokens
101 * are produced by org.kepler.actor.io.SshDirectoryListing.
102 * </p>
103 * 
104 * <p>
105 * Numbering (first number field) in the file names can denote timesteps of a
106 * simulation, and this actor keeps files of the same timestep together. If
107 * there is no number in the file names, then they are considered to be
108 * stand-alone steps.
109 * </p>
110 * 
111 * <p>
112 * The actor outputs a list of files to be archived whenever the processed
113 * files' total size overcomes the specified minimum (and full set of timesteps
114 * are available). The output is a record of (a) string containing the list of
115 * files to be archived together (separated with \n), (b) the total size and (c)
116 * first and (d) last timestep included in this list: {list=&lt;string&gt;,
117 * size=&lt;long&gt;, firstTS=&lt;int&gt;, lastTS=&lt;int&gt;}.
118 * </p>
119 * 
120 * <p>
121 * The actor outputs also the file info's of the files to be archived
122 * one-by-one, for checkpointing and logging purposes, but after such firing
123 * when outputs the list.
124 * </p>
125 * 
126 * <p>
127 * This actor does not produce any tokens for an unknown number of firings, then
128 * suddenly it produces a token. Thus, it cannot be used in SDF.
129 * </p>
130 * 
131 * <p>
132 * If the finish flag is set to true, the actor will emit the (last) list
133 * without considering its total size. The actual input file will not be
134 * considered and listed at all. Thus, the flag can be used to stop the
135 * counting, using a special file for this purpose.
136 * </p>
137 * 
138 * <p>
139 * The unit of the specified archive minimum size is MB (1024*1024 bytes).
140 * </p>
141 * 
142 * @author Norbert Podhorszki
143 * @version $Id: ArchiveCounter.java 24234 2010-05-06 05:21:26Z welker $
144 * @since Ptolemy II 5.0.1
145 */
146public class ArchiveCounter extends TypedAtomicActor {
147        /**
148         * Construct an actor with the given container and name.
149         * 
150         * @param container
151         *            The container.
152         * @param name
153         *            The name of this actor.
154         * @exception IllegalActionException
155         *                If the actor cannot be contained by the proposed
156         *                container.
157         * @exception NameDuplicationException
158         *                If the container already has an actor with this name.
159         */
160        public ArchiveCounter(CompositeEntity container, String name)
161                        throws NameDuplicationException, IllegalActionException {
162                super(container, name);
163
164                // File port type is a record:
165                String[] labels = { "name", "size", "date" };
166                Type[] ctypes = { BaseType.STRING, BaseType.LONG, BaseType.LONG };
167                _filetype = new RecordType(labels, ctypes);
168
169                // Archive list port type is a record:
170                // filename is the local filename,
171                // firstTS is the first timestep stored in this list
172                // lastTS is the last timestep stored in this list
173                labels = new String[] { "list", "size", "firstTS", "lastTS" };
174                ctypes = new Type[] { BaseType.STRING, BaseType.LONG, BaseType.INT,
175                                BaseType.INT };
176                _listtype = new RecordType(labels, ctypes);
177
178                /*
179                 * Input ports and port parameters
180                 */
181
182                // file name
183                file = new TypedIOPort(this, "file", true, false);
184                file.setTypeEquals(_filetype);
185                new Parameter(file, "_showName", BooleanToken.TRUE);
186
187                // finish counting (the current list will be emitted)
188                finish = new TypedIOPort(this, "finish", true, false);
189                finish.setTypeEquals(BaseType.BOOLEAN);
190                new Parameter(finish, "_showName", BooleanToken.TRUE);
191
192                // Minimum archive size in MBs
193                archMinSizeMB = new Parameter(this, "archMinSizeMB", new LongToken(
194                                1000L));
195                archMinSizeMB.setTypeEquals(BaseType.LONG);
196
197                /*
198                 * Output ports
199                 */
200
201                // the output: the record of a string which contains the name of files
202                // to be archived (separated by \n) and the first and last
203                // timesteps involved
204                list = new TypedIOPort(this, "list", false, true);
205                list.setTypeEquals(_listtype);
206                new Parameter(list, "_showName", BooleanToken.TRUE);
207
208        }
209
210        /***********************************************************
211         * ports and parameters
212         */
213
214        /**
215         * File info record as outputted by org.kepler.actor.io.SshDirectoryList:
216         * {name=&lt;filename&gt;, size=&lt;size in bytes&gt;, date=&lt;date in UTC
217         * seconds&gt;}
218         */
219        public TypedIOPort file;
220
221        /**
222         * Finish flag for counting. It must be false for all files to be considered
223         * in archiving. Use true flag e.g. to stop counting and emitting the last
224         * list, giving a 'fake' file for input for such firing. Type of the port:
225         * boolean
226         */
227        public TypedIOPort finish;
228
229        /**
230         * The minimum size for an archive list given in MBs. Files are gathered as
231         * long as their total sum reaches the minimum, and they represent complete
232         * timesteps.
233         */
234        public Parameter archMinSizeMB;
235
236        /**
237         * The output is record: a string containing the list of files to be
238         * archived together, and the first and last timesteps in the list. This
239         * port has a record type of {filename=&lt;string&gt;, firstTS=&lt;int&gt;,
240         * lastTS=&lt;int&gt;}.
241         */
242        public TypedIOPort list;
243
244        /***********************************************************
245         * public methods
246         */
247
248        /**
249         * initialize() runs once before first exec
250         * 
251         * @exception IllegalActionException
252         *                If the parent class throws it.
253         */
254        public void initialize() throws IllegalActionException {
255                super.initialize();
256                _archSize = 0L;
257                _firstTimestep = -1;
258                _lastTimestep = -1;
259                _files = new ArrayList();
260        }
261
262        /**
263         * fire
264         * 
265         * @exception IllegalActionException
266         */
267        public void fire() throws IllegalActionException {
268                super.fire();
269
270                // get parameters
271                LongToken archToken = (LongToken) archMinSizeMB.getToken();
272                long _archMinSize = archToken.longValue();
273                _archMinSize = _archMinSize * 1024 * 1024;
274
275                // consume the tokens
276                RecordToken fileInfo = (RecordToken) file.get(0);
277                boolean bFinish = ((BooleanToken) finish.get(0)).booleanValue();
278
279                // process file info if it is not about finish
280                String fn = null;
281                long fsize = 0L;
282                int timestep = -1;
283
284                if (!bFinish) {
285                        fn = ((StringToken) fileInfo.get("name")).stringValue();
286                        fsize = ((LongToken) fileInfo.get("size")).longValue();
287                        timestep = getTimestepFromFilename(fn);
288                }
289
290                // if (isDebugging)
291                // log.debug("File = " + fn + ", size = " + fsize + ", ts = " + timestep
292                // +
293                // "\n_archSize = " + _archSize + ", firstTS = " + _firstTimestep +
294                // ", lastTS = " + _lastTimestep);
295
296                // check if we are asked to finish
297                // OR timestep changed and archive is already big enough
298                // OR timestep is irrelevant and we check only the size
299                boolean doarch = bFinish;
300                doarch = doarch
301                                || (_archSize >= _archMinSize && (_lastTimestep < timestep || timestep == -1));
302
303                // We have to avoid archiving empty lists
304                doarch = doarch && _files.size() > 0;
305
306                if (doarch) {
307
308                        // we have an archive list to publish now
309
310                        if (isDebugging)
311                                log
312                                                .debug("New archive list. # of files = "
313                                                                + _files.size() + ", size = " + _archSize
314                                                                + ", first timestep = " + _firstTimestep
315                                                                + ", last timestep = " + _lastTimestep);
316
317                        String listStr = createListString();
318                        RecordToken rt = createRecordToken(listStr, _archSize,
319                                        _firstTimestep, _lastTimestep);
320
321                        list.send(0, rt); // emit output token now
322
323                        // restart counting
324                        _files = new ArrayList();
325                        _archSize = 0L;
326
327                }
328
329                if (!bFinish) {
330                        // we have to put the new file to the list of previous ones.
331                        if (_files.size() == 0)
332                                _firstTimestep = timestep;
333                        _files.add(fn);
334                        _archSize += fsize;
335                        _lastTimestep = timestep;
336                } else {
337                        // finish (restart status except timestep counting)
338                        _files = new ArrayList();
339                        _archSize = 0L;
340                }
341        }
342
343        /**
344         * Get the first number from the string. We assume that there is an
345         * extension which should be excluded. E.g. xgc.mesh.h5 will result in 0
346         */
347        private int getTimestepFromFilename(String fn) {
348                if (fn == null)
349                        return -1;
350                if (!fn.matches(".*[0-9]\\..*"))
351                        return 0;
352
353                int start = 0;
354                int end;
355                char c = fn.charAt(start);
356                // look for the first number
357                while (c < '0' || c > '9') {
358                        start++; // we must find one!
359                        c = fn.charAt(start);
360                }
361
362                end = start;
363                // go to the end of the number
364                while ('0' <= c && c <= '9') {
365                        end++;
366                        if (end < fn.length())
367                                c = fn.charAt(end);
368                        else
369                                break;
370                }
371
372                String tsStr;
373                if (end >= fn.length())
374                        tsStr = fn.substring(start);
375                else
376                        tsStr = fn.substring(start, end);
377                int ts = Integer.parseInt(tsStr);
378                return ts;
379        }
380
381        /*
382         * Create string of \n-separated list of file names from the stored data
383         */
384        private String createListString() {
385                StringBuffer sb = new StringBuffer();
386                Iterator files = _files.iterator();
387                while (files.hasNext()) {
388                        sb.append(files.next() + "\n");
389                }
390                return sb.toString();
391        }
392
393        /*
394         * Create one RecordToken of format {name=String, firstTS=int, lastTS=int}
395         * from the inputs.
396         */
397        private RecordToken createRecordToken(String listStr, long size,
398                        int firstTS, int lastTS) {
399                String[] labels = { "list", "size", "firstTS", "lastTS" };
400                Token[] values = new Token[4];
401                values[0] = new StringToken(listStr);
402                values[1] = new LongToken(size);
403                values[2] = new IntToken(firstTS);
404                values[3] = new IntToken(lastTS);
405                RecordToken rt = null;
406                try {
407                        rt = new RecordToken(labels, values);
408                } catch (IllegalActionException ex) {
409                        log
410                                        .error("ArchiveCount: Error at creating a record token for the archive list of "
411                                                        + "size = "
412                                                        + size
413                                                        + ", firstTS = "
414                                                        + firstTS
415                                                        + ", lastTS = " + lastTS + ", list:\n " + listStr);
416                }
417                return rt;
418        }
419
420        private Type _filetype;
421        private Type _listtype;
422
423        private long _archSize = 0L;
424        private int _firstTimestep = -1;
425        private int _lastTimestep = -1;
426        private ArrayList _files; // of file info RecordTokens
427
428        private static final Log log = LogFactory.getLog(ArchiveCounter.class
429                        .getName());
430        private static final boolean isDebugging = log.isDebugEnabled();
431
432}