001/* 002 * Copyright (c) 2004-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-08-24 22:47:39 +0000 (Mon, 24 Aug 2015) $' 007 * '$Revision: 33633 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.srb; 031 032import java.io.File; 033import java.io.FileOutputStream; 034import java.io.OutputStream; 035import java.net.URLDecoder; 036import java.util.Vector; 037 038import edu.sdsc.grid.io.local.LocalFile; 039import edu.sdsc.grid.io.srb.SRBFile; 040import edu.sdsc.grid.io.srb.SRBFileSystem; 041import edu.sdsc.grid.io.srb.SRBRandomAccessFile; 042import ptolemy.actor.NoTokenException; 043import ptolemy.actor.TypedAtomicActor; 044import ptolemy.actor.TypedIOPort; 045import ptolemy.data.ArrayToken; 046import ptolemy.data.BooleanToken; 047import ptolemy.data.ObjectToken; 048import ptolemy.data.StringToken; 049import ptolemy.data.Token; 050import ptolemy.data.expr.FileParameter; 051import ptolemy.data.expr.Parameter; 052import ptolemy.data.type.ArrayType; 053import ptolemy.data.type.BaseType; 054import ptolemy.kernel.CompositeEntity; 055import ptolemy.kernel.util.Attribute; 056import ptolemy.kernel.util.IllegalActionException; 057import ptolemy.kernel.util.NameDuplicationException; 058import ptolemy.util.MessageHandler; 059 060////////////////////////////////////////////////////////////////////////// 061//// SGet 062/** 063 * <p> 064 * SGet is a Kepler Actor which has a functionality similar to the SRB command 065 * namely "Sget". Sget exports one or more objects from SRB space into the local 066 * file system. SGet actor downloads an SRB file to the local drive. The 067 * following actor expects as input a reference to the SRB file system. This 068 * reference connection is obtained via the SRBConnect Actor in Kepler. <i>See 069 * SRBConnect and its documentation.</i> 070 * </p> 071 * <p> 072 * The file reference system is created with a unique SRB user account and with 073 * this connection reference as input the SGet actor is able to gain access to 074 * various files on the SRB file systems. Once an alive SRB file connection 075 * system has been established the actor gets the local directory and the files 076 * to establish the SRB file path. If the SRB directory doesn't exist, a new 077 * directory is created. Once the SRB files path are determined, the files are 078 * copied from the SRB file space to the local drive.In case the above process 079 * of parallel copy fails, a streaming copy process is carried out with 080 * SRBRandomAccess streams where the file is downloaded as a sequence of byte 081 * arrays. 082 * </p> 083 * <p> 084 * The user is also asked for confirmation on overwriting existing local files 085 * if they exist or simply appending them. 086 * </p> 087 * <p> 088 * <B>Actor Input:</B> Accepts a reference to the SRB files system, a local 089 * directory and an aray of SRB remote file paths. 090 * </p> 091 * <p> 092 * <B>Actor Output:</B> Outputs the local file paths and an exit status. The 093 * exit status gives a message of "success" or appropriate error to indicate the 094 * status of file get process. 095 * 096 * </p> 097 * <p> 098 * The following actor accesses SRB file reference system and SRB file space 099 * with the SRB Jargon API provided. The JARGON is a pure API for developing 100 * programs with a data grid interface and I/O for SRB file systems. 101 * </p> 102 * <A href="http://www.sdsc.edu/srb"><I>Further information on SRB</I> </A> 103 * 104 * @author Efrat Jaeger 105 * @version $Id: SGet.java 33633 2015-08-24 22:47:39Z crawl $ 106 * @category.name srb 107 * @category.name put 108 */ 109 110public class SGet extends TypedAtomicActor { 111 112 /** 113 * Construct a constant source with the given container and name. Create the 114 * <i>value</i> parameter, initialize its value to the default value of an 115 * IntToken with value 1. 116 * 117 * @param container 118 * The container. 119 * @param name 120 * The name of this actor. 121 * @exception IllegalActionException 122 * If the entity cannot be contained by the proposed 123 * container. 124 * @exception NameDuplicationException 125 * If the container already has an actor with this name. 126 */ 127 public SGet(CompositeEntity container, String name) 128 throws NameDuplicationException, IllegalActionException { 129 super(container, name); 130 131 SRBFileSystem = new TypedIOPort(this, "SRBFileSystem", true, false); 132 SRBFileSystem.setTypeEquals(BaseType.GENERAL); 133 new Attribute(SRBFileSystem, "_showName"); 134 135 filesToGet = new TypedIOPort(this, "filesToGet", true, false); 136 filesToGet.setTypeEquals(new ArrayType(BaseType.STRING)); 137 new Attribute(filesToGet, "_showName"); 138 139 fetchedFiles = new TypedIOPort(this, "fetchedFiles", false, true); 140 fetchedFiles.setTypeEquals(new ArrayType(BaseType.STRING)); 141 new Attribute(fetchedFiles, "_showName"); 142 143 exitCode = new TypedIOPort(this, "exitCode", false, true); 144 exitCode.setTypeEquals(BaseType.STRING); 145 new Attribute(exitCode, "_showName"); 146 147 localDir = new TypedIOPort(this, "localDir", true, false); 148 localDir.setTypeEquals(BaseType.STRING); 149 new Attribute(localDir, "_showName"); 150 151 localDirParameter = new FileParameter(this, "localDirParameter"); 152 localDirParameter.setDisplayName("local Dir"); 153 154 append = new Parameter(this, "append"); 155 append.setTypeEquals(BaseType.BOOLEAN); 156 append.setToken(BooleanToken.FALSE); 157 158 confirmOverwrite = new Parameter(this, "confirmOverwrite"); 159 confirmOverwrite.setTypeEquals(BaseType.BOOLEAN); 160 confirmOverwrite.setToken(BooleanToken.FALSE); 161 162 _attachText("_iconDescription", "<svg>\n" 163 + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" " 164 + "style=\"fill:white\"/>\n" 165 + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10" 166 + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n" 167 + "<text x=\"-13.5\" y=\"7\" " + "style=\"font-size:14\">\n" 168 + "SRB \n" + "</text>\n" + "<text x=\"-12\" y=\"19\"" 169 + "style=\"font-size:11; fill:black; font-family:SansSerif\">" 170 + "SGet</text>\n" + "</svg>\n"); 171 172 } 173 174 /** 175 * SRB file system reference. 176 */ 177 public TypedIOPort SRBFileSystem; 178 179 /** 180 * Paths to the SRB files to fetch 181 */ 182 public TypedIOPort filesToGet; 183 184 /** 185 * Paths to the local location. 186 */ 187 public TypedIOPort fetchedFiles; 188 189 /** 190 * Exit status of the operation. 191 */ 192 public TypedIOPort exitCode; 193 194 /** 195 * Where to fetch the files on the local drive. 196 */ 197 public TypedIOPort localDir; 198 199 /** 200 * Where to fetch the files on the local drive. 201 */ 202 public FileParameter localDirParameter; 203 204 /** 205 * Overwrite when SRB file is copied to loalFile 206 */ 207 public Parameter confirmOverwrite; 208 209 /** 210 * Append when SRB file is copied to loalFile 211 */ 212 public Parameter append; 213 214 // ///////////////////////////////////////////////////////////////// 215 // // public methods //// 216 /** 217 * Upload the file to the SRB. If the SRB file path is not specified, upload 218 * to the current working directory. Output the current working directory. 219 * 220 * @exception IllegalActionException 221 * If it is thrown if the SRB file cannot be accessed or the 222 * current directory cannot be broadcasted. 223 */ 224 public void fire() throws IllegalActionException { 225 226 SRBFile srbFile; 227 LocalFile localFile; 228 String localFilePath; 229 String _exitCode = ""; 230 231 try { 232 // make sure there is an alive connection. 233 try { 234 srbFileSystem.getHost(); 235 } catch (Exception ex) { // connection was closed. 236 srbFileSystem = null; 237 ObjectToken SRBConOT = null; 238 try { // try to get a new connection in case the previous one 239 // has terminated. 240 SRBConOT = (ObjectToken) SRBFileSystem.get(0); 241 } catch (NoTokenException ntex) { 242 } 243 if (SRBConOT != null) { 244 srbFileSystem = (SRBFileSystem) SRBConOT.getValue(); 245 } 246 } 247 if (srbFileSystem != null) { 248 // The local directory to fetch the files. 249 if (localDir.getWidth() > 0) { 250 String localDirStr = ((StringToken) localDir.get(0)) 251 .stringValue(); 252 localDirParameter.setExpression(localDirStr); 253 } 254 255 String localDirStr = ((StringToken) localDirParameter 256 .getToken()).stringValue(); 257 258 if (localDirStr.startsWith("file:/")) { 259 localDirStr = localDirStr.substring(6); 260 while (localDirStr.startsWith("/")) { 261 localDirStr = localDirStr.substring(1); 262 } 263 } 264 265 localDirStr = URLDecoder.decode(localDirStr); 266 267 File dir = new File(localDirStr); 268 // if the directory doesn't exist, create it. 269 if (!dir.exists()) { 270 dir.mkdirs(); 271 } 272 273 // Getting the srb files list token and copying each file to the 274 // local drive. 275 ArrayToken srbFilesTokenArr = null; 276 try { 277 srbFilesTokenArr = (ArrayToken) filesToGet.get(0); 278 } catch (Exception ex) { 279 _debug("filesToGet port is null."); 280 } 281 if (srbFilesTokenArr != null) { 282 Token[] srbFilesToken = srbFilesTokenArr.arrayValue(); 283 Vector fetchedFilesVec = new Vector(); 284 for (int i = 0; i < srbFilesToken.length; i++) { 285 286 // srb file path. 287 String srbFileStr = ((StringToken) srbFilesToken[i]) 288 .stringValue(); 289 _debug("<FILE_TO_GET>" + srbFileStr + "<FILE_TO_GET>"); 290 291 srbFile = new SRBFile(srbFileSystem, srbFileStr); 292 if (srbFile.exists()) { 293 // setting the local file path. 294 int slashIndex = srbFileStr.lastIndexOf('/'); 295 if (slashIndex == -1) { 296 srbFileSystem = SRBUtil 297 .closeConnection(srbFileSystem); 298 throw new IllegalActionException( 299 "No absolute srb file path!"); 300 } 301 String tmpFileNameString = srbFileStr 302 .substring(slashIndex + 1); 303 304 localFilePath = dir.getAbsolutePath() + "/"; 305 localFilePath += tmpFileNameString; 306 307 _debug("<LOCAL_FILE_PATH>" + localFilePath 308 + "</LOCAL_FILE_PATH>"); 309 310 // copying the SRB file to the local drive. 311 localFile = new LocalFile(localFilePath); 312 313 boolean appendValue = ((BooleanToken) append 314 .getToken()).booleanValue(); 315 boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite 316 .getToken()).booleanValue(); 317 // Don't ask for confirmation in append mode, since 318 // there 319 // will be no loss of data. 320 if (localFile.exists() && !appendValue 321 && confirmOverwriteValue) { 322 // Query for overwrite. 323 // FIXME: This should be called in the event 324 // thread! 325 // There is a chance of deadlock since it is 326 // not. 327 if (!MessageHandler 328 .yesNoQuestion("OK to overwrite " 329 + localFile + "?")) { 330 srbFileSystem = SRBUtil 331 .closeConnection(srbFileSystem); 332 throw new IllegalActionException(this, 333 "Please select another file name."); 334 } 335 } 336 try { 337 srbFile.copyTo(localFile, !appendValue); 338 fetchedFilesVec.add(new StringToken(localFile 339 .getAbsolutePath())); 340 } catch (Exception ex) { 341 // If the paralel copy fails try to do a stream 342 // copy. 343 System.out.println("Paralel get failed due to " 344 + ex.getMessage()); 345 System.out.println("Trying Stream get."); 346 try { 347 if (localFile.exists()) { 348 localFile.delete(); 349 } 350 _streamGet(srbFile, localFile 351 .getAbsolutePath()); 352 // adding successfully fetched files output 353 // path to the fetched files array. 354 fetchedFilesVec.add(new StringToken( 355 localFile.getAbsolutePath())); 356 357 } catch (Exception stex) { 358 stex.printStackTrace(); 359 // even if there is an execption output the 360 // successfully fetched files. 361 System.out.println("failed to copy file " 362 + srbFileStr + " to " 363 + localFilePath + "."); 364 _exitCode += "unable to fetch file " 365 + srbFileStr; 366 _exitCode += " to " + localFilePath + ".\n"; 367 } 368 } 369 } else { 370 System.out.println("file " + srbFileStr 371 + " does not exist."); 372 _exitCode += "file " + srbFileStr 373 + " does not exist.\n"; 374 } 375 } 376 377 Token[] fetchedFilesArr = new StringToken[fetchedFilesVec 378 .size()]; 379 fetchedFilesVec.toArray(fetchedFilesArr); 380 // broadcast the array only if it's non-empty 381 if (fetchedFilesArr.length > 0) { 382 fetchedFiles.broadcast(new ArrayToken(fetchedFilesArr)); 383 } else { 384 _exitCode = "no files were fetched."; 385 } 386 if (_exitCode.equals("")) { 387 _exitCode = "success"; 388 } 389 exitCode.broadcast(new StringToken(_exitCode)); 390 } else { 391 // no more files to get. 392 _refire = false; 393 } 394 } else 395 throw new IllegalActionException(this, 396 "No SRB connection available in actor " 397 + this.getName() + "."); 398 } catch (Exception ex) { 399 ex.printStackTrace(); 400 srbFile = null; 401 srbFileSystem = SRBUtil.closeConnection(srbFileSystem); 402 throw new IllegalActionException(this, ex.getMessage()); 403 } 404 405 } 406 407 /** 408 * Initialize the srb file system to null. 409 */ 410 public void initialize() throws IllegalActionException { 411 super.initialize(); 412 srbFileSystem = null; 413 } 414 415 /** 416 * Post fire the actor. Return false to indicated that the process has 417 * finished. If it returns true, the process will continue indefinitely. 418 */ 419 public boolean postfire() throws IllegalActionException { 420 if (_refire) { 421 return super.postfire(); 422 } else 423 return _refire; 424 } 425 426 /** 427 * Reset the _refire variable and disconnect 428 */ 429 public void wrapup() { 430 _refire = true; 431 srbFileSystem = SRBUtil.closeConnection(srbFileSystem); 432 } 433 434 // ///////////////////////////////////////////////////////////////// 435 // // private methods //// 436 437 /** 438 * Stream read the file. Use in case the parallel get fails. 439 */ 440 private void _streamGet(SRBFile srbFile, String localFilePath) 441 throws Exception { 442 443 SRBRandomAccessFile srbRandomAccessFile = null; 444 byte[] bytesRead = new byte[20000]; 445 ; 446 int nBytesRead; 447 OutputStream out = new FileOutputStream(localFilePath); 448 449 srbRandomAccessFile = new SRBRandomAccessFile(srbFile, "r"); 450 nBytesRead = srbRandomAccessFile.read(bytesRead); 451 while (nBytesRead > 0) { 452 out.write(bytesRead); 453 nBytesRead = srbRandomAccessFile.read(bytesRead); 454 } 455 out.close(); 456 } 457 458 // ///////////////////////////////////////////////////////////////// 459 // // private members //// 460 461 /** 462 * Indicator whether the actor should fire again 463 */ 464 private boolean _refire = true; 465 466 /** 467 * SRB file system reference. 468 */ 469 private SRBFileSystem srbFileSystem = null; 470}