001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: jianwu $'
006 * '$Date: 2013-05-23 17:32:39 +0000 (Thu, 23 May 2013) $' 
007 * '$Revision: 32079 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.actor.ssh;
031
032import java.util.Hashtable;
033import java.util.Iterator;
034
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.kepler.ssh.ExecException;
038import org.kepler.ssh.ExecFactory;
039import org.kepler.ssh.ExecInterface;
040
041import ptolemy.actor.TypedAtomicActor;
042import ptolemy.actor.TypedIOPort;
043import ptolemy.actor.parameters.PortParameter;
044import ptolemy.data.ArrayToken;
045import ptolemy.data.BooleanToken;
046import ptolemy.data.StringToken;
047import ptolemy.data.Token;
048import ptolemy.data.expr.FileParameter;
049import ptolemy.data.expr.Parameter;
050import ptolemy.data.type.BaseType;
051import ptolemy.kernel.CompositeEntity;
052import ptolemy.kernel.util.IllegalActionException;
053import ptolemy.kernel.util.NameDuplicationException;
054
055//////////////////////////////////////////////////////////////////////////
056//// SshSession
057/**
058 * <p>
059 * Creates an ssh session to a remote host. If requested, the session will not
060 * be opened at first firing of this actor but postponed to the first actor that
061 * uses ssh (e.g.ExecuteCmd.) This actor is useful for three things:<br/>
062 * - to provide a private-key for public-key authentication. <br/>
063 * - to connect to a remote machine at a certain point of the workflow and thus
064 * ask for password (e.g. at the very beginning) not sometime during execution.<br/>
065 * - to forward local ports and/or remote ports.
066 * 
067 * </p>
068 * <p>
069 * This actor uses the org.kepler.ssh package to have longlasting connections
070 * 
071 * </p>
072 * <p>
073 * Its output is the input target, which can be used to identify the created
074 * SshSession object anywhere (all ssh related classes use a session factory to
075 * create/retrieve sessions based on the target name) and thus use the
076 * connection.
077 * 
078 * </p>
079 * <p>
080 * If the host is empty string or equals "local", nothing happens within this
081 * actor. All related actors will use the Java Runtime for local execution
082 * instead of ssh.
083 * 
084 * </p>
085 * <p>
086 * If the parameter <i>postpone</i> is true, the establishment of connection is
087 * postponed until the first remote operation to be executed. One of the main
088 * purpose of this actor is, however, to make the connection and thus ask for
089 * password at the beginning of the workflow. The default is false.
090 * 
091 * </p>
092 * <p>
093 * If the parameter <i>closeAtEnd</i> is true, the session will be closed at the
094 * end of the workflow. If it is false, the session will be kept open. The
095 * latter is good for authentication to a host secured with one-time-password,
096 * so that all workflows can share the same connection. Be careful, however, as
097 * the underlyting ssh package has only one session (within Kepler) to a given
098 * user@host:port. If you have two workflows running at once connecting to the
099 * same host, and the session is closed at the end of one of the workflows, the
100 * other will likely experience a broken operation. Therefore, by default, this
101 * parameter is false.
102 * 
103 * </p>
104 * <p>
105 * On the <i>failed</i> output port, the actor emits a BooleanToken indicating
106 * whether the connection opening failed. It emits 'true' only if the
107 * <i>postpone</i> flag is false and the connection failed. Otherwise it emits a
108 * 'false' token. This can be used to throw an exception or stop the workflow
109 * that cannot work without a connection, or to successively try out other
110 * hosts.
111 * 
112 * </p>
113 * <p>
114 * Port forwarding is supported. The 'portforwarding' parameter should have the
115 * format "-L port:host:hostport -R port:host:hostport ...". Many local and/or
116 * remote forwarding specification can be given.
117 * </p>
118 * 
119 * Reference: Ant version 1.6.2.
120 * 
121 * @author Norbert Podhorszki
122 * @version $Revision: 32079 $
123 * @category.name remote
124 * @category.name connection
125 * @category.name external execution
126 */
127
128public class SshSession extends TypedAtomicActor {
129
130    /**
131     * Construct an SshSession actor with the given container and name. Create
132     * the parameters, initialize their values.
133     * 
134     * @param container
135     *            The container.
136     * @param name
137     *            The name of this actor.
138     * @exception IllegalActionException
139     *                If the entity cannot be contained by the proposed
140     *                container.
141     * @exception NameDuplicationException
142     *                If the container already has an actor with this name.
143     */
144    public SshSession(CompositeEntity container, String name)
145            throws NameDuplicationException, IllegalActionException {
146        super(container, name);
147
148        // target selects the machine where to connect to
149        target = new PortParameter(this, "target", new StringToken(
150                "[user@]host[:port]"));
151        new Parameter(target.getPort(), "_showName", BooleanToken.TRUE);
152        target.setStringMode(true);
153
154        paramIdentity = new FileParameter(this, "identity");
155        identity = new TypedIOPort(this, "identity", true, false);
156        identity.setTypeEquals(BaseType.STRING);
157
158        paramGridCert = new FileParameter(this, "gridcert");
159        gridcert = new TypedIOPort(this, "gridcert", true, false);
160        gridcert.setTypeEquals(BaseType.STRING);
161
162        paramGridProxy = new FileParameter(this, "gridproxy");
163        gridproxy = new TypedIOPort(this, "gridproxy", true, false);
164        gridproxy.setTypeEquals(BaseType.STRING);
165
166        // port forwarding specifications
167        portforwarding = new PortParameter(this, "portforwarding",
168                new ArrayToken(BaseType.STRING));
169        new Parameter(portforwarding.getPort(), "_showName", BooleanToken.FALSE);
170
171        postpone = new Parameter(this, "postpone", new BooleanToken(false));
172        postpone.setTypeEquals(BaseType.BOOLEAN);
173
174        closeAtEnd = new Parameter(this, "closeAtEnd", new BooleanToken(false));
175        closeAtEnd.setTypeEquals(BaseType.BOOLEAN);
176
177        target_out = new TypedIOPort(this, "target_out", false, true);
178        target_out.setTypeEquals(BaseType.STRING);
179
180        failed = new TypedIOPort(this, "failed", false, true);
181        failed.setTypeEquals(BaseType.BOOLEAN);
182
183        // Set the type constraints.
184
185        _attachText("_iconDescription", "<svg>\n" + "<rect x=\"0\" y=\"0\" "
186                + "width=\"75\" height=\"50\" style=\"fill:gray\"/>\n"
187                + "<text x=\"5\" y=\"30\""
188                + "style=\"font-size:25; fill:yellow; font-family:SansSerif\">"
189                + "SSH2</text>\n" + "</svg>\n");
190    }
191
192    // //////////////// Public ports and parameters ///////////////////////
193
194    /**
195     * Target in user@host:port format. If user is not provided, the local
196     * username will be used. If port is not provided, the default port 22 will
197     * be applied. If target is "local" or empty string, nothing happens in this
198     * actor and all commands (in other ssh actors) will be executed locally,
199     * using Java Runtime.
200     */
201    public PortParameter target;
202
203    /**
204     * The file path for the ssh identity file if the user wants to connect
205     * without having to enter the password all the time.
206     * 
207     * The user can browse this file as it is a parameter.
208     */
209    public FileParameter paramIdentity;
210
211    /**
212     * The string representation of the file path for the ssh identity file if
213     * the user wants to connect without having to enter the password all the
214     * time.
215     * 
216     * This is the input option for the identity file.
217     */
218    public TypedIOPort identity;
219
220    /**
221     * The file path for the Grid Certificate file if the user wants to connect
222     * to an GSI-SSH server without having to enter the password all the time
223     * (the passphrase for the certificate will be asked once).
224     * 
225     * The user can browse this file as it is a parameter.
226     */
227    public FileParameter paramGridCert;
228
229    /**
230     * The string representation of the file path for the Grid Certificate file if
231     * the user wants to connect to an GSI-SSH server without having to enter the 
232     * password all the time (the passphrase for the certificate will be asked once).
233     * 
234     * This is the input option for the Grid Certificate file.
235     */
236    public TypedIOPort gridcert;
237
238    /**
239     * The file path for the Grid Certificate Proxy file if the user wants to connect
240     * to an GSI-SSH server without having to enter the password all the time.
241     * 
242     * The user can browse this file as it is a parameter.
243     */
244    public FileParameter paramGridProxy;
245
246    /**
247     * The string representation of the file path for the Grid Certificate Proxy file if
248     * the user wants to connect to an GSI-SSH server without having to enter the 
249     * password all the time.
250     * 
251     * This is the input option for the Grid Certificate Proxy file.
252     */
253    public TypedIOPort gridproxy;
254
255    /**
256     * Port forwarding specification. Format:
257     * "-L port:host:hostport -R port:host:hostport ..." Many forwarding spec
258     * can be given.
259     */
260    public PortParameter portforwarding;
261
262    /**
263     * String output: same as input target. It can be used for identifying the
264     * created session. ... so you do not need to wire actors together even, if
265     * you have the target as a parameter.
266     */
267    public TypedIOPort target_out;
268
269    /**
270     * Boolean output to indicate whether the connection opening failed. It is
271     * true if and only if the <i>postpone</i> parameter is false and the
272     * connection to the target fails.
273     */
274    public TypedIOPort failed;
275
276    /**
277     * Specifying whether actual connection to the host should be postponed
278     * until the first usage somewhere in the workflow. If false, connection is
279     * made here, which is good for password based authentications: at least you
280     * know when to expect the password dialog popping up.
281     */
282    public Parameter postpone;
283
284    /**
285     * Specifying whether actual connection to the host should be closed when
286     * the workflow terminates. If you run more than one workflow at once using
287     * the same remote host, this flag should be false to avoid closing the
288     * session in one workflow while the others are still using it.
289     */
290    public Parameter closeAtEnd;
291
292    // /////////////////////////////////////////////////////////////////
293    // // public methods ////
294
295    /**
296     * initialize() runs once before first exec
297     * 
298     * @exception IllegalActionException
299     *                If the parent class throws it.
300     */
301    public void initialize() throws IllegalActionException {
302        super.initialize();
303        execObjectSet = new Hashtable<String,ExecInterface>();
304    }
305
306    /**
307     * fire. Create a session and open the connection if not postponed.
308     * 
309     * @exception IllegalActionException
310     *                is thrown if the session cannot be opened.
311     */
312    public void fire() throws IllegalActionException {
313        super.fire();
314
315        // process inputs
316        target.update();
317        StringToken tg = (StringToken) target.getToken();
318        String strTarget = tg.stringValue();
319                //back compatibility, remove the double quotes at the very beginning and at the very last.
320        strTarget  = strTarget.replaceAll("^\"|\"$", "");
321
322        portforwarding.update();
323        Token t = portforwarding.getToken();
324        ArrayToken pft = null;
325        if (t instanceof ArrayToken)
326            pft = (ArrayToken) t;
327        else if (t.getType() == BaseType.STRING) {
328            StringToken[] sta = { (StringToken) t };
329            pft = new ArrayToken(sta);
330        } else {
331            pft = new ArrayToken(BaseType.STRING); // just an empty array
332        }
333
334        boolean bPostpone = ((BooleanToken) postpone.getToken()).booleanValue();
335        boolean bCloseAtEnd = ((BooleanToken) closeAtEnd.getToken())
336                .booleanValue();
337
338        if (isDebugging)
339            log.debug("Create session to " + strTarget + ". postpone = "
340                    + bPostpone + " closeAtEnd = " + bCloseAtEnd);
341
342        // process grid certificate files if given
343        String strCert = null;
344        int n = gridcert.numberOfSources();
345        if (n > 0) {
346            // take from input port (last will be effective)
347            for (int i = 0; i < n; i++) {
348                if (gridcert.hasToken(i)) {
349                    strCert = ((StringToken) gridcert.get(0)).stringValue();
350                    strCert = trimFileName(strCert);
351                }
352            }
353        } else {
354            // take from parameter
355            strCert = ((StringToken) paramGridCert.getToken()).stringValue();
356            strCert = trimFileName(strCert);
357        }
358        if (strCert != null && strCert.length() > 0) {
359            log.debug("Use Grid Certificate: " + strCert);
360            System.setProperty("X509_USER_CERT", strCert);
361        }
362
363
364        // process grid proxy files if given
365        String strProxy = null;
366        n = gridproxy.numberOfSources();
367        if (n > 0) {
368            // take from input port (last will be effective)
369            for (int i = 0; i < n; i++) {
370                if (gridproxy.hasToken(i)) {
371                    strProxy = ((StringToken) gridproxy.get(0)).stringValue();
372                    strProxy = trimFileName(strProxy);
373                }
374            }
375        } else {
376            // take from parameter
377            strProxy = ((StringToken) paramGridProxy.getToken()).stringValue();
378            strProxy = trimFileName(strProxy);
379        }
380        if (strProxy != null && strProxy.length() > 0) {
381            log.debug("Use Grid Proxy: " + strProxy);
382            System.setProperty("X509_USER_PROXY", strProxy);
383        }
384
385
386        // Get ExecInterface object 
387        boolean connectionFailed = false;
388
389        try {
390            ExecInterface execObj = ExecFactory.getExecObject(strTarget);
391
392            // process ssh identity files if given
393            String strIdentity;
394            n = identity.numberOfSources();
395            if (n > 0) {
396                for (int i = 0; i < n; i++) {
397                    if (identity.hasToken(i)) {
398                        strIdentity = ((StringToken) identity.get(0)).stringValue();
399                        strIdentity = trimFileName(strIdentity);
400                        if (strIdentity != null && strIdentity.length() > 0)
401                            execObj.addIdentity(strIdentity);
402                    }
403                }
404            } else {
405                strIdentity = ((StringToken) paramIdentity.getToken()).stringValue();
406                strIdentity = trimFileName(strIdentity);
407
408                if (strIdentity != null && strIdentity.length() > 0)
409                    execObj.addIdentity(strIdentity);
410            }
411
412            // process port forwarding specs
413            for (int i = 0; i < pft.length(); i++) {
414                StringToken st = (StringToken) pft.getElement(i);
415                String fwd = st.stringValue().trim();
416                try {
417                    if (fwd.length() > 0) {
418                        if (fwd.startsWith("-R")) {
419                            String foo = fwd.substring(2).trim();
420                            execObj.setPortForwardingR(foo);
421                            if (isDebugging)
422                                log.debug("Remote port forwarding: " + foo);
423                        } else if (fwd.startsWith("-L")) {
424                            String foo = fwd.substring(2).trim();
425                            execObj.setPortForwardingL(foo);
426                            if (isDebugging)
427                                log.debug("Local port forwarding: " + foo);
428                        } else {
429                            log.error("Invalid forwarding request. Start with -L or -R : " + fwd);
430                        }
431                    } else {
432                        // if (isDebugging)
433                        // log.debug("fwd spec <empty>. skip.");
434                    }
435
436                } catch (ExecException e) {
437                    log.error("Port forwarding request failed: " + e);
438                    throw new ExecException("Port forwarding request failed: " + e);
439                }
440            }
441
442            // if postpone is not requested, open the connection now
443            // (and ask for password if needed)
444            if (!bPostpone) {
445                if (isDebugging)
446                    log.debug("Open connection right now to " + strTarget);
447                execObj.openConnection();
448            }
449            
450            /* add ssh object to hash table for closing at end */
451            if (bCloseAtEnd)
452                execObjectSet.put(strTarget, execObj);
453    
454        } catch (ExecException e) {
455                String errorMsg = "Establishing connection to " + strTarget + " failed: " + e;
456            log.error(errorMsg);
457            connectionFailed = true;
458            throw new IllegalActionException(this, errorMsg);
459        }
460
461        target_out.send(0, tg);
462        failed.send(0, new BooleanToken(connectionFailed));
463
464    } // end-of-method fire()
465
466    /**
467     * Close all sessions. This method is invoked exactly once per execution of
468     * an application. None of the other action methods should be be invoked
469     * after it.
470     * 
471     * @exception IllegalActionException
472     *                Not thrown in this base class.
473     */
474    public void wrapup() throws IllegalActionException {
475        if (isDebugging)
476            log.debug("wrapup begin");
477        super.wrapup();
478        closeAll();
479        if (isDebugging)
480            log.debug("wrapup end");
481    }
482
483    /**
484     * Close all sessions. This method is invoked exactly once per execution of
485     * an application. None of the other action methods should be be invoked
486     * after it.
487     * 
488     * @exception IllegalActionException
489     *                Not thrown in this base class.
490     */
491    public void stop() {
492        if (isDebugging)
493            log.debug("stop begin");
494        super.stop();
495        closeAll();
496        if (isDebugging)
497            log.debug("stop end");
498    }
499
500    // /////////////////////////////////////////////////////////////////
501    // // private methods ////
502
503    /**
504     * Close all sessions.
505     */
506    private void closeAll() {
507        if (execObjectSet == null)
508            return;
509        Iterator keys = execObjectSet.keySet().iterator();
510        while (keys.hasNext()) {
511            String target = (String) keys.next();
512            ExecInterface execObj = (ExecInterface) execObjectSet.get(target);
513            execObj.closeConnection();
514            if (isDebugging)
515                log.debug("Closed session to " + target);
516        }
517        execObjectSet = null;
518    }
519
520    /**
521     * Hack the "path in FileParameter" to a path string that can be handled.
522     */
523    private String trimFileName(String path) {
524        if (path != null && path.length() > 0) {
525            // Hack the path because we can't deal with "file:" or "file://"
526            if (path.startsWith("file:")) {
527                path = path.substring(5);
528
529                if (path.startsWith("//")) {
530                    path = path.substring(2);
531                }
532            }
533        }
534        return path;
535    }
536
537    /**
538     * Hash table for the execution objects, used only for closing their
539     * sessions at the end of the workflow. key: user@host:port as String value:
540     * the ExecInterface object
541     * 
542     */
543    private Hashtable<String,ExecInterface> execObjectSet;
544
545    private static final Log log = LogFactory
546            .getLog(SshSession.class.getName());
547    private static final boolean isDebugging = log.isDebugEnabled();
548
549}
550
551// vim: sw=4 ts=4 et