/* 
 * Copyright (c) 2015 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2018-02-07 18:53:35 +0000 (Wed, 07 Feb 2018) $' 
 * '$Revision: 34661 $'
 * 
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.spark.actor;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.kepler.build.modules.Module;
import org.kepler.build.modules.ModuleTree;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.ddp.Utilities;
import org.kepler.spark.director.SparkEngine;

import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.util.MessageHandler;
import ptolemy.vergil.icon.BoxedValueIcon;
/** An attribute that contains connection information for a Spark server.
 *  Spark actors use the value specified in connectionName to refer to an
 *  instance of this attribute and share its Spark server.
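 *
 *  <p>A minimal usage sketch (hypothetical caller code; assumes a
 *  SparkConnection instance is available to the caller):</p>
 *  <pre>
 *  JavaSparkContext context = sparkConnection.getContext();
 *  try {
 *      // ... submit Spark jobs using the context ...
 *  } finally {
 *      SparkConnection.releaseContext();
 *  }
 *  </pre>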
 * 
 * @author Daniel Crawl
 * @version $Id: SparkConnection.java 34661 2018-02-07 18:53:35Z crawl $
 */
public class SparkConnection extends Attribute {

    /** Create a new SparkConnection with the specified name. */
    public SparkConnection(NamedObj container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);
        
        connectionName = new StringParameter(this, "connectionName");
        connectionName.setToken("default");
        
        serverName = new StringParameter(this, "serverName");
        serverName.setToken("");
        
        numLocalWorkers = new Parameter(this, "numLocalWorkers");
        numLocalWorkers.setTypeEquals(BaseType.INT);
        numLocalWorkers.setToken(new IntToken(DEFAULT_NUM_LOCAL_WORKERS));
        
        driverMemory = new StringParameter(this, "driverMemory");
        driverMemory.setToken(DEFAULT_DRIVER_MEMORY);
        
        // set the icon to a BoxedValueIcon to display the connection name
        BoxedValueIcon icon = new BoxedValueIcon(this, "_icon");
        icon.displayWidth.setExpression("25");
        icon.attributeName.setExpression("connectionName");
        icon.boxColor.setExpression("{0.0, 1.0, 1.0, 1.0}");
    }

    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if(attribute == numLocalWorkers) {
            int val = ((IntToken)numLocalWorkers.getToken()).intValue();
            synchronized(_contextLock) {
                if(val != _numLocalWorkers) {
                    _numLocalWorkers = val;
                }
            }
        } else if(attribute == driverMemory) {
            String driverMemoryStr = ((StringToken)driverMemory.getToken()).stringValue();
            synchronized(_contextLock) {
                if(!driverMemoryStr.equals(_driverMemory)) {
                    _driverMemory = driverMemoryStr;
                }
            }
        } else if(attribute == serverName) {
            String serverNameStr = ((StringToken)serverName.getToken()).stringValue();
            synchronized(_contextLock) {
                if(!serverNameStr.equals(_serverName)) {
                    _serverName = serverNameStr;
                }
            }
        } else {
            super.attributeChanged(attribute);
        }
    }
    
    /** Clone this object into the specified workspace. */
    /*
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        SparkConnection newObject = (SparkConnection) super.clone(workspace);
        return newObject;
    }
    */
    
    /** Get the connection name for this attribute. */
    public String getConnectionName() throws IllegalActionException {
        return ((StringToken)connectionName.getToken()).stringValue();
    }

    /** Get the context using the parameters of this SparkConnection
     *  (serverName, driverMemory, numLocalWorkers, etc.).
     */
    public JavaSparkContext getContext() throws IllegalActionException {
        synchronized(_contextLock) {
            return getContext(_serverName, _numLocalWorkers, _driverMemory,
                getConnectionName());
        }
    }
    
    /** Get the context for a specific master.
     *  @param masterNameAndPortStr The master host and port. If empty,
     *  then the environment variables MASTER, SPARK_MASTER_IP, and
     *  SPARK_LOCAL_IP are checked for the master. If these are empty, then
     *  try to connect to a master running on localhost. If the connection
     *  fails, then a local master is used (runs in the same JVM). 
     *  @param numLocalWorkersVal The number of workers to use for a
     *  local master.
     *  @param driverMemoryStr The amount of memory for the driver.
     *  @param connectionNameStr The connection name string.
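     *
     *  <p>Illustrative examples of how the master string is resolved
     *  (based on the logic below):</p>
     *  <pre>
     *  getContext("local", 4, "2g", "default")      // master = local[4]
     *  getContext("local", 0, "2g", "default")      // master = local[*] (all cores)
     *  getContext("host:7077", 0, "2g", "default")  // master = spark://host:7077
     *  getContext("", 0, "2g", "default")           // probe default master, else fall back
     *  </pre>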
     */
    public static JavaSparkContext getContext(String masterNameAndPortStr,
        int numLocalWorkersVal, String driverMemoryStr,
        String connectionNameStr) throws IllegalActionException {
        
        synchronized(_contextLock) {
            
            // see if we already created the context and any of the
            // parameters are different.
            if(_context != null && (
                    _serverName == null ||
                    !_serverName.equals(masterNameAndPortStr) ||
                    _numLocalWorkers != numLocalWorkersVal ||
                    _driverMemory == null ||
                    !_driverMemory.equals(driverMemoryStr))) {
                System.out.println("Will recreate Spark context since parameter(s) have changed.");
                //System.out.println("CONTEXT COUNTER = " + _contextCounter);
                _context.stop();
                _context = null;
                if(_startedMySparkServer) {
                    _stopServerWithMySpark();
                }
            }
            
            if(_context == null) {
                
                String masterStr;
                // if the server was not specified, use the default.
                if(masterNameAndPortStr == null || 
                    masterNameAndPortStr.trim().isEmpty()) {
                    
                    // get the default address for the master
                    InetSocketAddress socketAddress = getDefaultMasterSocketAddress();
                    
                    // see if we can connect
                    boolean connected = false;
                    try(Socket socket = new Socket()) {
                        socket.connect(socketAddress, _CONNECT_TIMEOUT);
                        connected = true;
                    } catch(IOException e) {
                        //System.err.println("IOException connecting to " + socketAddress +
                            //": " + e.getMessage());
                        connected = false;
                    }

                    // if we could connect, use that master.
                    if(connected) {
                        masterStr = "spark://" +
                                socketAddress.getHostName() + ":" +
                                socketAddress.getPort();
                    }
                    // if we are running on a cluster, start the standalone
                    // server using the myspark scripts.
                    else if(Utilities.isExecutingUnderResourceManager()) {
                        masterStr = _startServerWithMySpark();
                    }
                    // otherwise, use a local master (runs in the same JVM).
                    else {
                        masterStr = "local[" +
                            (numLocalWorkersVal < 1 ? "*" : numLocalWorkersVal) +
                            "]";
                    }
                } else if(masterNameAndPortStr.startsWith("local")) {
                    masterStr = "local[" +
                        (numLocalWorkersVal < 1 ? "*" : numLocalWorkersVal) +
                        "]";
                } else if(!masterNameAndPortStr.startsWith("spark://")) {
                    masterStr = "spark://" + masterNameAndPortStr;
                } else {
                    masterStr = masterNameAndPortStr;
                }
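
                // At this point, masterStr is one of "local[N]", "local[*]",
                // or "spark://host:port".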
                
                if(driverMemoryStr == null || driverMemoryStr.trim().isEmpty()) {
                    driverMemoryStr = DEFAULT_DRIVER_MEMORY;
                    System.err.println("WARNING: Spark driver memory not specified. " +
                        "Using default value of " +
                        DEFAULT_DRIVER_MEMORY);
                }
                                
                _context = _createContext(masterStr, driverMemoryStr,
                    connectionNameStr);
                
                _serverName = masterStr;
                _numLocalWorkers = numLocalWorkersVal;
                _driverMemory = driverMemoryStr;
            }

            if(_context != null) {
                _contextCounter++;
                //System.out.println("CONTEXT COUNTER incremented = " + _contextCounter);
                
                if(!_context.getConf().get("spark.master").startsWith("local")) {
                    // add the module jars, e.g., actors.jar, ptolemy.jar, etc.
                    final ModuleTree moduleTree = ModuleTree.instance();
                    for(Module module : moduleTree) {
                        
                        // Spark 2 throws an exception when more than one jar with
                        // the same name is added (even when in different directories),
                        // so exclude the (empty) kepler-tasks.jar that is built
                        // for the kepler-tasks module.
                        if(!module.getName().equals("kepler-tasks")) {
                            final File moduleJar = module.getTargetJar();
                            
                            // add the module jar if it exists.
                            // some modules, e.g., outreach, do not have jars.
                            if(moduleJar.exists()) {
                                _context.addJar(moduleJar.getAbsolutePath());
                            }
                        }
                    }
                }
            }

            // return while holding the lock so that a concurrent
            // releaseContext() cannot stop the context first.
            return _context;
        }
    }

    /** Get the default context. */
    public static JavaSparkContext getDefaultContext() throws IllegalActionException {
        return getContext(_serverName, _numLocalWorkers,
            _driverMemory, "default");
    }

    /** Get the default Spark master socket address. The host is determined
     *  by checking, in order: the MASTER, SPARK_MASTER_IP, and SPARK_LOCAL_IP
     *  environment variables, and finally the local host name.
     */
    public static InetSocketAddress getDefaultMasterSocketAddress() throws IllegalActionException {
        
        String masterHostAndPortStr = null;
        InetSocketAddress socketAddress;
        
        // first try environment variables: MASTER, SPARK_MASTER_IP, SPARK_LOCAL_IP
        String envMaster = System.getenv("MASTER");
        if(envMaster != null && envMaster.startsWith("spark://")) {
            masterHostAndPortStr = envMaster;
            System.out.println("Using $MASTER: " + masterHostAndPortStr);
            socketAddress = getMasterAndPortAddress(masterHostAndPortStr);
        } else {
            String hostname = null;
            String envSparkMasterIP = System.getenv("SPARK_MASTER_IP");
            if(envSparkMasterIP != null) {
                hostname = envSparkMasterIP;
                System.out.println("Using $SPARK_MASTER_IP: " + envSparkMasterIP);
            } else {
                String envSparkLocalIP = System.getenv("SPARK_LOCAL_IP");
                if(envSparkLocalIP != null) {
                    hostname = envSparkLocalIP;
                    System.out.println("Using $SPARK_LOCAL_IP: " + envSparkLocalIP);
                } else {
                    // finally, use localhost
                    try {
                        hostname = InetAddress.getLocalHost().getHostName();
                    } catch (UnknownHostException e) {
                        throw new IllegalActionException("Error determining host name: " +
                                e.getMessage());
                    }
                }
            }

            if(hostname == null || hostname.isEmpty()) {
                throw new IllegalActionException("Unable to determine host name.");
            }
                
            //System.out.println("Using default port for master: " + DEFAULT_MASTER_PORT);
            socketAddress = new InetSocketAddress(hostname, DEFAULT_MASTER_PORT);
        }
        
        return socketAddress;
    }
    
    /** Parse a string containing the master host and port and return the
     *  socket address.
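     *
     *  <p>Accepted formats (for example): "spark://host:7077" and
     *  "host:7077"; both yield a socket address for host:7077.</p>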
     */
    public static InetSocketAddress getMasterAndPortAddress(String masterHostAndPortStr) throws IllegalActionException {
        boolean isSparkURL = masterHostAndPortStr.startsWith("spark://");
        String hostname = null;
        int port;
        String[] parts = masterHostAndPortStr.split(":");
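        // Examples: "spark://host:7077" splits into {"spark", "//host", "7077"},
        // and "host:7077" splits into {"host", "7077"}.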
        if(parts.length != 2 &&
                (!isSparkURL || parts.length != 3)) {
            throw new IllegalActionException("masterHostAndPort must have both " +
                    "master host name and port.");
        }
        if(isSparkURL) {
            hostname = parts[1].substring("//".length());
            try {
                port = Integer.parseInt(parts[2]);
            } catch(NumberFormatException e) {
                throw new IllegalActionException("Unable to parse port: " + parts[2] +
                        ": " + e.getMessage());
            }
        } else {
            hostname = parts[0];
            try {
                port = Integer.parseInt(parts[1]);
            } catch(NumberFormatException e) {
                throw new IllegalActionException("Unable to parse port: " + parts[1] +
                        ": " + e.getMessage());
            }
        }
        //System.out.println("h = " + hostname + " p = " + port);
        return new InetSocketAddress(hostname, port);
    }

    /** Parse the output from starting the server using start-myspark.sh.
     *  @return The master URL if successful, otherwise null.
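     *
     *  <p>For example, an output line of the form "MASTER=spark://host:7077"
     *  yields "spark://host:7077".</p>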
     */
    public static String parseOutputFromStartingServer(InputStream input) throws IOException, IllegalActionException {
        String masterHostAndPortStr = null;
        try(BufferedReader reader = new BufferedReader(new InputStreamReader(input))) {
            String line = null;
            while ((line = reader.readLine()) != null) {
                if(line.startsWith("MASTER=spark://")) {
                    String[] parts = line.split("=");
                    masterHostAndPortStr = parts[1];
                    System.out.println("Spark master host and port is " + masterHostAndPortStr);
                }
            }
        }
        return masterHostAndPortStr;
    }
    
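    /** Release the shared context, decrementing the reference count and
     *  stopping the context when the count reaches zero.
     */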
    public static void releaseContext() {
        synchronized(_contextLock) {
            if(_context != null) {
                _contextCounter--;
                //System.out.println("CONTEXT COUNTER decremented = " + _contextCounter);
                if(_contextCounter == 0) {
                    // if we are using a standalone server, close the connection
                    // since otherwise we can't submit new jobs.
                    //if(!_context.getConf().get("spark.master").startsWith("local")) {
                        _context.stop();
                    //}
                    _context = null;
                }
            }
        }
    }

    /** Stop any contexts that were started. */
    public static void shutdown() {
        synchronized(_contextLock) {
            if(_context != null) {
                _context.stop();
                _context = null;
            }
            
            if(_startedMySparkServer) {
                try {
                    _stopServerWithMySpark();
                } catch (IllegalActionException e) {
                    MessageHandler.error("Error stopping Spark server.", e);
                }
            }
        }
    }
        
    /** The name of the connection. */
    public StringParameter connectionName;
    
    /** The host name of the Spark server. Use "local" to run on the local machine. */
    public StringParameter serverName;
    
    /** The number of workers when running Spark locally. Values less than 1
     *  mean use all available cores.
     */
    public Parameter numLocalWorkers;

    /** The amount of memory to use in the Spark driver, e.g., "1g". */
    public StringParameter driverMemory;
    
    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Create a context for the given master, driver memory, and
     *  connection name.
     */
    private static JavaSparkContext _createContext(final String masterAndPortStr,
            String driverMemoryStr, final String connectionNameStr)
                throws IllegalActionException {
        
        final JavaSparkContext[] contexts = new JavaSparkContext[1];
        
        // try to get the spark context.
        // use an ExecutorService with a timeout to handle
        // connecting to a master run by someone else.
        /*
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            service.submit(new Runnable() {
                @Override
                public void run() {
        */
                    //System.out.println("Getting JavaSparkContext for " +
                            //connectionNameStr + " at " + masterAndPortStr);

                    SparkConf conf = new SparkConf()
                            .setMaster(masterAndPortStr)
                            .setAppName("Kepler Spark " + connectionNameStr)
                            // use the requested driver memory; the caller
                            // substitutes the default when none was specified.
                            .set("spark.driver.memory", driverMemoryStr);
                          
                    contexts[0] = new JavaSparkContext(conf);
                    
                    //contexts[0].sc().addSparkListener(new StdoutSparkListener());

                    // if we're not using the local server, we need to wait
                    // for the scheduler to set the default parallelism to
                    // be the total number of cores. see:
                    // https://issues.apache.org/jira/browse/SPARK-809
                    
                    if(!masterAndPortStr.startsWith("local")) {
                        // wait 5 seconds for the executor added events
                        System.out.println("Waiting 5 seconds to receive executor(s) information.");
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            throw new IllegalActionException("Interrupted while waiting for " +
                                "Spark scheduler to initialize: " + e.getMessage());
                        }
                    }
        /*
                    System.out.println("finished calling jsp ctor");
                }
            }).get(_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("WARNING: Timeout connecting to master at " +
                    masterAndPortStr + ".");
        } finally {
            service.shutdownNow();
        }
        */
        
        if(contexts[0] != null) {
            System.out.println("Created Spark context for: " + 
                contexts[0].getConf().get("spark.master"));
        }
        
        return contexts[0];
    }
    
    /** Start the standalone server using start-myspark.sh.
     *  @return Returns the master and port string.
     */
    private static String _startServerWithMySpark() throws IllegalActionException {
        
        // set the default location of the config directory
        String workflowDirStr = System.getProperty("spark.workflowdir");
        if(workflowDirStr == null) {
            throw new IllegalActionException("System property " +
                    "spark.workflowdir not set.");
        }
        
        String configDirStr = workflowDirStr + File.separator +
            "tools" + File.separator + "conf";
        
        File configDirFile = new File(configDirStr);
        String parentDirStr = configDirFile.getParent();
        
        final String pathStr = parentDirStr + File.separator + "sbin" +
                    File.separator;
        
        ConfigurationProperty engineProperty =
            Utilities.getEngineProperty(SparkEngine.SPARK_ENGINE_NAME, null);

        ConfigurationProperty startProperty = engineProperty.getProperty("Server.Scripts.ClusterStart");
        if(startProperty == null) {
            throw new IllegalActionException("Could not find ClusterStart script in configuration file.");
        }
        
        String startScriptStr = pathStr + startProperty.getValue();
        
        System.out.println("Starting Spark server with " + startScriptStr);

        // see if the script is executable. kepler modules are zipped,
        // which does not preserve the permissions.
        File startScriptFile = new File(startScriptStr);
        if(!startScriptFile.canExecute()) {
            throw new IllegalActionException(
                    "The script " + startScriptFile + " is not executable.\n" +
                            "You must change the permissions so that " +
                            startScriptFile.getName() + 
                            " and all the other scripts in \n" +
                            startScriptFile.getParent() + " are executable.");
        }
        
        ProcessBuilder builder = new ProcessBuilder(startScriptStr);
        builder.redirectErrorStream(true);

        String masterAndPortStr;
        
        try {
            Process process = builder.start();
            masterAndPortStr = parseOutputFromStartingServer(process.getInputStream());
            process.waitFor();
            _startedMySparkServer = true;
        } catch(Exception e) {
            throw new IllegalActionException("Error starting Spark server " +
                    "using myspark: " + e.getMessage());
        }
        
        return masterAndPortStr;
    }
    
    /** Stop the standalone server using stop-myspark.sh. */
    private static void _stopServerWithMySpark() throws IllegalActionException {
        
        // set the default location of the config directory
        String workflowDirStr = System.getProperty("spark.workflowdir");
        if(workflowDirStr == null) {
            throw new IllegalActionException("System property " +
                    "spark.workflowdir not set.");
        }
        
        String configDirStr = workflowDirStr + File.separator +
            "tools" + File.separator + "conf";
        
        File configDirFile = new File(configDirStr);
        String parentDirStr = configDirFile.getParent();
        
        final String pathStr = parentDirStr + File.separator + "sbin" +
                File.separator;
    
        ConfigurationProperty engineProperty =
            Utilities.getEngineProperty(SparkEngine.SPARK_ENGINE_NAME, null);
    
        ConfigurationProperty stopProperty = engineProperty.getProperty("Server.Scripts.ClusterStop");
        if(stopProperty == null) {
            throw new IllegalActionException("Could not find ClusterStop script in configuration file.");
        }
        
        String stopScriptStr = pathStr + stopProperty.getValue();
        
        System.out.println("Stopping Spark server with " + stopScriptStr);

        // see if the script is executable. kepler modules are zipped,
        // which does not preserve the permissions.
        File stopScriptFile = new File(stopScriptStr);
        if(!stopScriptFile.canExecute()) {
            throw new IllegalActionException(
                    "The script " + stopScriptFile + " is not executable.\n" +
                            "You must change the permissions so that " +
                            stopScriptFile.getName() + 
                            " and all the other scripts in \n" +
                            stopScriptFile.getParent() + " are executable.");
        }
        
        ProcessBuilder builder = new ProcessBuilder(stopScriptStr);
        builder.redirectErrorStream(true);

        try {
            Process process = builder.start();
            process.waitFor();
            _startedMySparkServer = false;
        } catch(Exception e) {
            throw new IllegalActionException("Error stopping Spark server " +
                    "using myspark: " + e.getMessage());
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private fields                    ////

    /** The spark context.
     *  NOTE: Spark only allows a single SparkContext per JVM,
     *  so this is static.
     */
    private static JavaSparkContext _context;
    
    /** Lock object for _context. */
    private static final Object _contextLock = new Object();
    
    /** The default number of local workers. A value less than 1 means
     *  use all available cores ("local[*]").
     */
    public static final int DEFAULT_NUM_LOCAL_WORKERS = 0;
    
    /** The number of workers for the local context. */
    private static int _numLocalWorkers = DEFAULT_NUM_LOCAL_WORKERS;
    
    /** The default amount of memory for the Spark driver. */
    public static final String DEFAULT_DRIVER_MEMORY = "1g";
    
    /** The amount of memory for the Spark driver. */
    private static String _driverMemory = DEFAULT_DRIVER_MEMORY;
    
    /** Default port for the master. */
    public final static int DEFAULT_MASTER_PORT = 7077;

    /** Timeout (in ms) when checking whether the server is running. */
    private final static int _CONNECT_TIMEOUT = 10*1000;
    
    /** Default server name. */
    private static String _serverName = "";
    
    /** Reference counter for the context. */
    private static int _contextCounter = 0;

    /** If true, we've started the standalone server using myspark. */
    private static boolean _startedMySparkServer = false;
    
}