/*
 * Copyright (c) 2004-2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:47:39 +0000 (Mon, 24 Aug 2015) $'
 * '$Revision: 33633 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.srb;

import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

import edu.sdsc.grid.io.local.LocalFile;
import edu.sdsc.grid.io.srb.SRBFile;
import edu.sdsc.grid.io.srb.SRBFileSystem;
import edu.sdsc.grid.io.srb.SRBRandomAccessFile;
import ptolemy.actor.NoTokenException;
import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.ArrayToken;
import ptolemy.data.BooleanToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.attributes.URIAttribute;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.util.MessageHandler;

//////////////////////////////////////////////////////////////////////////
//// SPut
/**
 * <p>
 * SPut is a Kepler Actor which has a functionality similar to the SRB command
 * namely "Sput". Sput imports one or more local files and/or directories into
 * SRB space. SPut actor uploads a local file to the SRB. The following actor
 * expects as input a reference to the SRB file system. This reference
 * connection is obtained via the SRBConnect Actor in Kepler. <i>See SRBConnect
 * &amp; its documentation.</i>
 * </p>
 * <p>
 * The file reference system is created with a unique SRB user account and with
 * this connection reference as input the SPut actor is able to gain access to
 * the SRB file space. Once an alive SRB file connection system has been
 * established the actor gets the remote SRB directory specified and a remote
 * file path is created. The local file(s) is then copied to the SRB space with
 * the established file path. In case the parallel put process fails, a
 * streaming put process is carried out with SRBRandomAccess streams where the
 * file is uploaded as a sequence of byte arrays instead.
 * </p>
 * <p>
 * If <i>confirmOverwrite</i> is true, the user is asked for confirmation
 * before an existing SRB remote file is overwritten.
 * </p>
 * <p>
 * <B>Actor Input:</B> Accepts a reference to the SRB files system, an SRB
 * remote location and an array of local file paths.
 * </p>
 * <p>
 * <B>Actor Output:</B> Outputs the remote file paths and an exit status. The
 * exit status gives a message of "success" or appropriate error to indicate the
 * status of file put process.
 * </p>
 * <p>
 * The following actor accesses SRB file reference system and SRB file space
 * with the SRB Jargon API provided. The JARGON is a pure API for developing
 * programs with a data grid interface and I/O for SRB file systems.
 * </p>
 * <A href="http://www.sdsc.edu/srb"><I>Further information on SRB</I> </A>
 *
 * @author Efrat Jaeger
 * @version $Id: SPut.java 33633 2015-08-24 22:47:39Z crawl $
 * @category.name srb
 * @category.name put
 */

public class SPut extends TypedAtomicActor {

    /**
     * Construct an SPut actor with the given container and name. Create the
     * <i>SRBFileSystem</i> and <i>filesToPut</i> input ports, the
     * <i>uploadedFiles</i> and <i>exitCode</i> output ports, the
     * <i>remoteDir</i> port parameter and the <i>confirmOverwrite</i>
     * parameter (initially false), and attach the actor icon.
     *
     * @param container
     *            The container.
     * @param name
     *            The name of this actor.
     * @exception IllegalActionException
     *                If the entity cannot be contained by the proposed
     *                container.
     * @exception NameDuplicationException
     *                If the container already has an actor with this name.
     */
    public SPut(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);

        SRBFileSystem = new TypedIOPort(this, "SRBFileSystem", true, false);
        SRBFileSystem.setTypeEquals(BaseType.GENERAL);
        new Attribute(SRBFileSystem, "_showName");

        filesToPut = new TypedIOPort(this, "filesToPut", true, false);
        filesToPut.setTypeEquals(new ArrayType(BaseType.STRING));
        new Attribute(filesToPut, "_showName");

        uploadedFiles = new TypedIOPort(this, "uploadedFiles", false, true);
        uploadedFiles.setTypeEquals(new ArrayType(BaseType.STRING));
        new Attribute(uploadedFiles, "_showName");

        exitCode = new TypedIOPort(this, "exitCode", false, true);
        exitCode.setTypeEquals(BaseType.STRING);
        new Attribute(exitCode, "_showName");

        remoteDir = new PortParameter(this, "remoteDir");
        new Attribute(remoteDir, "_showName");

        confirmOverwrite = new Parameter(this, "confirmOverwrite");
        confirmOverwrite.setTypeEquals(BaseType.BOOLEAN);
        confirmOverwrite.setToken(BooleanToken.FALSE);

        _attachText("_iconDescription", "<svg>\n"
                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
                + "style=\"fill:white\"/>\n"
                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
                + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n"
                + "<text x=\"-13.5\" y=\"7\" " + "style=\"font-size:14\">\n"
                + "SRB</text>\n" + "<text x=\"-12\" y=\"19\""
                + "style=\"font-size:11; fill:black; font-family:SansSerif\">"
                + "SPut</text>\n" + "</svg>\n");
    }

    /**
     * SRB file system reference.
     */
    public TypedIOPort SRBFileSystem;

    /**
     * Paths to the remote location.
     */
    public TypedIOPort uploadedFiles;

    /**
     * Paths to the files to upload
     */
    public TypedIOPort filesToPut;

    /**
     * Exit status of the operation
     */
    public TypedIOPort exitCode;

    /**
     * Where to upload the files.
     */
    public PortParameter remoteDir;

    /**
     * Overwrite existing srb file
     */
    public Parameter confirmOverwrite;

    // /////////////////////////////////////////////////////////////////
    // // public methods ////

    /**
     * Upload each local file named on <i>filesToPut</i> into the SRB
     * directory given by <i>remoteDir</i>. The absolute SRB paths of the
     * files that were uploaded are broadcast on <i>uploadedFiles</i> (only
     * when at least one upload succeeded), and a status string is broadcast
     * on <i>exitCode</i>: "success", "no files were uploaded.", or the
     * accumulated per-file error messages. If no token is available on
     * <i>filesToPut</i>, nothing is output.
     *
     * On any failure the SRB connection is closed before the exception is
     * propagated.
     *
     * @exception IllegalActionException
     *                If no SRB connection is available, if the user declines
     *                to overwrite an existing remote file, or if any other
     *                error occurs while preparing or broadcasting the
     *                results; the original exception is attached as the
     *                cause.
     */
    @Override
    public void fire() throws IllegalActionException {
        try {
            _ensureConnection();

            // The srb remote directory to put the files into.
            remoteDir.update();
            String remoteDirStr = ((StringToken) remoteDir.getToken())
                    .stringValue();
            SRBFile parent = new SRBFile(srbFileSystem, remoteDirStr);

            // Getting the local files list token.
            ArrayToken localFilesTokenArr = null;
            try {
                localFilesTokenArr = (ArrayToken) filesToPut.get(0);
            } catch (Exception ex) {
                _debug("filesToPut port is null.");
            }
            if (localFilesTokenArr == null) {
                return;
            }

            Token[] localFilesToken = localFilesTokenArr.arrayValue();
            List<Token> uploaded = new ArrayList<Token>();
            StringBuilder errors = new StringBuilder();
            for (int i = 0; i < localFilesToken.length; i++) {
                String localFileStr = ((StringToken) localFilesToken[i])
                        .stringValue();
                errors.append(_putFile(localFileStr, parent, uploaded));
            }

            String status = errors.toString();
            // broadcast the array only if it's non-empty.
            if (!uploaded.isEmpty()) {
                Token[] uploadedArr = uploaded.toArray(new Token[uploaded
                        .size()]);
                uploadedFiles.broadcast(new ArrayToken(uploadedArr));
            } else {
                // NOTE(review): this replaces any per-file error text that
                // was accumulated above; kept as-is because downstream
                // workflows may compare against this exact string.
                status = "no files were uploaded.";
            }
            if (status.equals("")) {
                status = "success";
            }
            exitCode.broadcast(new StringToken(status));
        } catch (IllegalActionException ex) {
            // Already carries actor context; just release the connection.
            srbFileSystem = SRBUtil.closeConnection(srbFileSystem);
            throw ex;
        } catch (Exception ex) {
            srbFileSystem = SRBUtil.closeConnection(srbFileSystem);
            // Keep the cause so the stack trace of the real failure is
            // not lost (ex.getMessage() alone may even be null).
            throw new IllegalActionException(this, ex,
                    "Failed to upload files to SRB.");
        }
    }

    /**
     * Initialize the srb file system to null.
     */
    @Override
    public void initialize() throws IllegalActionException {
        super.initialize();
        srbFileSystem = null;
    }

    /**
     * Disconnect from SRB.
     */
    @Override
    public void wrapup() {
        srbFileSystem = SRBUtil.closeConnection(srbFileSystem);
    }

    // /////////////////////////////////////////////////////////////////
    // // private methods ////

    /**
     * Make sure there is an alive SRB connection. If there is no cached
     * connection, or probing it fails, try to read a new connection
     * reference from the <i>SRBFileSystem</i> port.
     *
     * @exception IllegalActionException
     *                If no connection is available afterwards.
     */
    private void _ensureConnection() throws IllegalActionException {
        boolean alive = false;
        if (srbFileSystem != null) {
            try {
                srbFileSystem.getHost();
                alive = true;
            } catch (Exception ex) {
                // connection was closed; fall through and look for a new one.
            }
        }
        if (!alive) {
            srbFileSystem = null;
            ObjectToken connectionToken = null;
            try {
                // try to get a new connection in case the previous one
                // has terminated.
                connectionToken = (ObjectToken) SRBFileSystem.get(0);
            } catch (NoTokenException ignored) {
                // No new connection reference was supplied; reported below.
            }
            if (connectionToken != null) {
                srbFileSystem = (SRBFileSystem) connectionToken.getValue();
            }
        }
        if (srbFileSystem == null) {
            throw new IllegalActionException(this,
                    "No SRB connection available in actor " + this.getName()
                            + ".");
        }
    }

    /**
     * Upload a single local file into the given SRB directory, first with a
     * parallel put and, if that fails, with a stream put.
     *
     * @param localFileStr
     *            The local path as received on the input port; may carry a
     *            "file:/" prefix and URL escapes.
     * @param parent
     *            The SRB directory to upload into.
     * @param uploaded
     *            Receives the absolute SRB path of the file on success.
     * @return An empty string on success, otherwise the error text to append
     *         to the exit status.
     * @exception Exception
     *                If the user declines to overwrite an existing remote
     *                file, or the path cannot be decoded or resolved.
     */
    private String _putFile(String localFileStr, SRBFile parent,
            List<Token> uploaded) throws Exception {
        // NOTE(review): stripping every leading slash turns an absolute
        // Unix URL such as file:/home/u/f into the relative path home/u/f.
        // This looks Windows-oriented (file:/C:/...); confirm before
        // changing, existing workflows may rely on it.
        if (localFileStr.startsWith("file:/")) {
            localFileStr = localFileStr.substring(6);
            while (localFileStr.startsWith("/")) {
                localFileStr = localFileStr.substring(1);
            }
        }
        // Decode with an explicit charset instead of the platform default.
        localFileStr = URLDecoder.decode(localFileStr, "UTF-8");
        _debug("<FILE_TO_PUT>" + localFileStr + "</FILE_TO_PUT>");

        int slashIndex = localFileStr.lastIndexOf('/');
        if (slashIndex == -1) {
            slashIndex = localFileStr.lastIndexOf("\\");
        }
        String remoteName = localFileStr;
        if (slashIndex > -1) {
            remoteName = localFileStr.substring(slashIndex + 1);
        } else {
            // A bare file name is looked up next to the model file.
            localFileStr = _resolveAgainstModel(localFileStr);
        }
        _debug("<FILE_NAME_STR>" + remoteName + "</FILE_NAME_STR>");

        LocalFile localFile = new LocalFile(localFileStr);
        if (!localFile.exists()) {
            System.out.println("file " + localFile.getAbsolutePath()
                    + " does not exist.");
            return "file " + localFile.getAbsolutePath()
                    + " does not exist.\n";
        }

        SRBFile srbFile = new SRBFile(parent, remoteName);
        boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite
                .getToken()).booleanValue();
        if (srbFile.exists() && confirmOverwriteValue) {
            // Query for overwrite.
            // FIXME: This should be called in the event thread!
            // There is a chance of deadlock since it is not.
            // NOTE(review): the prompt names the local file although it is
            // the remote file that would be overwritten.
            if (!MessageHandler.yesNoQuestion("OK to overwrite " + localFile
                    + "?")) {
                // fire() closes the connection when this propagates.
                throw new IllegalActionException(this,
                        "Please select another file name.");
            }
        }

        try {
            srbFile.copyFrom(localFile, true);
            uploaded.add(new StringToken(srbFile.getAbsolutePath()));
            return "";
        } catch (Exception ex) {
            // If the parallel put fails try to do a stream put.
            System.out.println("Parallel put failed due to "
                    + ex.getMessage());
            System.out.println("Trying stream put.");
        }

        try {
            _streamPut(srbFile, localFile.getAbsolutePath());
            uploaded.add(new StringToken(srbFile.getAbsolutePath()));
            return "";
        } catch (Exception stex) {
            stex.printStackTrace();
            // Not fatal: report it in the exit status so that the files
            // that did upload are still output.
            System.out.println("failed to upload file "
                    + localFile.getAbsolutePath() + " to "
                    + parent.getAbsolutePath() + ".\n");
            return "unable to upload file " + localFile.getAbsolutePath()
                    + " to " + parent.getAbsolutePath() + ".\n";
        }
    }

    /**
     * Prefix a bare file name with the directory of the model, found by
     * searching this actor and its containers for a "_uri" attribute.
     *
     * @param fileName
     *            A file name without any path separator.
     * @return The file name prefixed with the model directory, or the
     *         unchanged file name if no model URI is found.
     * @exception Exception
     *                If the model URI cannot be converted to a URL.
     */
    private String _resolveAgainstModel(String fileName) throws Exception {
        URIAttribute modelURI = null;
        NamedObj container = this;
        while (container != null && modelURI == null) {
            try {
                modelURI = (URIAttribute) container.getAttribute("_uri",
                        URIAttribute.class);
            } catch (IllegalActionException ex) {
                // An attribute was found with name "_uri", but it is not
                // an instance of URIAttribute. Continue the search.
                modelURI = null;
            }
            container = (NamedObj) container.getContainer();
        }
        if (modelURI == null) {
            return fileName;
        }
        String modelPath = modelURI.getURI().toURL().getPath();
        int ind = modelPath.lastIndexOf("/");
        return modelPath.substring(0, ind + 1) + fileName;
    }

    /**
     * Stream put the file. Use in case the parallel put fails. Only the
     * bytes actually read from the local file are written to the remote
     * file, and both streams are closed even if the transfer fails.
     *
     * @param srbFile
     *            The remote file to write.
     * @param localFilePath
     *            Path of the local file to read.
     * @exception Exception
     *                If either file cannot be opened or the transfer fails.
     */
    private void _streamPut(SRBFile srbFile, String localFilePath)
            throws Exception {
        byte[] buffer = new byte[20000];
        InputStream in = new FileInputStream(localFilePath);
        try {
            SRBRandomAccessFile out = new SRBRandomAccessFile(srbFile, "rw");
            try {
                int nBytesRead = in.read(buffer);
                while (nBytesRead > 0) {
                    // Write only what was read; the final chunk is usually
                    // shorter than the buffer.
                    out.write(buffer, 0, nBytesRead);
                    nBytesRead = in.read(buffer);
                }
            } finally {
                out.close();
            }
        } finally {
            in.close();
        }
    }

    /** The cached SRB connection; null when there is none. */
    private SRBFileSystem srbFileSystem = null;
}