/*
 * Copyright (c) 2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: welker $'
 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $'
 * '$Revision: 24234 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

/* CPES Actor for processing a stream of files and
 bringing a specific element into the front.
 Do not use in SDF!
 */
/**
 * '$RCSfile$'
 *
 * '$Author: welker $'
 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $'
 * '$Revision: 24234 $'
 *
 * For Details: http://www.kepler-project.org
 *
 * Copyright (c) 2004 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the
 * above copyright notice and the following two paragraphs appear in
 * all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN
 * IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY
 * OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY
 * OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT,
 * UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 */

package org.sdm.spa;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.RecordToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.RecordType;
import ptolemy.data.type.Type;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;

//////////////////////////////////////////////////////////////////////////
//// DelayStreamToASpecificElement

/**
 * <p>
 * Postpone a stream of files until a specific element is found and bring that
 * element in front.<br/>
 * The input should be a stream of tokens of file information: a record of
 * {@code {name=<filename>, size=<size in bytes>, date=<date in seconds>}}.
 * Such tokens are produced by org.kepler.actor.io.SshDirectoryListing.
 * </p>
 *
 * <p>
 * Input files are gathered until the specific element is found. The
 * specificElement is emitted first and then all other files, in the order in
 * which they arrived. This works only for the first time; after that any
 * input is immediately emitted.
 * </p>
 *
 * <p>
 * For the case when there is no specific element in the stream, a stopping
 * element can also be defined. If such an element appears, this actor emits
 * all stored elements immediately, followed by the stopping element itself,
 * and does not wait for the specific element any more.
 * </p>
 *
 * <p>
 * Both patterns are applied to the {@code name} field of the incoming record
 * with {@link String#matches(String)}, i.e. they are regular expressions that
 * have to match the whole file name. If a name matches both patterns, it is
 * treated as the specific element.
 * </p>
 *
 * <p>
 * This actor is a CPES specific actor. When watching for .bp files of a
 * simulation, the first set of files can have any order. For postprocessing,
 * however, the mesh file should be transferred before processing any other
 * files. So this actor brings the mesh file in front. It can be the case,
 * however, that there are no .bp files at all. The stream is always
 * terminated by a final element (stopfile) which should be emitted anyway.
 * </p>
 *
 * <p>
 * The actor outputs the stream of files.
 * </p>
 *
 * <p>
 * This actor does not produce any tokens for an unknown number of firings, then
 * suddenly it produces several tokens. Thus, it cannot be used in SDF.
 * </p>
 *
 * @author Norbert Podhorszki
 * @version $Id: DelayStreamToASpecificElement.java 13512 2007-04-13 00:02:17Z
 *          podhorsz $
 * @since Ptolemy II 5.0.1
 */
public class DelayStreamToASpecificElement extends TypedAtomicActor {
	/**
	 * Construct an actor with the given container and name.
	 *
	 * @param container
	 *            The container.
	 * @param name
	 *            The name of this actor.
	 * @exception IllegalActionException
	 *                If the actor cannot be contained by the proposed
	 *                container.
	 * @exception NameDuplicationException
	 *                If the container already has an actor with this name.
	 */
	public DelayStreamToASpecificElement(CompositeEntity container, String name)
			throws NameDuplicationException, IllegalActionException {
		super(container, name);

		// File port type is a record:
		String[] labels = { "name", "size", "date" };
		Type[] ctypes = { BaseType.STRING, BaseType.LONG, BaseType.LONG };
		_filetype = new RecordType(labels, ctypes);

		/*
		 * Input ports and port parameters
		 */

		// file info record
		infile = new TypedIOPort(this, "infile", true, false);
		infile.setTypeEquals(_filetype);
		new Parameter(infile, "_showName", BooleanToken.TRUE);

		// The specific element to wait for
		specificElement = new Parameter(this, "specificElement",
				new StringToken("pattern"));
		specificElement.setTypeEquals(BaseType.STRING);

		// The stopping element for the store
		stopElement = new Parameter(this, "stopElement", new StringToken(
				"pattern"));
		stopElement.setTypeEquals(BaseType.STRING);

		/*
		 * Output ports
		 */

		// file info record
		outfile = new TypedIOPort(this, "outfile", false, true);
		outfile.setTypeEquals(_filetype);
		new Parameter(outfile, "_showName", BooleanToken.FALSE);

	}

	/***********************************************************
	 * ports and parameters
	 */

	/**
	 * File info record as outputted by org.kepler.actor.io.SshDirectoryList:
	 * {@code {name=<filename>, size=<size in bytes>, date=<date in UTC
	 * seconds>}}
	 */
	public TypedIOPort infile;

	/**
	 * The string pattern (regular expression matched against the whole file
	 * name) of the specific element to wait for. Files are gathered until
	 * such an element is found. Then first the specificElement is emitted
	 * and then all other files. This works only for the first time; after
	 * that any input is immediately emitted.
	 */
	public Parameter specificElement;

	/**
	 * The string pattern (regular expression matched against the whole file
	 * name) of the stopping element. Stored files are immediately emitted
	 * when such an element is found, even if no specificElement was found.
	 * This works only once; after that any input is immediately emitted.
	 */
	public Parameter stopElement;

	/**
	 * The output file info record. Same type as the input file info record.
	 */
	public TypedIOPort outfile;

	/***********************************************************
	 * public methods
	 */

	/**
	 * Reset the actor before the first firing: switch to gather mode and
	 * start with an empty store of delayed file records.
	 *
	 * @exception IllegalActionException
	 *                If the parent class throws it.
	 */
	@Override
	public void initialize() throws IllegalActionException {
		super.initialize();
		_gatherMode = true;
		_files = new ArrayList<RecordToken>();
	}

	/**
	 * Consume one file info record from <i>infile</i> and decide what to do
	 * with it.
	 * <ul>
	 * <li>While gathering, a name matching <i>specificElement</i> is sent
	 * first, followed by all stored records; gathering then ends.</li>
	 * <li>While gathering, a name matching <i>stopElement</i> causes all
	 * stored records to be sent, followed by the stop record itself;
	 * gathering then ends.</li>
	 * <li>While gathering, any other record is stored and nothing is
	 * sent.</li>
	 * <li>Once gathering has ended, every record is passed on
	 * immediately.</li>
	 * </ul>
	 *
	 * @exception IllegalActionException
	 *                If the parent class throws it, or if reading a
	 *                parameter, reading the input or sending to the output
	 *                fails.
	 */
	@Override
	public void fire() throws IllegalActionException {
		super.fire();

		// get parameters (read on every firing, so changes take effect)
		String specPattern = ((StringToken) specificElement.getToken())
				.stringValue();
		String stopPattern = ((StringToken) stopElement.getToken())
				.stringValue();

		// consume the token
		RecordToken fileInfo = (RecordToken) infile.get(0);
		String fileName = ((StringToken) fileInfo.get("name")).stringValue();

		if (!_gatherMode) {
			// gathering is over: immediately emit this token
			if (isDebugging) {
				log.debug("Pass on element " + fileName);
			}
			outfile.send(0, fileInfo);
			return;
		}

		// still looking for the specific element
		if (fileName.matches(specPattern)) {
			// found specific element:
			// emit this token and then all the stored ones
			if (isDebugging) {
				log.debug("Send specific element " + fileName);
			}
			outfile.send(0, fileInfo);
			_sendStoredFiles();
			_gatherMode = false;
		} else if (fileName.matches(stopPattern)) {
			// found stop element:
			// emit all stored tokens and then the stop element
			_sendStoredFiles();
			if (isDebugging) {
				log.debug("Send stop element " + fileName);
			}
			outfile.send(0, fileInfo);
			_gatherMode = false;
		} else {
			// just store this token for later emission
			_files.add(fileInfo);
			if (isDebugging) {
				log.debug("Store file " + fileName);
			}
		}
	}

	/***********************************************************
	 * private methods
	 */

	/**
	 * Send every stored file record to <i>outfile</i> in arrival order and
	 * then empty the store, so the records are not kept referenced once
	 * they have been emitted.
	 *
	 * @exception IllegalActionException
	 *                If sending to the output port fails.
	 */
	private void _sendStoredFiles() throws IllegalActionException {
		for (RecordToken stored : _files) {
			if (isDebugging) {
				log.debug("Send stored element ");
			}
			outfile.send(0, stored);
		}
		_files.clear();
	}

	/***********************************************************
	 * private variables
	 */

	/** Record type {name, size, date} shared by the input and output port. */
	private Type _filetype;

	/** True while still waiting for the specific (or stop) element. */
	private boolean _gatherMode;

	/** File info records delayed while in gather mode, in arrival order. */
	private List<RecordToken> _files;

	private static final Log log = LogFactory
			.getLog(DelayStreamToASpecificElement.class.getName());
	private static final boolean isDebugging = log.isDebugEnabled();

}