/*
 * Copyright (c) 2004-2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: jianwu $'
 * '$Date: 2013-05-23 17:32:39 +0000 (Thu, 23 May 2013) $'
 * '$Revision: 32079 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.actor.ssh;

import java.util.Hashtable;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.ssh.ExecException;
import org.kepler.ssh.ExecFactory;
import org.kepler.ssh.ExecInterface;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.ArrayToken;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.FileParameter;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;

//////////////////////////////////////////////////////////////////////////
//// SshSession
/**
 * <p>
 * Creates an ssh session to a remote host. If requested, the session will not
 * be opened at first firing of this actor but postponed to the first actor that
 * uses ssh (e.g. ExecuteCmd). This actor is useful for three things:<br/>
 * - to provide a private-key for public-key authentication. <br/>
 * - to connect to a remote machine at a certain point of the workflow and thus
 * ask for password (e.g. at the very beginning) not sometime during execution.<br/>
 * - to forward local ports and/or remote ports.
 *
 * </p>
 * <p>
 * This actor uses the org.kepler.ssh package to have long-lasting connections.
 *
 * </p>
 * <p>
 * Its output is the input target, which can be used to identify the created
 * SshSession object anywhere (all ssh related classes use a session factory to
 * create/retrieve sessions based on the target name) and thus use the
 * connection.
 *
 * </p>
 * <p>
 * If the host is empty string or equals "local", nothing happens within this
 * actor. All related actors will use the Java Runtime for local execution
 * instead of ssh.
 *
 * </p>
 * <p>
 * If the parameter <i>postpone</i> is true, the establishment of connection is
 * postponed until the first remote operation to be executed. One of the main
 * purposes of this actor is, however, to make the connection and thus ask for
 * password at the beginning of the workflow. The default is false.
 *
 * </p>
 * <p>
 * If the parameter <i>closeAtEnd</i> is true, the session will be closed at the
 * end of the workflow. If it is false, the session will be kept open. The
 * latter is good for authentication to a host secured with one-time-password,
 * so that all workflows can share the same connection. Be careful, however, as
 * the underlying ssh package has only one session (within Kepler) to a given
 * user@host:port. If you have two workflows running at once connecting to the
 * same host, and the session is closed at the end of one of the workflows, the
 * other will likely experience a broken operation. Therefore, by default, this
 * parameter is false.
 *
 * </p>
 * <p>
 * On the <i>failed</i> output port, the actor emits a BooleanToken indicating
 * whether the connection opening failed. Note that {@link #fire()} throws an
 * IllegalActionException when the session cannot be set up or opened, so no
 * token is produced in that case; whenever a token is emitted on this port it
 * is 'false'.
 *
 * </p>
 * <p>
 * Port forwarding is supported. The 'portforwarding' parameter should have the
 * format "-L port:host:hostport -R port:host:hostport ...". Many local and/or
 * remote forwarding specifications can be given.
 * </p>
 *
 * Reference: Ant version 1.6.2.
 *
 * @author Norbert Podhorszki
 * @version $Revision: 32079 $
 * @category.name remote
 * @category.name connection
 * @category.name external execution
 */

public class SshSession extends TypedAtomicActor {

    /**
     * Construct an SshSession actor with the given container and name. Create
     * the parameters, initialize their values.
     *
     * @param container
     *            The container.
     * @param name
     *            The name of this actor.
     * @exception IllegalActionException
     *                If the entity cannot be contained by the proposed
     *                container.
     * @exception NameDuplicationException
     *                If the container already has an actor with this name.
     */
    public SshSession(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);

        // target selects the machine where to connect to
        target = new PortParameter(this, "target", new StringToken(
                "[user@]host[:port]"));
        new Parameter(target.getPort(), "_showName", BooleanToken.TRUE);
        target.setStringMode(true);

        // Each file option exists twice: as a browsable parameter and as a
        // string input port.
        paramIdentity = new FileParameter(this, "identity");
        identity = new TypedIOPort(this, "identity", true, false);
        identity.setTypeEquals(BaseType.STRING);

        paramGridCert = new FileParameter(this, "gridcert");
        gridcert = new TypedIOPort(this, "gridcert", true, false);
        gridcert.setTypeEquals(BaseType.STRING);

        paramGridProxy = new FileParameter(this, "gridproxy");
        gridproxy = new TypedIOPort(this, "gridproxy", true, false);
        gridproxy.setTypeEquals(BaseType.STRING);

        // port forwarding specifications
        portforwarding = new PortParameter(this, "portforwarding",
                new ArrayToken(BaseType.STRING));
        new Parameter(portforwarding.getPort(), "_showName", BooleanToken.FALSE);

        postpone = new Parameter(this, "postpone", new BooleanToken(false));
        postpone.setTypeEquals(BaseType.BOOLEAN);

        closeAtEnd = new Parameter(this, "closeAtEnd", new BooleanToken(false));
        closeAtEnd.setTypeEquals(BaseType.BOOLEAN);

        target_out = new TypedIOPort(this, "target_out", false, true);
        target_out.setTypeEquals(BaseType.STRING);

        failed = new TypedIOPort(this, "failed", false, true);
        failed.setTypeEquals(BaseType.BOOLEAN);

        // The space before "style" keeps the attributes of <text> separated,
        // as required for well-formed XML.
        _attachText("_iconDescription", "<svg>\n" + "<rect x=\"0\" y=\"0\" "
                + "width=\"75\" height=\"50\" style=\"fill:gray\"/>\n"
                + "<text x=\"5\" y=\"30\" "
                + "style=\"font-size:25; fill:yellow; font-family:SansSerif\">"
                + "SSH2</text>\n" + "</svg>\n");
    }

    // //////////////// Public ports and parameters ///////////////////////

    /**
     * Target in user@host:port format. If user is not provided, the local
     * username will be used. If port is not provided, the default port 22 will
     * be applied. If target is "local" or empty string, nothing happens in this
     * actor and all commands (in other ssh actors) will be executed locally,
     * using Java Runtime.
     */
    public PortParameter target;

    /**
     * The file path for the ssh identity file if the user wants to connect
     * without having to enter the password all the time.
     *
     * The user can browse this file as it is a parameter.
     */
    public FileParameter paramIdentity;

    /**
     * The string representation of the file path for the ssh identity file if
     * the user wants to connect without having to enter the password all the
     * time.
     *
     * This is the input option for the identity file.
     */
    public TypedIOPort identity;

    /**
     * The file path for the Grid Certificate file if the user wants to connect
     * to a GSI-SSH server without having to enter the password all the time
     * (the passphrase for the certificate will be asked once).
     *
     * The user can browse this file as it is a parameter.
     */
    public FileParameter paramGridCert;

    /**
     * The string representation of the file path for the Grid Certificate file if
     * the user wants to connect to a GSI-SSH server without having to enter the
     * password all the time (the passphrase for the certificate will be asked once).
     *
     * This is the input option for the Grid Certificate file.
     */
    public TypedIOPort gridcert;

    /**
     * The file path for the Grid Certificate Proxy file if the user wants to connect
     * to a GSI-SSH server without having to enter the password all the time.
     *
     * The user can browse this file as it is a parameter.
     */
    public FileParameter paramGridProxy;

    /**
     * The string representation of the file path for the Grid Certificate Proxy file if
     * the user wants to connect to a GSI-SSH server without having to enter the
     * password all the time.
     *
     * This is the input option for the Grid Certificate Proxy file.
     */
    public TypedIOPort gridproxy;

    /**
     * Port forwarding specification. Format:
     * "-L port:host:hostport -R port:host:hostport ..." Many forwarding specs
     * can be given.
     */
    public PortParameter portforwarding;

    /**
     * String output: same as input target. It can be used for identifying the
     * created session, so you do not even need to wire actors together if
     * you have the target as a parameter.
     */
    public TypedIOPort target_out;

    /**
     * Boolean output to indicate whether the connection opening failed. Since
     * {@link #fire()} throws an IllegalActionException when the session cannot
     * be set up or opened, every token emitted here is 'false'.
     */
    public TypedIOPort failed;

    /**
     * Specifying whether actual connection to the host should be postponed
     * until the first usage somewhere in the workflow. If false, connection is
     * made here, which is good for password based authentications: at least you
     * know when to expect the password dialog popping up.
     */
    public Parameter postpone;

    /**
     * Specifying whether actual connection to the host should be closed when
     * the workflow terminates. If you run more than one workflow at once using
     * the same remote host, this flag should be false to avoid closing the
     * session in one workflow while the others are still using it.
     */
    public Parameter closeAtEnd;

    // /////////////////////////////////////////////////////////////////
    // // public methods ////

    /**
     * Initialize the actor and start with an empty table of sessions to be
     * closed at the end.
     *
     * @exception IllegalActionException
     *                If the parent class throws it.
     */
    @Override
    public void initialize() throws IllegalActionException {
        super.initialize();
        execObjectSet = new Hashtable<String, ExecInterface>();
    }

    /**
     * Create a session to the target, register identities and port
     * forwardings on it, and open the connection unless <i>postpone</i> is
     * set. On success the target token is sent on <i>target_out</i> and
     * 'false' on <i>failed</i>.
     *
     * @exception IllegalActionException
     *                If the session cannot be created, configured or opened,
     *                or if reading a port or parameter fails.
     */
    @Override
    public void fire() throws IllegalActionException {
        super.fire();

        // process inputs
        target.update();
        StringToken tg = (StringToken) target.getToken();
        // Backward compatibility: remove a double quote at the very beginning
        // and at the very end of the target.
        String strTarget = tg.stringValue().replaceAll("^\"|\"$", "");

        portforwarding.update();
        Token t = portforwarding.getToken();
        ArrayToken pft;
        if (t instanceof ArrayToken) {
            pft = (ArrayToken) t;
        } else if (t.getType() == BaseType.STRING) {
            // a single spec given as a plain string: wrap it into an array
            StringToken[] sta = { (StringToken) t };
            pft = new ArrayToken(sta);
        } else {
            pft = new ArrayToken(BaseType.STRING); // just an empty array
        }

        boolean bPostpone = ((BooleanToken) postpone.getToken()).booleanValue();
        boolean bCloseAtEnd = ((BooleanToken) closeAtEnd.getToken())
                .booleanValue();

        if (isDebugging) {
            log.debug("Create session to " + strTarget + ". postpone = "
                    + bPostpone + " closeAtEnd = " + bCloseAtEnd);
        }

        // process grid certificate file if given
        String strCert = readFilePath(gridcert, paramGridCert);
        if (strCert != null && strCert.length() > 0) {
            log.debug("Use Grid Certificate: " + strCert);
            System.setProperty("X509_USER_CERT", strCert);
        }

        // process grid proxy file if given
        String strProxy = readFilePath(gridproxy, paramGridProxy);
        if (strProxy != null && strProxy.length() > 0) {
            log.debug("Use Grid Proxy: " + strProxy);
            System.setProperty("X509_USER_PROXY", strProxy);
        }

        try {
            ExecInterface execObj = ExecFactory.getExecObject(strTarget);

            addIdentities(execObj);
            addPortForwardings(execObj, pft);

            // if postpone is not requested, open the connection now
            // (and ask for password if needed)
            if (!bPostpone) {
                if (isDebugging) {
                    log.debug("Open connection right now to " + strTarget);
                }
                execObj.openConnection();
            }

            /* add ssh object to hash table for closing at end */
            if (bCloseAtEnd) {
                if (execObjectSet == null) {
                    // closeAll() (e.g. via stop()) discards the table; recreate
                    // it so that this session can still be closed later.
                    execObjectSet = new Hashtable<String, ExecInterface>();
                }
                execObjectSet.put(strTarget, execObj);
            }

        } catch (ExecException e) {
            String errorMsg = "Establishing connection to " + strTarget + " failed: " + e;
            log.error(errorMsg);
            // keep the original exception as cause for diagnostics
            throw new IllegalActionException(this, e, errorMsg);
        }

        target_out.send(0, tg);
        // Failures are reported by the exception above, so reaching this
        // point means the session was set up successfully.
        failed.send(0, BooleanToken.FALSE);

    } // end-of-method fire()

    /**
     * Call the superclass wrapup and then close all sessions that were
     * registered for closing.
     *
     * @exception IllegalActionException
     *                If the parent class throws it.
     */
    @Override
    public void wrapup() throws IllegalActionException {
        if (isDebugging) {
            log.debug("wrapup begin");
        }
        super.wrapup();
        closeAll();
        if (isDebugging) {
            log.debug("wrapup end");
        }
    }

    /**
     * Call the superclass stop and then close all sessions that were
     * registered for closing.
     */
    @Override
    public void stop() {
        if (isDebugging) {
            log.debug("stop begin");
        }
        super.stop();
        closeAll();
        if (isDebugging) {
            log.debug("stop end");
        }
    }

    // /////////////////////////////////////////////////////////////////
    // // private methods ////

    /**
     * Determine a file path either from an input port or, if the port has no
     * sources, from the corresponding file parameter.
     *
     * @param port
     *            The string input port; if tokens are available on several
     *            channels, the last one is effective.
     * @param parameter
     *            The parameter used when the port has no sources.
     * @return The path with a leading "file:" / "file://" removed, or null if
     *         the port has sources but none of its channels has a token.
     * @exception IllegalActionException
     *                If reading the port or the parameter fails.
     */
    private String readFilePath(TypedIOPort port, FileParameter parameter)
            throws IllegalActionException {
        String path = null;
        int n = port.numberOfSources();
        if (n > 0) {
            // take from input port (last will be effective)
            for (int i = 0; i < n; i++) {
                if (port.hasToken(i)) {
                    // read the channel that was just checked
                    path = trimFileName(((StringToken) port.get(i)).stringValue());
                }
            }
        } else {
            // take from parameter
            path = trimFileName(((StringToken) parameter.getToken()).stringValue());
        }
        return path;
    }

    /**
     * Register the ssh identity files on the given execution object. Every
     * non-empty path arriving on the <i>identity</i> port is added; if the
     * port has no sources, the <i>identity</i> file parameter is used.
     *
     * @param execObj
     *            The execution object to receive the identities.
     * @exception IllegalActionException
     *                If reading the port or the parameter fails.
     * @exception ExecException
     *                If the execution object rejects an identity.
     */
    private void addIdentities(ExecInterface execObj)
            throws IllegalActionException, ExecException {
        int n = identity.numberOfSources();
        if (n > 0) {
            for (int i = 0; i < n; i++) {
                if (identity.hasToken(i)) {
                    // read the channel that was just checked
                    String strIdentity = trimFileName(
                            ((StringToken) identity.get(i)).stringValue());
                    if (strIdentity != null && strIdentity.length() > 0) {
                        execObj.addIdentity(strIdentity);
                    }
                }
            }
        } else {
            String strIdentity = trimFileName(
                    ((StringToken) paramIdentity.getToken()).stringValue());
            if (strIdentity != null && strIdentity.length() > 0) {
                execObj.addIdentity(strIdentity);
            }
        }
    }

    /**
     * Apply the port forwarding specifications to the given execution object.
     * Empty specs are skipped; specs starting with neither -L nor -R are
     * logged as errors and ignored.
     *
     * @param execObj
     *            The execution object to configure.
     * @param specs
     *            Array of string tokens, each "-L ..." or "-R ...".
     * @exception ExecException
     *                If a forwarding request fails.
     */
    private void addPortForwardings(ExecInterface execObj, ArrayToken specs)
            throws ExecException {
        for (int i = 0; i < specs.length(); i++) {
            String fwd = ((StringToken) specs.getElement(i)).stringValue().trim();
            if (fwd.length() == 0) {
                continue; // empty spec, nothing to do
            }
            try {
                if (fwd.startsWith("-R")) {
                    String spec = fwd.substring(2).trim();
                    execObj.setPortForwardingR(spec);
                    if (isDebugging) {
                        log.debug("Remote port forwarding: " + spec);
                    }
                } else if (fwd.startsWith("-L")) {
                    String spec = fwd.substring(2).trim();
                    execObj.setPortForwardingL(spec);
                    if (isDebugging) {
                        log.debug("Local port forwarding: " + spec);
                    }
                } else {
                    log.error("Invalid forwarding request. Start with -L or -R : " + fwd);
                }
            } catch (ExecException e) {
                log.error("Port forwarding request failed: " + e);
                throw new ExecException("Port forwarding request failed: " + e);
            }
        }
    }

    /**
     * Close all sessions registered for closing and discard the table.
     */
    private void closeAll() {
        Hashtable<String, ExecInterface> sessions = execObjectSet;
        if (sessions == null) {
            return;
        }
        Iterator<String> keys = sessions.keySet().iterator();
        while (keys.hasNext()) {
            String sessionTarget = keys.next();
            ExecInterface execObj = sessions.get(sessionTarget);
            execObj.closeConnection();
            if (isDebugging) {
                log.debug("Closed session to " + sessionTarget);
            }
        }
        execObjectSet = null;
    }

    /**
     * Hack the "path in FileParameter" to a path string that can be handled:
     * a leading "file:" and, directly after it, a leading "//" are removed.
     *
     * @param path
     *            The path as given by the user; may be null or empty.
     * @return The path without the prefix; null or empty input is returned
     *         unchanged.
     */
    private String trimFileName(String path) {
        if (path != null && path.length() > 0) {
            // Hack the path because we can't deal with "file:" or "file://"
            if (path.startsWith("file:")) {
                path = path.substring(5);

                if (path.startsWith("//")) {
                    path = path.substring(2);
                }
            }
        }
        return path;
    }

    /**
     * Hash table for the execution objects, used only for closing their
     * sessions at the end of the workflow. key: the target (user@host:port) as
     * String, value: the ExecInterface object. It is null before
     * initialize() and after closeAll().
     */
    private Hashtable<String, ExecInterface> execObjectSet;

    private static final Log log = LogFactory
            .getLog(SshSession.class.getName());
    private static final boolean isDebugging = log.isDebugEnabled();

}

// vim: sw=4 ts=4 et