001/* 002 * Copyright (c) 2009-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-08-24 22:47:39 +0000 (Mon, 24 Aug 2015) $' 007 * '$Revision: 33633 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.srb; 031 032import java.io.OutputStream; 033import java.net.URI; 034import java.util.Vector; 035 036import edu.sdsc.grid.io.FileFactory; 037import edu.sdsc.grid.io.GeneralFile; 038import edu.sdsc.grid.io.GeneralFileSystem; 039import edu.sdsc.grid.io.GeneralMetaData; 040import edu.sdsc.grid.io.GeneralRandomAccessFile; 041import edu.sdsc.grid.io.MetaDataCondition; 042import edu.sdsc.grid.io.MetaDataSelect; 043import edu.sdsc.grid.io.MetaDataSet; 044import ptolemy.actor.TypedAtomicActor; 045import ptolemy.actor.TypedIOPort; 046import ptolemy.data.ArrayToken; 047import ptolemy.data.BooleanToken; 048import ptolemy.data.StringToken; 049import ptolemy.data.Token; 050import ptolemy.data.expr.Parameter; 051import ptolemy.data.type.ArrayType; 052import ptolemy.data.type.BaseType; 053import ptolemy.kernel.CompositeEntity; 054import ptolemy.kernel.util.Attribute; 055import ptolemy.kernel.util.IllegalActionException; 056import ptolemy.kernel.util.NameDuplicationException; 057import ptolemy.util.MessageHandler; 058 059////////////////////////////////////////////////////////////////////////// 060//// DataGridTransfer 061/** <p>DataGridTransfer is a Kepler Actor which has a functionality similar to 062 * the SRB/IRODS commands, namely Sget, Sput, iget, and iput. 063 * DataGridTransfer copies one or more objects to/from a remote filesystem into 064 * the local file system. 065 * The following actor expects as input a reference to local or remote file systems support by the Jargon API. 066 * This reference connection is created from the source and destination URL values, 067 * Currently available filesystem URLs are, file:///myDir/myfile.txt, 068 * irods://username:password@myhost.org:1247/myDir/myfile.txt, 069 * srb://username.domain:password@myhost.org:5544/myDir/myfile.txt, or ftp and http urls. 070 * </p><p> 071 * Currently, the source and destination filesystems can not be changed once the workflow is running. 072 * The filepaths can be changed. 073 * </p><p> 074 * The file reference system is created with a unique user account and 075 * with this connection reference as input the DataGridTransfer actor is 076 * able to gain access to various files on the file systems. 077 * Once an alive DataGridTransfer file connection system has been 078 * established the actor gets the destination directory and the source files 079 * to establish the DataGridTransfer file path. 080 * If the DataGridTransfer destination directory doesn't exist, a new directory 081 * is created. Once the DataGridTransfer files path are determined, 082 * the files are copied from the source file space to the 083 * local drive. In case the above process of parallel copy fails, 084 * a streaming copy process is carried out with random access streams 085 * where the file is downloaded as a sequence of byte arrays. 086 * </p><p> 087 * There is a parameter to overwrite existing files. 088 * </p><p> 089 * <B>Actor Input:</B> Accepts a reference to the 090 * the files systems, and two arrays of URL file paths. 091 * </p><p> 092 * <B>Actor Output:</B> Outputs the destination file paths and an exit status. The exit status gives 093 * a message of "success" or appropriate error to indicate the status of file get process. 094 * 095 * </p><p>The following actor uses the Dice Research Jargon API provided. 096 * <A href="http://www.sdsc.edu/srb"><I>Further information on SRB</I> </A> 097 * <A href="http://www.irods.org"><I>Further information on IRODS</I> </A> 098 099 @author Lucas Gilbert, Efrat Jaeger 100 @category.name datagrid 101 @category.name transfer 102*/ 103 104public class DataGridTransfer extends TypedAtomicActor { 105 106 /** Construct a constant source with the given container and name. 107 * Create the <i>value</i> parameter, initialize its value to 108 * the default value of an IntToken with value 1. 109 * @param container The container. 110 * @param name The name of this actor. 111 * @exception IllegalActionException If the entity cannot be contained 112 * by the proposed container. 113 * @exception NameDuplicationException If the container already has an 114 * actor with this name. 115 */ 116 public DataGridTransfer( CompositeEntity container, String name ) 117 throws NameDuplicationException, IllegalActionException 118 { 119 super(container, name); 120 121 source = new TypedIOPort(this, "sourceURL", true, false); 122 source.setTypeEquals(new ArrayType(BaseType.STRING)); 123 new Attribute(source, "_showName"); 124 125 fetchedFiles = new TypedIOPort(this, "fetchedFiles", false, true); 126 fetchedFiles.setTypeEquals(new ArrayType(BaseType.STRING)); 127 new Attribute(fetchedFiles, "_showName"); 128 129 exitCode = new TypedIOPort(this, "exitCode", false, true); 130 exitCode.setTypeEquals(BaseType.STRING); 131 new Attribute(exitCode, "_showName"); 132 133 destination = new TypedIOPort(this, "destinationDirectoryURL", true, false); 134 destination.setTypeEquals(BaseType.STRING); 135 new Attribute(destination, "_showName"); 136 137// destinationURLParameter = new Parameter(this, "destinationURLParameter"); 138// destinationURLParameter.setTypeEquals(BaseType.STRING); 139 140 overwriteParameter = new Parameter(this, "overwrite"); 141 overwriteParameter.setTypeEquals(BaseType.BOOLEAN); 142 overwriteParameter.setToken(BooleanToken.FALSE); 143 144 _attachText("_iconDescription", "<svg>\n" 145 + "<rect x=\"-25\" y=\"-20\" " 146 + "width=\"50\" height=\"40\" " 147 + "style=\"fill:white\"/>\n" 148 + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10" 149 + " 15,-10 15,10, -15,10\" " 150 + "style=\"fill:red\"/>\n" 151 + "<text x=\"-13.5\" y=\"7\" " 152 + "style=\"font-size:14\">\n" 153 + "DataGrid \n" 154 + "</text>\n" 155 + "<text x=\"-12\" y=\"19\"" 156 + "style=\"font-size:11; fill:black; font-family:SansSerif\">" 157 + "Transfer</text>\n" 158 + "</svg>\n"); 159 160 } 161 162 /** Paths to the files to fetch 163 */ 164 public TypedIOPort source; 165 166 167 /** Paths to the destination 168 */ 169 public TypedIOPort fetchedFiles; 170 171 /** Exit status of the operation. 172 */ 173 public TypedIOPort exitCode; 174 175 /** Where to put the files on the destination drive. 176 */ 177 public TypedIOPort destination; 178 179 /** Where to put the files on the destination drive. 180 */ 181// public Parameter destinationURLParameter; 182 183 /** Overwrite when file is copied 184 */ 185 public Parameter overwriteParameter; 186 187 188 189 /////////////////////////////////////////////////////////////////// 190 //// public methods //// 191 /** Transfer the file or directory, 192 * from the source URL to the destination URL. 193 * @exception IllegalActionException If it is thrown if the 194 * file cannot be accessed. 195 */ 196 public void fire() throws IllegalActionException 197 { 198 GeneralFile sourceFile; 199 GeneralFile destinationFile; 200 String _exitCode = ""; 201 String destinationURL = null; 202 MetaDataCondition conditions[] = { 203 MetaDataSet.newCondition( GeneralMetaData.FILE_NAME, 204 MetaDataCondition.EQUAL, "fake") 205 }; 206 MetaDataSelect selects[] = { 207 MetaDataSet.newSelection( GeneralMetaData.FILE_NAME ) 208 }; 209 210 211 try { 212 //TODO not sure why it is done this way 213 if (destination.getWidth() > 0) { 214 destinationURL = ((StringToken)destination.get(0)).stringValue(); 215// destinationURLParameter.setExpression(destinationURL); 216// destinationURL = ((StringToken)destinationURLParameter.getToken()).stringValue(); 217 } 218 219 //TODO a bit excessive? 220 // make sure there is an alive connection. Use the old ones if possible. 221 try { 222 if (destinationDir == null) 223 destinationDir = FileFactory.newFile(new URI(destinationURL)); 224 else 225 destinationDir.query(selects); 226 } catch (Exception ex) { // connection did not exist or was closed. 227 // try to get a new connection in case the previous one has terminated. 228 destinationDir = FileFactory.newFile(new URI(destinationURL)); 229 } 230 destinationDir.mkdirs(); 231 232 // Getting the source files list token and copying each file to the destination drive. 233 ArrayToken sourceFilesTokenArr = null; 234 try { 235 sourceFilesTokenArr = (ArrayToken) source.get(0); 236 } catch (Exception ex) { 237 _debug("filesToGet port is null."); 238 } 239 if (sourceFilesTokenArr != null) { 240 Token[] sourceFilesToken = sourceFilesTokenArr.arrayValue(); 241 Vector fetchedFilesVec = new Vector(); 242 243 String fileStr = ((StringToken) sourceFilesToken[0]).stringValue(); 244 //TODO a bit excessive? 245 // make sure there is an alive connection. Use the old ones if possible. 246 try { 247 if (sourceFileSystem == null) 248 sourceFileSystem = FileFactory.newFileSystem(new URI(fileStr)); 249 else 250 sourceFileSystem.query(conditions, selects, 1); 251 } catch (Exception ex) { // connection did not exist or was closed. 252 // try to get a new connection in case the previous one has terminated. 253 sourceFile = FileFactory.newFile(new URI(fileStr)); 254 sourceFileSystem = sourceFile.getFileSystem(); 255 } 256 257 for (int i=0; i<sourceFilesToken.length; i++) 258 { 259 // source file path. 260 fileStr = ((StringToken) sourceFilesToken[i]).stringValue(); 261 _debug("<FILE_TO_GET>" + fileStr + "<FILE_TO_GET>"); 262 263 sourceFile = FileFactory.newFile(sourceFileSystem, new URI(fileStr).getPath()); 264 if (sourceFile.exists()) { 265 _debug("<LOCAL_FILE_PATH>" + destinationDir + "/" + sourceFile.getName() + "</LOCAL_FILE_PATH>"); 266 267 // copying the source file to the destination drive. 268 destinationFile = FileFactory.newFile(destinationDir, sourceFile.getName()); 269 boolean overwrite = ((BooleanToken)overwriteParameter.getToken()).booleanValue(); 270 // Don't ask for confirmation in append mode, since there 271 // will be no loss of data. 272 if (destinationFile.exists() && !overwrite) { 273 // Query for overwrite. 274 // FIXME: This should be called in the event thread! 275 // There is a chance of deadlock since it is not. 276 if (!MessageHandler.yesNoQuestion( 277 "OK to overwrite " + destinationFile + "?")) { 278 throw new IllegalActionException(this, 279 "Please select another file name."); 280 } 281 } 282 try { 283 //overwrite should be checked above 284 sourceFile.copyTo( destinationFile, true ); 285 fetchedFilesVec.add(new StringToken(destinationFile.getAbsolutePath())); 286 } catch (Exception ex) { 287 // If the paralel copy fails try to do a stream copy. 288 System.out.println("Paralel get failed due to " + ex.getMessage()); 289 System.out.println("Trying Stream get."); 290 try { 291 if (destinationFile.exists()) { 292 destinationFile.delete(); 293 } 294 _streamGet(sourceFile, destinationFile); 295 // adding successfully fetched files output path to the fetched files array. 296 fetchedFilesVec.add(new StringToken(destinationFile.getAbsolutePath())); 297 298 } catch (Exception stex) { 299 stex.printStackTrace(); 300 // even if there is an execption output the successfully fetched files. 301 System.out.println("failed to copy file " + fileStr + 302 " to " + destinationDir + "/" + sourceFile.getName() + "."); 303 _exitCode += "unable to fetch file " + fileStr; 304 _exitCode += " to " + destinationDir + "/" + sourceFile.getName() + ".\n"; 305 } 306 } 307 } else { 308 System.out.println("file " + fileStr + " does not exist."); 309 _exitCode += "file " + fileStr + " does not exist.\n"; 310 } 311 } 312 313 Token[] fetchedFilesArr = new StringToken[fetchedFilesVec.size()]; 314 fetchedFilesVec.toArray(fetchedFilesArr); 315 // broadcast the array only if it's non-empty 316 if (fetchedFilesArr.length > 0) { 317 fetchedFiles.broadcast(new ArrayToken(fetchedFilesArr)); 318 } else { 319 _exitCode = "no files were fetched."; 320 } 321 if (_exitCode.equals("")) { 322 _exitCode = "success"; 323 } 324 exitCode.broadcast(new StringToken(_exitCode)); 325 } else { 326 // no more files to get. 327 _refire = false; 328 } 329 } catch (Exception ex) { 330 ex.printStackTrace(); 331 sourceFile = null; 332// fileSystem = SRBUtil.closeConnection(fileSystem); 333 throw new IllegalActionException(this, ex.getMessage()); 334 } 335 336 } 337 338 /** Initialize the source file system to null. 339 */ 340 public void initialize() throws IllegalActionException { 341 super.initialize(); 342 sourceFileSystem = null; 343 destinationDir = null; 344 } 345 346 /** Post fire the actor. Return false to indicated that the 347 * process has finished. If it returns true, the process will 348 * continue indefinitely. 349 */ 350 public boolean postfire() throws IllegalActionException { 351 if (_refire) { 352 return super.postfire(); 353 } else return _refire; 354 } 355 356 /** Reset the _refire variable and disconnect 357 */ 358 public void wrapup() { 359 _refire = true; 360// fileSystem = SRBUtil.closeConnection(fileSystem); 361 } 362 363 /////////////////////////////////////////////////////////////////// 364 //// private methods //// 365 366 /** Stream read the file. Use in case the parallel get fails. 367 */ 368 private void _streamGet(GeneralFile sourceFile, GeneralFile destinationFile) throws Exception { 369 370 GeneralRandomAccessFile randomAccessFile= null; 371 byte[] bytesRead = new byte[20000];; 372 int nBytesRead; 373 OutputStream out = FileFactory.newFileOutputStream( destinationFile ); 374 375 randomAccessFile = FileFactory.newRandomAccessFile( sourceFile, "r" ); 376 nBytesRead = randomAccessFile.read(bytesRead); 377 while (nBytesRead > 0) { 378 out.write(bytesRead); 379 nBytesRead = randomAccessFile.read(bytesRead); 380 } 381 out.close(); 382 } 383 384 /////////////////////////////////////////////////////////////////// 385 //// private members //// 386 387 /** Indicator whether the actor should fire again 388 */ 389 private boolean _refire = true; 390 391 /** Source file system reference. 392 */ 393 private GeneralFileSystem sourceFileSystem = null; 394 395 /** Source file system reference. 396 */ 397 private GeneralFile destinationDir = null; 398}