001/* 002 * Copyright (c) 2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: welker $' 006 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $' 007 * '$Revision: 24234 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030/* CPES Actor for chopping up a stream of files into 031 archive lists of minimum size 032 Numbering in the file names can denote timesteps of 033 a simulation, and this actor keeps files of the same timestep 034 together. 035 Do not use in SDF! 036 */ 037/** 038 * '$RCSfile$' 039 * 040 * '$Author: welker $' 041 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $' 042 * '$Revision: 24234 $' 043 * 044 * For Details: http://www.kepler-project.org 045 * 046 * Copyright (c) 2004 The Regents of the University of California. 047 * All rights reserved. 048 * 049 * Permission is hereby granted, without written agreement and without 050 * license or royalty fees, to use, copy, modify, and distribute this 051 * software and its documentation for any purpose, provided that the 052 * above copyright notice and the following two paragraphs appear in 053 * all copies of this software. 054 * 055 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 056 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 057 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN 058 * IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY 059 * OF SUCH DAMAGE. 060 * 061 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 062 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 063 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 064 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY 065 * OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, 066 * UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 067 */ 068 069package org.sdm.spa; 070 071import java.util.ArrayList; 072import java.util.Iterator; 073 074import org.apache.commons.logging.Log; 075import org.apache.commons.logging.LogFactory; 076 077import ptolemy.actor.TypedAtomicActor; 078import ptolemy.actor.TypedIOPort; 079import ptolemy.data.BooleanToken; 080import ptolemy.data.IntToken; 081import ptolemy.data.LongToken; 082import ptolemy.data.RecordToken; 083import ptolemy.data.StringToken; 084import ptolemy.data.Token; 085import ptolemy.data.expr.Parameter; 086import ptolemy.data.type.BaseType; 087import ptolemy.data.type.RecordType; 088import ptolemy.data.type.Type; 089import ptolemy.kernel.CompositeEntity; 090import ptolemy.kernel.util.IllegalActionException; 091import ptolemy.kernel.util.NameDuplicationException; 092 093////////////////////////////////////////////////////////////////////////// 094//// ArchiveCounter 095 096/** 097 * <p> 098 * Chop up list of files to archive lists with a minimum size.<br/> 099 * The input should be a stream of tokens of file information: a record of 100 * {name=<filename>, size=<size in bytes>, date=<date in seconds>} Such tokens 101 * are produced by org.kepler.actor.io.SshDirectoryListing. 102 * </p> 103 * 104 * <p> 105 * Numbering (first number field) in the file names can denote timesteps of a 106 * simulation, and this actor keeps files of the same timestep together. If 107 * there is no number in the file names, then they are considered to be 108 * stand-alone steps. 109 * </p> 110 * 111 * <p> 112 * The actor outputs a list of files to be archived whenever the processed 113 * files' total size overcomes the specified minimum (and full set of timesteps 114 * are available). The output is a record of (a) string containing the list of 115 * files to be archived together (separated with \n), (b) the total size and (c) 116 * first and (d) last timestep included in this list: {list=<string>, 117 * size=<long>, firstTS=<int>, lastTS=<int>}. 118 * </p> 119 * 120 * <p> 121 * The actor outputs also the file info's of the files to be archived 122 * one-by-one, for checkpointing and logging purposes, but after such firing 123 * when outputs the list. 124 * </p> 125 * 126 * <p> 127 * This actor does not produce any tokens for an unknown number of firings, then 128 * suddenly it produces a token. Thus, it cannot be used in SDF. 129 * </p> 130 * 131 * <p> 132 * If the finish flag is set to true, the actor will emit the (last) list 133 * without considering its total size. The actual input file will not be 134 * considered and listed at all. Thus, the flag can be used to stop the 135 * counting, using a special file for this purpose. 136 * </p> 137 * 138 * <p> 139 * The unit of the specified archive minimum size is MB (1024*1024 bytes). 140 * </p> 141 * 142 * @author Norbert Podhorszki 143 * @version $Id: ArchiveCounter.java 24234 2010-05-06 05:21:26Z welker $ 144 * @since Ptolemy II 5.0.1 145 */ 146public class ArchiveCounter extends TypedAtomicActor { 147 /** 148 * Construct an actor with the given container and name. 149 * 150 * @param container 151 * The container. 152 * @param name 153 * The name of this actor. 154 * @exception IllegalActionException 155 * If the actor cannot be contained by the proposed 156 * container. 157 * @exception NameDuplicationException 158 * If the container already has an actor with this name. 159 */ 160 public ArchiveCounter(CompositeEntity container, String name) 161 throws NameDuplicationException, IllegalActionException { 162 super(container, name); 163 164 // File port type is a record: 165 String[] labels = { "name", "size", "date" }; 166 Type[] ctypes = { BaseType.STRING, BaseType.LONG, BaseType.LONG }; 167 _filetype = new RecordType(labels, ctypes); 168 169 // Archive list port type is a record: 170 // filename is the local filename, 171 // firstTS is the first timestep stored in this list 172 // lastTS is the last timestep stored in this list 173 labels = new String[] { "list", "size", "firstTS", "lastTS" }; 174 ctypes = new Type[] { BaseType.STRING, BaseType.LONG, BaseType.INT, 175 BaseType.INT }; 176 _listtype = new RecordType(labels, ctypes); 177 178 /* 179 * Input ports and port parameters 180 */ 181 182 // file name 183 file = new TypedIOPort(this, "file", true, false); 184 file.setTypeEquals(_filetype); 185 new Parameter(file, "_showName", BooleanToken.TRUE); 186 187 // finish counting (the current list will be emitted) 188 finish = new TypedIOPort(this, "finish", true, false); 189 finish.setTypeEquals(BaseType.BOOLEAN); 190 new Parameter(finish, "_showName", BooleanToken.TRUE); 191 192 // Minimum archive size in MBs 193 archMinSizeMB = new Parameter(this, "archMinSizeMB", new LongToken( 194 1000L)); 195 archMinSizeMB.setTypeEquals(BaseType.LONG); 196 197 /* 198 * Output ports 199 */ 200 201 // the output: the record of a string which contains the name of files 202 // to be archived (separated by \n) and the first and last 203 // timesteps involved 204 list = new TypedIOPort(this, "list", false, true); 205 list.setTypeEquals(_listtype); 206 new Parameter(list, "_showName", BooleanToken.TRUE); 207 208 } 209 210 /*********************************************************** 211 * ports and parameters 212 */ 213 214 /** 215 * File info record as outputted by org.kepler.actor.io.SshDirectoryList: 216 * {name=<filename>, size=<size in bytes>, date=<date in UTC 217 * seconds>} 218 */ 219 public TypedIOPort file; 220 221 /** 222 * Finish flag for counting. It must be false for all files to be considered 223 * in archiving. Use true flag e.g. to stop counting and emitting the last 224 * list, giving a 'fake' file for input for such firing. Type of the port: 225 * boolean 226 */ 227 public TypedIOPort finish; 228 229 /** 230 * The minimum size for an archive list given in MBs. Files are gathered as 231 * long as their total sum reaches the minimum, and they represent complete 232 * timesteps. 233 */ 234 public Parameter archMinSizeMB; 235 236 /** 237 * The output is record: a string containing the list of files to be 238 * archived together, and the first and last timesteps in the list. This 239 * port has a record type of {filename=<string>, firstTS=<int>, 240 * lastTS=<int>}. 241 */ 242 public TypedIOPort list; 243 244 /*********************************************************** 245 * public methods 246 */ 247 248 /** 249 * initialize() runs once before first exec 250 * 251 * @exception IllegalActionException 252 * If the parent class throws it. 253 */ 254 public void initialize() throws IllegalActionException { 255 super.initialize(); 256 _archSize = 0L; 257 _firstTimestep = -1; 258 _lastTimestep = -1; 259 _files = new ArrayList(); 260 } 261 262 /** 263 * fire 264 * 265 * @exception IllegalActionException 266 */ 267 public void fire() throws IllegalActionException { 268 super.fire(); 269 270 // get parameters 271 LongToken archToken = (LongToken) archMinSizeMB.getToken(); 272 long _archMinSize = archToken.longValue(); 273 _archMinSize = _archMinSize * 1024 * 1024; 274 275 // consume the tokens 276 RecordToken fileInfo = (RecordToken) file.get(0); 277 boolean bFinish = ((BooleanToken) finish.get(0)).booleanValue(); 278 279 // process file info if it is not about finish 280 String fn = null; 281 long fsize = 0L; 282 int timestep = -1; 283 284 if (!bFinish) { 285 fn = ((StringToken) fileInfo.get("name")).stringValue(); 286 fsize = ((LongToken) fileInfo.get("size")).longValue(); 287 timestep = getTimestepFromFilename(fn); 288 } 289 290 // if (isDebugging) 291 // log.debug("File = " + fn + ", size = " + fsize + ", ts = " + timestep 292 // + 293 // "\n_archSize = " + _archSize + ", firstTS = " + _firstTimestep + 294 // ", lastTS = " + _lastTimestep); 295 296 // check if we are asked to finish 297 // OR timestep changed and archive is already big enough 298 // OR timestep is irrelevant and we check only the size 299 boolean doarch = bFinish; 300 doarch = doarch 301 || (_archSize >= _archMinSize && (_lastTimestep < timestep || timestep == -1)); 302 303 // We have to avoid archiving empty lists 304 doarch = doarch && _files.size() > 0; 305 306 if (doarch) { 307 308 // we have an archive list to publish now 309 310 if (isDebugging) 311 log 312 .debug("New archive list. # of files = " 313 + _files.size() + ", size = " + _archSize 314 + ", first timestep = " + _firstTimestep 315 + ", last timestep = " + _lastTimestep); 316 317 String listStr = createListString(); 318 RecordToken rt = createRecordToken(listStr, _archSize, 319 _firstTimestep, _lastTimestep); 320 321 list.send(0, rt); // emit output token now 322 323 // restart counting 324 _files = new ArrayList(); 325 _archSize = 0L; 326 327 } 328 329 if (!bFinish) { 330 // we have to put the new file to the list of previous ones. 331 if (_files.size() == 0) 332 _firstTimestep = timestep; 333 _files.add(fn); 334 _archSize += fsize; 335 _lastTimestep = timestep; 336 } else { 337 // finish (restart status except timestep counting) 338 _files = new ArrayList(); 339 _archSize = 0L; 340 } 341 } 342 343 /** 344 * Get the first number from the string. We assume that there is an 345 * extension which should be excluded. E.g. xgc.mesh.h5 will result in 0 346 */ 347 private int getTimestepFromFilename(String fn) { 348 if (fn == null) 349 return -1; 350 if (!fn.matches(".*[0-9]\\..*")) 351 return 0; 352 353 int start = 0; 354 int end; 355 char c = fn.charAt(start); 356 // look for the first number 357 while (c < '0' || c > '9') { 358 start++; // we must find one! 359 c = fn.charAt(start); 360 } 361 362 end = start; 363 // go to the end of the number 364 while ('0' <= c && c <= '9') { 365 end++; 366 if (end < fn.length()) 367 c = fn.charAt(end); 368 else 369 break; 370 } 371 372 String tsStr; 373 if (end >= fn.length()) 374 tsStr = fn.substring(start); 375 else 376 tsStr = fn.substring(start, end); 377 int ts = Integer.parseInt(tsStr); 378 return ts; 379 } 380 381 /* 382 * Create string of \n-separated list of file names from the stored data 383 */ 384 private String createListString() { 385 StringBuffer sb = new StringBuffer(); 386 Iterator files = _files.iterator(); 387 while (files.hasNext()) { 388 sb.append(files.next() + "\n"); 389 } 390 return sb.toString(); 391 } 392 393 /* 394 * Create one RecordToken of format {name=String, firstTS=int, lastTS=int} 395 * from the inputs. 396 */ 397 private RecordToken createRecordToken(String listStr, long size, 398 int firstTS, int lastTS) { 399 String[] labels = { "list", "size", "firstTS", "lastTS" }; 400 Token[] values = new Token[4]; 401 values[0] = new StringToken(listStr); 402 values[1] = new LongToken(size); 403 values[2] = new IntToken(firstTS); 404 values[3] = new IntToken(lastTS); 405 RecordToken rt = null; 406 try { 407 rt = new RecordToken(labels, values); 408 } catch (IllegalActionException ex) { 409 log 410 .error("ArchiveCount: Error at creating a record token for the archive list of " 411 + "size = " 412 + size 413 + ", firstTS = " 414 + firstTS 415 + ", lastTS = " + lastTS + ", list:\n " + listStr); 416 } 417 return rt; 418 } 419 420 private Type _filetype; 421 private Type _listtype; 422 423 private long _archSize = 0L; 424 private int _firstTimestep = -1; 425 private int _lastTimestep = -1; 426 private ArrayList _files; // of file info RecordTokens 427 428 private static final Log log = LogFactory.getLog(ArchiveCounter.class 429 .getName()); 430 private static final boolean isDebugging = log.isDebugEnabled(); 431 432}