001/* 002 * Copyright (c) 2004-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: jianwu $' 006 * '$Date: 2015-06-04 22:39:27 +0000 (Thu, 04 Jun 2015) $' 007 * '$Revision: 33461 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.sdm.spa.actors.transport; 031 032import java.io.ByteArrayOutputStream; 033import java.io.File; 034import java.io.FileReader; 035import java.util.HashMap; 036import java.util.HashSet; 037import java.util.Iterator; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.apache.xerces.parsers.DOMParser; 042import org.kepler.ssh.ExecException; 043import org.sdm.spa.actors.transport.vo.ConnectionDetails; 044import org.w3c.dom.Document; 045import org.w3c.dom.Element; 046import org.w3c.dom.NamedNodeMap; 047import org.w3c.dom.Node; 048import org.w3c.dom.NodeList; 049 050import ptolemy.actor.TypedAtomicActor; 051import ptolemy.actor.TypedIOPort; 052import ptolemy.actor.parameters.PortParameter; 053import ptolemy.data.BooleanToken; 054import ptolemy.data.IntToken; 055import ptolemy.data.StringToken; 056import ptolemy.data.Token; 057import ptolemy.data.expr.Parameter; 058import ptolemy.data.expr.StringParameter; 059import ptolemy.data.type.BaseType; 060import ptolemy.kernel.CompositeEntity; 061import ptolemy.kernel.util.IllegalActionException; 062import ptolemy.kernel.util.NameDuplicationException; 063import ptolemy.kernel.util.Settable; 064 065/** 066 * Copy files between local and remote machine or between two remote machines, 067 * using scp, sftp, bbcp or srmlite protocol. The actor uses the SSH protocol to 068 * connect to remote hosts. If the host is an empty string or "local" or 069 * "localhost", the Java Runtime will be used for execution instead of SSH. 070 * Features such as overwriting of existing files,errors during partial copy of 071 * directories, error codes, error messages etc. depend on the specific protocol 072 * selected. User may overwrite some of the default settings by specifying 073 * additional command line options 074 * <p> 075 * For copying files between two remote machines, connects to the source remote 076 * host using Ssh protocol and executes the command on a terminal. The actor 077 * org.kepler.ssh.SshExec is used for this 078 * </p> 079 * <p> 080 * This actor uses the org.kepler.ssh package to have long lasting connections. 081 * </p> 082 * <p> 083 * If the <i>timeoutSeconds</i> is set greater than zero, the command will be 084 * timed out after the specified amount of time (in seconds). 085 * </p> 086 * <p> 087 * In case there is an ssh connection related error (or timeout) the <i>exitcode</i> 088 * will be -32767, <i>errors</i> will contain the error message, <i>stdout</i> 089 * and <i>stderr</i> will be empty string. 090 * </p> 091 * <p> 092 * To ensure fixed rate of token production for SDF, the actor emits an empty 093 * string on <i>errors</i> if the command is executed without ssh related 094 * errors. 095 * </p> 096 * <p> 097 * If <i>cleanupAfterError</i> is set, the remote process and its children will 098 * be killed (provided, we have the connection still alive). Very useful in case 099 * of timeout because that leaves remote processes running. Use only when 100 * connecting to a unix machine. In case of <i>local</i> or <i>localhost</i>, 101 * this flag is not used. 102 * </p> 103 * <p> 104 * Streaming of output during the command execution is not implemented. 105 * </p> 106 * 107 * @author Chandrika Sivaramakrishnan, Anand Kulkarni 108 * @version $Revision: 33461 $ 109 * @category.name remote 110 * @category.name connection 111 * @category.name external execution 112 */ 113 114// TODO: Check if source and dest machine are same - use cp instead of scp 115// Make main parameters as port parameters 116@SuppressWarnings("serial") 117public class GenericFileCopier extends TypedAtomicActor { 118 119 // /////////////////////////Private Static Variables//////////////////////// 120 private static final Log log = LogFactory.getLog(GenericFileCopier.class 121 .getName()); 122 private static final boolean isDebugging = log.isDebugEnabled(); 123 124 // private static ArrayList<MachineEntry> machineConfigurationsList = new 125 // ArrayList<MachineEntry>(); 126 private static HashMap<String, String> protocolPaths = new HashMap<String, String>(); 127 private static HashSet<String> hostSet = new HashSet<String>(); 128 private static HashSet<String> protocolsSet = new HashSet<String>(); 129 130 // ////////////////////// Private variables ////////////////////////////// 131 private ConnectionDetails srcDetails; 132 private ConnectionDetails destDetails; 133 134 // ///////////////// Public ports and parameters /////////////////////// 135 136 // /////////////Output ports///////////////////// 137 138 /** 139 * The exit code of the command. 140 */ 141 public TypedIOPort exitcode; 142 143 /** 144 * The string representation of all the errors that happened during the 145 * execution of the actor, if there are any. In case of remote execution 146 * this would contain the contents of standard output and standard error 147 * from the remote machine. In case of successful file transfer this is set 148 * to empty string 149 */ 150 public TypedIOPort errors; 151 /** 152 * The string representation of all the warnings that happened during the 153 * execution of the actor, if there are any. In case of remote execution 154 * this would contain the contents of standard output and standard warnings 155 * from the remote machine. 156 */ 157 public TypedIOPort warnings; 158 159 // /////////////////////////Input ports/parameters///////////////////// 160 161 /** 162 * source machine information in user@host:port format. 163 */ 164 public PortParameter source; 165 166 /** 167 * source file/directory to be copied. 168 */ 169 public PortParameter sourceFile; 170 171 /** 172 * destination machine information in user@host:port format. 173 */ 174 public PortParameter destination; 175 176 /** 177 * destination file/directory to which source should be copied. 178 */ 179 public PortParameter destinationFile; 180 181 /** 182 * Specifying whether directories can be copied recursively. 183 */ 184 public Parameter recursive; 185 186 /** 187 * Type of a protocol to be used to do file transfer. 188 */ 189 public StringParameter protocol; 190 191 // Expert parameters 192 193 /** 194 * Path where the protocol is installed in the source machine 195 */ 196 public Parameter protocolPathSrc; 197 198 /** 199 * Path where the protocol is installed in the destination machine 200 */ 201 public Parameter protocolPathDest; 202 203 /** 204 * Additional command line options to be used for the selected protocol. 205 */ 206 public Parameter cmdOptions; 207 208 /** 209 * Timeout in seconds for the command to be executed. 0 means waiting 210 * indefinitely for command termination. 211 */ 212 public Parameter timeoutSeconds; 213 214 /** 215 * Enforce killing remote process(es) after an error or timeout. Unix 216 * specific solution is used, therefore you should not set this flag if 217 * connecting to other servers. But it is very useful for unix as timeout 218 * leaves processes living there, and sometimes errors too. All processes 219 * belonging to the same group as the remote command (i.e. its children) 220 * will be killed. 221 */ 222 public Parameter cleanupAfterError; 223 224 /** 225 * Protocol that srmlite should internally use. 226 */ 227 public StringParameter srmProtocol; 228 229 230 public PortParameter connectFromDest; //Added by Chandrika 231 232 // ///////////////////////////////////////////////////////////////////// 233 // ///////////////////////Static block //////////////////////////////// 234 static { 235 // Parse xml file for parameter values 236 parseMachineConfigurationsFile(); 237 log.debug("Loaded protocol paths as :" + protocolPaths); 238 } 239 240 // ///////////////////////////////////////////////////////////////// 241 // // public methods //// 242 243 /** 244 * Construct an FileCopierBetweenRemoteMachines actor with the given 245 * container and name. Create the parameters, initialize their values. 246 * 247 * @param container 248 * The container. 249 * @param name 250 * The name of this actor. 251 * @exception IllegalActionException 252 * If the entity cannot be contained by the proposed 253 * container. 254 * @exception NameDuplicationException 255 * If the container already has an actor with this name. 256 */ 257 public GenericFileCopier(CompositeEntity container, String name) 258 throws NameDuplicationException, IllegalActionException { 259 super(container, name); 260 261 String defaultHostStr = "[user@]host[:port]"; 262 // sourceTarget selects the machine where to copy from 263 source = new PortParameter(this, "source machine"); 264 // sourceTarget.setTypeEquals(BaseType.STRING); 265 source.setToken(new StringToken(defaultHostStr)); 266 new Parameter(source.getPort(), "_showName", BooleanToken.TRUE); 267 268 // file or directory to be copied 269 sourceFile = new PortParameter(this, "source file", new StringToken()); 270 new Parameter(sourceFile.getPort(), "_showName", BooleanToken.TRUE); 271 272 // destinationTarget selects the machine where to copy to 273 destination = new PortParameter(this, "destination machine"); 274 destination.setToken(new StringToken(defaultHostStr)); 275 276 // file or directory to be copied 277 destinationFile = new PortParameter(this, "destination file", 278 new StringToken()); 279 new Parameter(destinationFile.getPort(), "_showName", BooleanToken.TRUE); 280 281 // recursive parameter 282 recursive = new Parameter(this, "recursive", new BooleanToken(false)); 283 recursive.setTypeEquals(BaseType.BOOLEAN); 284 285 protocol = new StringParameter(this, "protocol"); 286 287 protocolPathSrc = new Parameter(this, "protocol path on source"); 288 protocolPathSrc.setVisibility(Settable.EXPERT); 289 290 protocolPathDest = new Parameter(this, "protocol path on destination"); 291 protocolPathDest.setVisibility(Settable.EXPERT); 292 293 cmdOptions = new Parameter(this, "command line options"); 294 cmdOptions.setVisibility(Settable.EXPERT); 295 296 timeoutSeconds = new Parameter(this, "timeoutSeconds", new IntToken(0)); 297 timeoutSeconds.setTypeEquals(BaseType.INT); 298 299 cleanupAfterError = new Parameter(this, "cleanupAfterError", 300 new BooleanToken(false)); 301 cleanupAfterError.setTypeEquals(BaseType.BOOLEAN); 302 303 srmProtocol = new StringParameter(this, "srm protocol"); 304 srmProtocol.setVisibility(Settable.EXPERT); 305 306 //Added by Chandrika - Starts 307 connectFromDest = new PortParameter (this, "connect from destination to source",new BooleanToken(false)); 308 connectFromDest.setTypeEquals(BaseType.BOOLEAN); 309 //Added by Chandrika - Ends 310 // Output ports 311 exitcode = new TypedIOPort(this, "exitcode", false, true); 312 exitcode.setTypeEquals(BaseType.INT); 313 new Parameter(exitcode, "_showName", BooleanToken.TRUE); 314 315 errors = new TypedIOPort(this, "errors", false, true); 316 errors.setTypeEquals(BaseType.STRING); 317 new Parameter(errors, "_showName", BooleanToken.TRUE); 318 319 warnings = new TypedIOPort(this, "warnings", false, true); 320 warnings.setTypeEquals(BaseType.BOOLEAN); 321 new Parameter(warnings, "_showName", BooleanToken.TRUE); 322 323 // initialize parameters 324 initalizeParameters(); 325 326 // myEditor = new DynamicEditorFactory(this, "_editorFactory"); 327 328 _attachText("_iconDescription", "<svg>\n" + "<rect x=\"0\" y=\"0\" " 329 + "width=\"75\" height=\"50\" style=\"fill:blue\"/>\n" 330 + "<text x=\"5\" y=\"30\"" 331 + "style=\"font-size:14; fill:yellow; font-family:SansSerif\">" 332 + "SshExec</text>\n" + "</svg>\n"); 333 334 } 335 336 /** 337 * Send the token in the <i>value</i> parameter to the output. 338 * 339 * @exception IllegalActionException 340 * If it is thrown by the send() method sending out the 341 * token. 342 */ 343 public void fire() throws IllegalActionException { 344 345 super.fire(); 346 347 source.update(); 348 destination.update(); 349 sourceFile.update(); 350 destinationFile.update(); 351 connectFromDest.update(); 352 353 // process inputs 354 boolean recursiveVal = ((BooleanToken) recursive.getToken()).booleanValue(); 355 356 boolean connectFromDestVal = ((BooleanToken) connectFromDest.getToken()).booleanValue(); //Added by Chandrika 357 FileCopierBase.CopyResult result = null; 358 String srcFile = ""; 359 String destFile = ""; 360 String strSource = ""; 361 String strDestination = ""; 362 363 Token token = source.getToken(); 364 if(token != null){ 365 strSource = ((StringToken) token).stringValue().trim(); 366 } 367 token = destination.getToken(); 368 if(token != null){ 369 strDestination = ((StringToken) token).stringValue().trim(); 370 } 371 StringToken sourceFileToken = ((StringToken) sourceFile.getToken()); 372 StringToken destFileToken = (StringToken) destinationFile.getToken(); 373 374 // initialize 375 if(sourceFileToken!=null){ 376 srcFile = sourceFileToken.stringValue().trim(); 377 } 378 if(destFileToken!=null){ 379 destFile = destFileToken.stringValue().trim(); 380 } 381 log.debug("Src File-"+srcFile); 382 log.debug("Dest File-"+destFile); 383 384 // Do basic validation 385 if (srcFile.equals("")) { 386 exitcode.send(0, new IntToken(1)); 387 errors.send(0, new StringToken("No source file specified to copy")); 388 warnings.send(0, new BooleanToken(false)); 389 390 return; 391 } 392 if (destFile.equals("")) { 393 exitcode.send(0, new IntToken(1)); 394 errors.send(0, new StringToken("Please specify a target file/directory")); 395 warnings.send(0, new BooleanToken(false)); 396 return; 397 } 398 399 // Create instance of copier and pass on all the user inputs 400 FileCopierBase copier = createFileCopier(strSource, strDestination); 401 //Added by Chandrika - Start 402 //set source and destination details 403 destDetails.setConnectionOrigin(connectFromDestVal); 404 srcDetails.setConnectionOrigin(!connectFromDestVal); 405 //Added by Chandrika - Ends 406 int exitCode = 0; 407 ByteArrayOutputStream cmdStdout = new ByteArrayOutputStream(); 408 ByteArrayOutputStream cmdStderr = new ByteArrayOutputStream(); 409 log.debug("Copier -" + copier); 410 411 try { 412 413 result = copier.copy(srcDetails, srcFile, destDetails, destFile, 414 recursiveVal); 415 416 } catch (ExecException e) { 417 String errText = new String("ExecuteCmd error:\n" + e.getMessage()); 418 exitcode.send(0, new IntToken(-32767)); 419 errors.send(0, new StringToken(errText)); 420 warnings.send(0, new BooleanToken(false)); 421 return; 422 } 423 exitcode.send(0, new IntToken(result.getExitCode())); 424 if (result.getWarningMsg().length() > 0) { 425 warnings.send(0, new BooleanToken(true)); 426 errors.send(0, new StringToken(result.getErrorMsg() + " Warnings: " + result.getWarningMsg())); 427 } 428 else { 429 warnings.send(0, new BooleanToken(false)); 430 errors.send(0, new StringToken(result.getErrorMsg())); 431 } 432 } // end-method fire() 433 434 // ///////////////////////////////////////////////////////////////////////// 435 // ////// Private methods //////////// 436 437 // To parse xml file 438 private static void parseMachineConfigurationsFile() { 439 440 String keplerHome = System.getProperty("KEPLER"); 441 log.info("Kepler Home dir is "+keplerHome); 442 File configFile = new File(keplerHome 443 + "/common/configs/ptolemy/configs/spa/machineConfig.xml"); 444 if (configFile.exists()) { 445 log.info("GenericFileCopier:Loding config file: " + configFile); 446 try { 447 DOMParser parser = new DOMParser(); 448 org.xml.sax.InputSource iSource = new org.xml.sax.InputSource( 449 new FileReader(configFile)); 450 parser.parse(iSource); 451 Document doc = parser.getDocument(); 452 453 Element root = doc.getDocumentElement(); 454 NodeList list = root.getChildNodes(); 455 int length = list.getLength(); 456 boolean nonStdPaths = false; 457 for (int i = 0; i < length; i++) { 458 Node node = list.item(i); 459 if (node.getNodeType() == Node.ELEMENT_NODE) { 460 if ("machineEntry".equals(node.getNodeName())) { 461 nonStdPaths = false; 462 NamedNodeMap attributes = node.getAttributes(); 463 464 // Load into separate static variables 465 String user = attributes.getNamedItem("user") 466 .getNodeValue(); 467 String host = attributes.getNamedItem("host") 468 .getNodeValue(); 469 if (user == null || user.trim().equals("")) { 470 hostSet.add(host); 471 } else { 472 hostSet.add(user + "@" + host); 473 } 474 Node namedItem = attributes 475 .getNamedItem("protocolPaths"); 476 String paths[] = null; 477 if (namedItem != null) { 478 paths = namedItem.getNodeValue().split("\\|"); 479 nonStdPaths = true; 480 } 481 namedItem = attributes 482 .getNamedItem("transferProtocols"); 483 if (namedItem != null) { 484 String[] protocols = namedItem.getNodeValue() 485 .split("\\|"); 486 for (int j = 0; j < protocols.length; j++) { 487 protocolsSet.add(protocols[j]); 488 if (nonStdPaths && j < paths.length 489 && (!paths[j].equals(""))) { 490 // if non empty path is specified for 491 // this protocol add it Map 492 protocolPaths.put(protocols[j] + "@" 493 + host, paths[j]); 494 } 495 } // added all protocols for this machine/host 496 } 497 498 } 499 } 500 }// end of for loop 501 } catch (Exception e) { 502 log 503 .error( 504 "Exception reading machine configuration of GenericFileCopier", 505 e); 506 System.err 507 .println("Error loading machine configuration for GenericFileCopier actor"); 508 e.printStackTrace(System.err); 509 } 510 } else { 511 log.info("No machine configurations found for Remote FileCopier"); 512 } 513 514 } 515 516 // To load values into class parameters 517 private void initalizeParameters() { 518 519 Iterator<String> it = hostSet.iterator(); 520 while (it.hasNext()) { 521 String item = "\"" + (String) it.next() + "\""; 522 source.addChoice(item); 523 destination.addChoice(item); 524 } 525 526 it = protocolsSet.iterator(); 527 while (it.hasNext()) { 528 String item = (String) it.next(); 529 protocol.addChoice(item); 530 } 531 srmProtocol.addChoice("scp"); 532 srmProtocol.addChoice("sftp"); 533 srmProtocol.addChoice("gsiftp"); 534 } 535 536 // Create an instance of copier and set the necessary variables based on 537 // the user input 538 private FileCopierBase createFileCopier(String src, String dest) 539 throws IllegalActionException { 540 541 String strProtocol = ((StringToken) protocol.getToken()).stringValue() 542 .trim(); 543 // Parse connection string 544 srcDetails = parseConnectionString(src); 545 destDetails = parseConnectionString(dest); 546 547 548 if (strProtocol == null || strProtocol.trim().equals("")){ 549 // if either source or destination is a remote machines default to 550 // scp 551 if (!srcDetails.isLocal() || !destDetails.isLocal()) { 552 //Change by ANUSUA - Starts 553 // strProtocol = "scp";//----------Commented by ANUSUA 554 ScpCopier scpcopier = new ScpCopier(); 555 if(scpcopier.isScpPresent(srcDetails,destDetails)){ 556 strProtocol = "scp"; 557 log.debug("DEFAULT SCP"); 558 }else{ 559 strProtocol = "sftp"; 560 log.debug("DEFAULT SFTP"); 561 } 562 //Change by ANUSUA - Ends 563 } else { 564 strProtocol = ""; // set it to empty to get a local copier 565 } 566 } 567 568 569 FileCopierBase copier = FileCopierFactory.getFileCopier(strProtocol); 570 String strPathSrc; 571 String strPathDest; 572 Token temp = null; 573 574 copier.setCleanup(((BooleanToken) cleanupAfterError.getToken()) 575 .booleanValue()); 576 IntToken token = (IntToken) timeoutSeconds.getToken(); 577 if (token != null) { 578 copier.setTimeout(token.intValue()); 579 } 580 581 // set default path to protocol based on config file entry 582 copier.setProtocolPathSrc(protocolPaths.get(strProtocol + "@" 583 + srcDetails.getHost())); 584 copier.setProtocolPathDest(protocolPaths.get(strProtocol + "@" 585 + destDetails.getHost())); 586 587 // If on expert mode pass all the expert options to the copier 588 if (this.getAttribute("_expertMode") != null) { 589 590 if (isDebugging) { 591 log.debug("Expert mode"); 592 } 593 temp = cmdOptions.getToken(); 594 if (temp != null) { 595 copier.setCmdLineOptions(((StringToken) temp).stringValue()); 596 } 597 598 temp = protocolPathSrc.getToken(); 599 if (temp != null) { 600 strPathSrc = ((StringToken) temp).stringValue().trim(); 601 copier.setProtocolPathSrc(strPathSrc); 602 log.debug("protocol path src set=" + strPathSrc); 603 } 604 605 temp = protocolPathDest.getToken(); 606 if (temp != null) { 607 strPathDest = ((StringToken) temp).stringValue().trim(); 608 copier.setProtocolPathDest(strPathDest); 609 } 610 611 if (copier instanceof SrmliteCopier) { 612 temp = srmProtocol.getToken(); 613 if (temp != null) { 614 ((SrmliteCopier) copier) 615 .setSrmProtocol(((StringToken) temp).stringValue() 616 .trim()); 617 } 618 } 619 } else { 620 if (isDebugging) { 621 log.debug("Normal mode"); 622 } 623 } 624 return copier; 625 } 626 627 private ConnectionDetails parseConnectionString(String connectStr) 628 throws IllegalActionException { 629 630 int port = -1; 631 String host = null; 632 String user = null; 633 ConnectionDetails conn = new ConnectionDetails(); 634 635 // boolean relativePath = false; 636 connectStr = connectStr.trim(); 637 638 if (connectStr.equals("") || connectStr.equals("localhost")) { 639 user = System.getProperty("user.name"); 640 host = "localhost"; 641 } else { 642 // get USER 643 int atPos = connectStr.indexOf('@'); 644 if (atPos >= 0) 645 user = connectStr.substring(0, atPos); 646 647 // get HOST 648 int colonPos = connectStr.indexOf(':'); 649 if (colonPos >= 0) { 650 if (atPos >= 0) { 651 host = connectStr.substring(atPos + 1, colonPos); 652 } else { 653 host = connectStr.substring(0, colonPos); 654 } 655 String portStr = connectStr.substring(colonPos + 1); 656 try { 657 port = Integer.parseInt(portStr); 658 } catch (java.lang.NumberFormatException ex) { 659 throw new IllegalActionException( 660 "The port should be a number or omitted in source path " 661 + connectStr); 662 } 663 } else { 664 if (atPos >= 0) { 665 host = connectStr.substring(atPos + 1); 666 } else { 667 host = connectStr; 668 } 669 } 670 } 671 conn.setUser(user); 672 conn.setHost(host); 673 conn.setPort(port); 674 return conn; 675 } 676 677}