/* An engine that runs models in Spark.
 *
 * Copyright (c) 2014 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $'
 * '$Revision: 34532 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.spark.director;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.kepler.build.project.ProjectLocator;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.ddp.Utilities;
import org.kepler.ddp.actor.pattern.AtomicPatternActor;
import org.kepler.ddp.actor.pattern.CoGroup;
import org.kepler.ddp.actor.pattern.Cross;
import org.kepler.ddp.actor.pattern.DDPDataSink;
import org.kepler.ddp.actor.pattern.DDPDataSource;
import org.kepler.ddp.actor.pattern.DDPPatternActor;
import org.kepler.ddp.actor.pattern.Map;
import org.kepler.ddp.actor.pattern.Match;
import org.kepler.ddp.actor.pattern.Reduce;
import org.kepler.ddp.actor.pattern.SingleInputPatternActor;
import org.kepler.ddp.director.DDPDirector;
import org.kepler.ddp.director.DDPEngine;
import org.kepler.spark.actor.SparkConnection;
import org.kepler.spark.operator.CoGroupOperator;
import org.kepler.spark.operator.CrossOperator;
import org.kepler.spark.operator.DataSink;
import org.kepler.spark.operator.DataSource;
import org.kepler.spark.operator.FileDataSink;
import org.kepler.spark.operator.FileDataSource;
import org.kepler.spark.operator.JoinOperator;
import org.kepler.spark.operator.MapOperator;
import org.kepler.spark.operator.NullSink;
import org.kepler.spark.operator.Operator;
import org.kepler.spark.operator.ReduceOperator;
import org.kepler.spark.operator.TokenSink;
import org.kepler.spark.operator.TokenSource;
import org.kepler.spark.stub.KeplerCoGroupStub;
import org.kepler.spark.stub.KeplerCrossStub;
import org.kepler.spark.stub.KeplerMapStub;
import org.kepler.spark.stub.KeplerMatchStub;
import org.kepler.spark.stub.KeplerReduceStub;
import org.kepler.spark.stub.ScriptEngineMapStub;
import org.kepler.spark.stub.ScriptEngineReduceStub;

import ptolemy.actor.IOPort;
import ptolemy.data.ArrayToken;
import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.kernel.util.Workspace;
/** An engine that runs workflows in Spark. This engine
 *  converts DDP pattern actors (Map, Reduce, Cross, CoGroup, and
 *  Match) and I/O actors (DDPDataSink and DDPDataSource) into a
 *  Spark job and runs it on the server.
 *  <p>
 *  <b>NOTE:</b> Only DDP pattern and I/O actors may be present in
 *  the workflow. Other actors must be placed inside the composite
 *  pattern actors or in a different sub-workflow.
 *  </p>
 *
 *  @author Daniel Crawl
 *  @version $Id: SparkEngine.java 34532 2016-10-17 19:14:15Z crawl $
 *
 */
public class SparkEngine extends DDPEngine {

    public SparkEngine(DDPDirector director) throws IllegalActionException,
            NameDuplicationException {
        super(director);

        _engineName = SPARK_ENGINE_NAME;
        
        // initialize this here since _addParameters() accesses it
        // before the field initializer runs.
        _numSameJVMWorkers = 8;
    }

    /** React to a parameter change. */
    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if(attribute == masterHostAndPort) {
            InetSocketAddress address = null;
            String masterHostAndPortStr = ((StringToken)masterHostAndPort.getToken()).stringValue();
            if(!masterHostAndPortStr.trim().isEmpty()) {
                // parse the string to check for errors.
                address = SparkConnection.getMasterAndPortAddress(masterHostAndPortStr);
            }

            if(address != null && address.getHostString().equals("localhost")) {
                try {
                    _masterHostAndPortStr = InetAddress.getLocalHost().getHostName() +
                            ":" + address.getPort();
                } catch (UnknownHostException e) {
                    throw new IllegalActionException(_director, e, "Unknown host.");
                }
                System.out.println("Using " + _masterHostAndPortStr +
                        " instead of 'localhost' since Spark has problems binding to 'localhost'.");
            } else {
                _masterHostAndPortStr = masterHostAndPortStr;
            }

        } else if(attribute == numSameJVMWorkers) {
            Token token = numSameJVMWorkers.getToken();
            if(token == null) {
                throw new IllegalActionException(_director,
                        "numSameJVMWorkers must be specified.");
            }
            _numSameJVMWorkers = ((IntToken)token).intValue();

        } else if(attribute == driverMemory) {
            String driverMemoryStr = ((StringToken)driverMemory.getToken()).stringValue();
            if(driverMemoryStr.trim().isEmpty()) {
                driverMemoryStr = SparkConnection.DEFAULT_DRIVER_MEMORY;
            }
            // save the value for use when creating the Spark context.
            _driverMemoryStr = driverMemoryStr;

        } else {
            super.attributeChanged(attribute);
        }

    }

    /** Clone the SparkEngine into the specified workspace.
     *  @param workspace The workspace for the cloned object.
     *  @exception CloneNotSupportedException Not thrown in this class.
     *  @return The new SparkEngine.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        SparkEngine newObject = (SparkEngine) super.clone(workspace);
        newObject._context = null;
        newObject._inputsAlreadyDone = new HashSet<Operator>();
        newObject._job = null;
        return newObject;
    }
    
    @Override
    public void preinitialize() throws IllegalActionException {
        
        super.preinitialize();

        // get the degree of parallelism even though the parent class may have
        // already read it, since the parent class may set it to 1 as the default.
        _degreeOfParallelism = ((IntToken)_director.degreeOfParallelism.getToken()).intValue();

        // load the default config dir if necessary
        _checkConfigDir();
        
    }

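    /** Run the stop scripts for any Spark standalone servers that were
     *  started by this engine.
     */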
    public static void shutdownServer() throws IllegalActionException {
        
        // shut down any servers we started.
        synchronized(_serverStartStopLock) {
            for(String stopScriptStr : _stopScriptsForShutdown) {
                
                System.out.println("Stopping Spark server by running " +
                        stopScriptStr);
                
                ProcessBuilder builder = new ProcessBuilder(stopScriptStr);
                
                // make sure JAVA_HOME is set
                java.util.Map<String,String> env = builder.environment();
                if(env.get("JAVA_HOME") == null) {
                    env.put("JAVA_HOME", System.getProperty("java.home"));
                }
                
                try {
                    Process process = builder.start();
                    process.waitFor();
                } catch (Exception e) {
                    throw new IllegalActionException("Unable to stop Spark: " + e.getMessage());
                }
            }
        }
    }

    /** Stop any running Spark jobs. */
    @Override
    public void stop() throws IllegalActionException {
        super.stop();
        
        synchronized(_jobLock) {
            if(_job != null) {
                _job.stop();
            }
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                      public fields                      //////

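    /** The host and port ("host:port") of the Spark master. If empty,
     *  a default master address is used.
     */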
    public StringParameter masterHostAndPort;
    
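    /** The number of workers to use when the Spark context runs in the
     *  same JVM as Kepler.
     */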
    public Parameter numSameJVMWorkers;
    
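    /** The amount of memory to allocate for the Spark driver. */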
    public StringParameter driverMemory;
    
    ///////////////////////////////////////////////////////////////////
    ////                      protected methods                  //////


    /** Check the existence of the Spark assembly jar. */
    @Override
    protected boolean _checkFilesBeforeStartingServer() throws IllegalActionException {
        
        if(!Utilities.isExecutingUnderResourceManager()) {
            // look for the assembly jar. the assembly jar is not shipped with
            // the spark module, so give the URL for building it if not found.
            if(!super._checkFilesBeforeStartingServer()) {
                throw new IllegalActionException(
                    "Cannot start the Spark Master since the assembly jar does not exist. " +
                    "This jar is not shipped\n" +
                    "with Kepler and must be built; " +
                    "the instructions for building it can be found at:\n" +
                    "https://kepler-project.org/developers/interest-groups/distributed/configuring-hadoop-for-biokepler-or-ddp-suite");
            }
        }
        return true;
    }

    @Override
    protected void _executeDDPJob() throws IllegalActionException {
        
        // see what jars are necessary. call this before creating the job
        // since a new classloader may be created for external classes.
        final List<URI> jarURIs = _getJarList();

        _getContext();
        
        try {
            _sparkParallelism = _context.defaultParallelism();
            
            if(_degreeOfParallelism == DDPDirector.DEFAULT_INTTOKEN.intValue()) {
                _degreeOfParallelism = _sparkParallelism;
            }

            if(_degreeOfParallelism > _sparkParallelism) {
                System.err.println("WARNING: the default degree of parallelism for " +
                        _director.getName() + " is " + _degreeOfParallelism +
                        ", which is greater than Spark's parallelism " +
                        "(" + _sparkParallelism + ").");
                if(_sameJVM) {
                    System.err.println("Increase the number of local workers to achieve " +
                        "this degree of parallelism.");
                }
            }
            
            System.out.println("Using degree of parallelism " +
                    _degreeOfParallelism + ".");
            
            for (URI jarURI : jarURIs) {
                String jarStr = jarURI.toString();
                // Spark 2 throws an exception when more than one jar with
                // the same name is added (even when in different directories).
                // So we need to exclude the (empty) kepler-tasks.jar that is
                // built for the kepler-tasks module.
                if(!jarStr.endsWith("target/kepler-tasks.jar")) {
                    _context.addJar(jarStr);
                }
            }

            synchronized(_jobLock) {
                _job = _getModelPlan();
            
                //System.out.println("Starting Spark job.");
            
                _job.start();
            }
            
            try {
                _job.waitForFinish();
            } catch (Exception e) {
                throw new IllegalActionException(_director, e, "Error in Spark job.");
            }
                
            //System.out.println("Finished Spark job.");
            
            // set the _job to null here instead of wrapup() to prevent
            // stop() from calling _job.stop() when the context has been
            // stopped.
            synchronized(_jobLock) {
                _job = null;
            }

        } finally {
            if(_context != null) {
                SparkConnection.releaseContext();
                _context = null;
            }
        }
    }

    /** Add parameters to the containing director. */
    @Override
    protected void _addParameters() throws IllegalActionException, NameDuplicationException {

        masterHostAndPort = (StringParameter) _director.getAttribute("masterHostAndPort");
        if(masterHostAndPort == null) {
            masterHostAndPort = new StringParameter(_director, "masterHostAndPort");
        }
        
        numSameJVMWorkers = (Parameter) _director.getAttribute("numSameJVMWorkers");
        if(numSameJVMWorkers == null) {
            numSameJVMWorkers = new Parameter(_director, "numSameJVMWorkers");
            numSameJVMWorkers.setTypeEquals(BaseType.INT);
            numSameJVMWorkers.setToken(String.valueOf(_numSameJVMWorkers));
        }
        
        driverMemory = (StringParameter) _director.getAttribute("driverMemory");
        if(driverMemory == null) {
            driverMemory = new StringParameter(_director, "driverMemory");
            driverMemory.setToken(SparkConnection.DEFAULT_DRIVER_MEMORY);
        }
    }

    /** Remove engine-specific parameters from the director. */
    @Override
    protected void _removeParameters() throws IllegalActionException, NameDuplicationException {
        masterHostAndPort.setContainer(null);
        numSameJVMWorkers.setContainer(null);
        driverMemory.setContainer(null);
    }

    /** Constructs a SparkJob from the model. */
    protected SparkJob _getModelPlan() throws IllegalActionException {
     
        _inputsAlreadyDone.clear();

        SparkJob plan = null;
        
        // find all the sinks in the model
        final List<DDPDataSink> sinks = _container.entityList(DDPDataSink.class);
        
        if(sinks.isEmpty()) {
            throw new IllegalActionException(_director, "No data sinks found.");
        }
        
        // for each sink, traverse the graph to the source, adding to the plan
        for(DDPDataSink sink : sinks) {
            
            // get the operator for this sink
            DataSink operator = (DataSink) _getContract(sink);
            
            // add sink to the plan
            if(plan == null) {
                plan = new SparkJob(_context, operator, "Kepler Spark Job " + _container.getName());
            } else {
                plan.addDataSink(operator);
            }
            
            // traverse graph for this sink
            _addInputsForContract(sink, operator);
        }
                
        return plan;
    }

    /** Recursively add inputs for a contract by traversing the model graph.
     * 
     *  @param pactor the current pactor in the model.
     *  @param operator the operator for the current pactor.
     */
    private void _addInputsForContract(DDPPatternActor pactor, Operator operator) throws IllegalActionException {
        
        // see if we've already done this contract
        if(!_inputsAlreadyDone.contains(operator)) {
        
            // see if the execution class name or execution code is set
            final String executionClassName = pactor.getExecutionClassName();
            final String executionCodeType = pactor.getExecutionCodeType();
            if(executionClassName.isEmpty() &&
                    executionCodeType == null &&
                    (pactor instanceof SingleInputPatternActor)) {
                // add sub-wf to plan configuration
                _addSubModelToContract((SingleInputPatternActor)pactor, operator);
            }
            
            // see how many inputs are required
            int numRequiredInputs = operator.numInputs();
            if(numRequiredInputs < 0 || numRequiredInputs > 2) {
                throw new IllegalActionException(_director, "Unknown type of contract: " + operator.getClass());
            }
            
            // see if there's at least one input
            if(numRequiredInputs > 0) {
                
                // get the first input port
                final IOPort inputPort1 = (IOPort) pactor.getPort("in");
                if(inputPort1 == null) {
                    throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
                        " is missing input port \"in\".");
                }
                
                _addInputForPactorPort(inputPort1, pactor, operator);

                // see if there's a second input
                if(numRequiredInputs > 1) {
                    
                    final IOPort inputPort2 = (IOPort) pactor.getPort("in2");
                    if(inputPort2 == null) {
                        throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
                            " is missing input port \"in2\".");
                    }
    
                    _addInputForPactorPort(inputPort2, pactor, operator);
                }
            }
        }
    }

    /** Set the input contract for a single input port and traverse the model
     *  graph to recursively add input contracts.
     *  
     *  @param port the input port of the current pactor.
     *  @param pactor the current pactor in the model.
     *  @param operator the operator for the current pactor.
     */
    private void _addInputForPactorPort(IOPort port, DDPPatternActor pactor, Operator operator) 
        throws IllegalActionException {
        
        // get the connected actor
        final List<?> outputPorts = port.sourcePortList();
        if(outputPorts.isEmpty()) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                    port.getName() + " must be connected.");
        } else if(outputPorts.size() > 1) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                    port.getName() + " may only be connected to one port.");
        }
        
        final IOPort outputPort1 = (IOPort) outputPorts.get(0);
        final NamedObj outputNamedObj = outputPort1.getContainer();
                
        // FIXME
        if(!(outputNamedObj instanceof DDPPatternActor)) {
            throw new IllegalActionException(_director, "Actor " + pactor.getFullName() +
                " is connected to a non-DDPPatternActor: " + outputNamedObj.getFullName());
        }
        
        final Operator outputContract = _getContract((DDPPatternActor)outputNamedObj);

        int numInputs = operator.numInputs();
        if(numInputs == 1) {
            operator.setInput(outputContract);
        } else if(numInputs == 2) {
            if(port.getName().equals("in")) {
                operator.setInput(0, outputContract);
            } else if(port.getName().equals("in2")) {
                operator.setInput(1, outputContract);
            } else {
                throw new IllegalActionException(port, "Input port must be named either \"in\" or \"in2\".");
            }
        }
        
        _inputsAlreadyDone.add(operator);
        
        // recursively add the inputs for output pactor
        _addInputsForContract((DDPPatternActor)outputNamedObj, outputContract);
    }

    /** Write the sub-workflow of a SingleInputPatternActor either to a parameter
     *  in the Spark job configuration or to a file.
     */
    protected void _addSubModelToContract(SingleInputPatternActor pactor, Operator contract)
            throws IllegalActionException {
                         
        final String name = pactor.getName();
                
        final Configuration contractConfiguration = contract.getParameters();
        contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, name);

        contractConfiguration.setBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT,
                pactor.getRunWorkflowLifecyclePerInput());
        
        // make a clone of the pactor so we can add TypeAttributes to ports
        // and add parameters from the hierarchy
        
        Workspace workspaceForClone;
        if(_sameJVM) {
            workspaceForClone = new Workspace(name);
        } else {
            workspaceForClone = _director.workspace();
        }
        
        SingleInputPatternActor clonePactor;
        try {
            clonePactor = (SingleInputPatternActor)pactor.clone(workspaceForClone);
        } catch (CloneNotSupportedException e) {
            throw new IllegalActionException(_director, e, "Unable to clone " + name);
        }
        
        // copy the port types to the clone
        _setPortTypes(clonePactor);
        
        // copy the parameters to the clone
        _copyParameters(pactor, clonePactor);
        
        // set display redirect path
        String directDir = pactor.getDisplayRedirectDir();
        NamedObj redirectSpecifier = pactor;
        if(directDir.isEmpty()) {
            redirectSpecifier = _director;
            directDir = _displayRedirectDir;
        }
        if(!directDir.isEmpty()) {
            
            // display redirection when running in same jvm is not supported
            if(_sameJVM) {
                throw new IllegalActionException(redirectSpecifier,
                        "Redirecting display actors is not supported for"
                        + " Spark server running in the same JVM as Kepler.");
            }
            
            final File file = new File(directDir);
            if (!file.exists() && !file.mkdirs()) {
                throw new IllegalActionException(_director, "Could not create directories " + file);
            }
            if (!file.isDirectory() || !file.canWrite()) {
                throw new IllegalActionException(redirectSpecifier,
                        "Parameter '" + _displayRedirectDir + "' must be a directory and writable.");
            }
            contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, directDir);
        }

        if(_sameJVM) {
            _subWorkflows.put(name, clonePactor);
        } else {
        
            // remove top-level ports and relations
            Utilities.removeModelPorts(clonePactor);
            
            // serialize the clone pactor
            
            if(_writeSubWorkflowsToFiles) {
                
                if(_jobDir == null) {
                    _createJobDirectory();
                }
                
                final String modelPath = _jobDir + name + ".xml";
                FileWriter writer = null;
                try {
                    writer = new FileWriter(modelPath);
                    clonePactor.exportMoML(writer);
                } catch(IOException e) {
                    throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                } finally {
                    if(writer != null) {
                        try {
                            writer.close();
                        } catch (IOException e) {
                            throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                        }
                    }
                }
                
                // add model file to plan configuration
                contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, modelPath);
                
            } else {
             
                // write model to a string
                final StringWriter writer = new StringWriter();
                try {
                    clonePactor.exportMoML(writer);
                } catch (IOException e) {
                    throw new IllegalActionException(_director, e, "Error serializing model.");
                }
                
                // add string to configuration
                contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODEL, writer.toString());
            }
            
            // set the location of the kepler installation directory.
            // NOTE: this is done so that the stub can initialize kepler and set
            // the java properties for each module's workflow directory, e.g.:
            // property("spark.workflowdir")
            // if the modules directory does not exist on the stub, e.g., the file
            // system is not shared, then initialization is not done and the stub 
            // workflow cannot use the module workflow directory properties.
            contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODULES_DIR,
                    ProjectLocator.getKeplerModulesDir().getAbsolutePath());
        
        }
    }
    
    /** Get the Spark Context. */
    private void _getContext() throws IllegalActionException {

        String typeStr = ((StringToken)_director.startServerType.getToken()).stringValue().trim();
        
        if(typeStr.isEmpty() || typeStr.equals("default")) {
            
            if(!_masterHostAndPortStr.trim().isEmpty()) {
                throw new IllegalActionException(_director,
                    "masterHostAndPort must not be specified if startServerType is default (or empty).");
            }
            
            _context = SparkConnection.getContext(
                "",
                _numSameJVMWorkers,
                _driverMemoryStr,
                _container.getFullName());
        
            // see if we were able to get a context
            if(_context == null) {
                _sameJVM = false;
            } else {
                // see if the context is to a local master.
                String sparkMasterStr = _context.getConf().get("spark.master");
                _sameJVM = sparkMasterStr.startsWith("local");
            }
            
        } else if(typeStr.equals(DDPDirector.SAME_JVM_STRING)) {
            
            _context = SparkConnection.getContext("local",
                _numSameJVMWorkers,
                _driverMemoryStr,
                _container.getFullName());
            
            _sameJVM = true;
            
            if(_context == null) {
                throw new IllegalActionException(_director,
                        "Unable to create Spark context in same JVM.");
            }
            
        } else if(typeStr.equals(DDPDirector.DISTRIBUTED_STRING)) {
                       
            _sameJVM = false;

            InetSocketAddress socketAddress;
            if(_masterHostAndPortStr.trim().isEmpty()) {
                socketAddress = SparkConnection.getDefaultMasterSocketAddress();
            } else {
                socketAddress = SparkConnection.getMasterAndPortAddress(_masterHostAndPortStr);
            }
            
            // see if we can connect to standalone server
            boolean connected = false;
            try(Socket socket = new Socket()) {
                socket.connect(socketAddress, _CONNECT_TIMEOUT);
                connected = true;
            } catch(IOException e) {
                //System.err.println("IOException connecting to " + socketAddress +
                    //": " + e.getMessage());
                connected = false;
            }

            // start standalone server since could not connect
            if(!connected) {

                File configDirFile = new File(_configDirStr);
                String parentDirStr = configDirFile.getParent();
                String startScriptStr;
                String stopScriptStr;
                
                ConfigurationProperty engineProperty = 
                    Utilities.getEngineProperty(SPARK_ENGINE_NAME, _director);
                
                ConfigurationProperty scriptsProperty = engineProperty.getProperty("Server.Scripts");
                if(scriptsProperty == null) {
                    throw new IllegalActionException(_director, "Server Scripts not found in configuration file.");
                }
                
                final String pathStr = parentDirStr + File.separator + "sbin" +
                        File.separator;

                String startName;
                String stopName;
               
                // see if we are running under a resource manager
                if(Utilities.isExecutingUnderResourceManager()) {
                    System.out.println("Running under cluster resource manager.");
                    startName = "ClusterStart";
                    stopName = "ClusterStop";
                } else {
                    startName = "Start";
                    stopName = "Stop";
                }

                ConfigurationProperty property = scriptsProperty.getProperty(startName);
                if(property == null) {
                    throw new IllegalActionException(_director, "No start script property in configuration file: " + startName);
                }
                startScriptStr = pathStr + property.getValue();
                

                property = scriptsProperty.getProperty(stopName);
                if(property == null) {
                    throw new IllegalActionException(_director, "No stop script property in configuration file: " + stopName);
                }
                stopScriptStr = pathStr + property.getValue();
                
                System.out.println("Spark server start script is " + startScriptStr);
                System.out.println("Spark server stop script is " + stopScriptStr);

                if(_checkServer(socketAddress, startScriptStr)) {
                    _stopScriptsForShutdown.add(stopScriptStr);
                }
            }
            
            _context = SparkConnection.getContext(
                    _masterHostAndPortStr,
                    _numSameJVMWorkers,
                    _driverMemoryStr,
                    _container.getFullName());
            
            
        } else {
            throw new IllegalActionException(_director,
                "Unsupported value for startServerType: " + typeStr);
        }
            
        if(_context == null) {
            throw new IllegalActionException(_director,
                "Unable to create Spark context.");
        }

    }
    
    /** Parse the output from the script that starts the server.
     *  @return If the start script specifies a server URL, returns the
     *  socket address for that URL. Otherwise, returns null.  
     */
    @Override
    protected InetSocketAddress _parseOutputFromStartingServer(InputStream input) throws IOException, IllegalActionException {
        InetSocketAddress retval = null;
        if(Utilities.isExecutingUnderResourceManager()) {
            String hostAndPortStr = SparkConnection.parseOutputFromStartingServer(input);
            if(hostAndPortStr != null) {
                _masterHostAndPortStr = hostAndPortStr;
                retval = SparkConnection.getMasterAndPortAddress(_masterHostAndPortStr);
            }
        }
        return retval;
    }

    /** Get the contract for a DDPPatternActor. */
    private Operator _getContract(DDPPatternActor actor) throws IllegalActionException {

        Operator operator = null;
        
        final String actorName = actor.getName();
        
        final String stubClassName = actor.getExecutionClassName();
        Class<?> stubClass = null;
        
        // see if the stub was set
        if(!stubClassName.isEmpty()) {
            try {
                stubClass = _classLoader.loadClass(stubClassName);
            } catch (ClassNotFoundException e) {
                throw new IllegalActionException(actor, e,
                        "Could not find execution class " + stubClassName); 
            }
        }
        
        final String stubCodeTypeName = actor.getExecutionCodeType();
        if(stubCodeTypeName != null) {
            
            // TODO
            if(!(actor instanceof Map) && !(actor instanceof Reduce)) {
                throw new IllegalActionException(actor, "Code execution not yet supported for this pattern.");
            }
        }
                
        final boolean stubIsSubWorkflow = (stubClass == null) && (stubCodeTypeName == null);
        
        if(actor instanceof DDPDataSource) {
            operator = _getSourceSinkContract((DDPDataSource)actor, true);
        } else if(actor instanceof Map) {
            
            PairFlatMapFunction stub;
            
            if(stubIsSubWorkflow) {
                // execute sub-workflow as stub
                stub = new KeplerMapStub();
            } else if(stubCodeTypeName != null) {
                stub = new ScriptEngineMapStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using Map execution class " + stubClassName +
                        " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                            "Error instantiating map class " + stubClass.getName());
                }

            }
            
            
            operator = new MapOperator(stub, actorName);
            
        } else if (actor instanceof Reduce) {
            
            Function2<?,?,?> stub = null;
            PairFlatMapFunction stubNewKeys = null;
            
            if(stubIsSubWorkflow) {
                // execute sub-workflow as stub
                
                // see if reducer is also a combiner
                // TODO
                //if(((BooleanToken)((Reduce)actor).useAsCombiner.getToken()).booleanValue()) {
                //    stubClass = KeplerCombineAndReduceStub.class;
                //} else {          
                    stubNewKeys = new KeplerReduceStub();
                //}
                
            } else if(stubCodeTypeName != null) {
                stubNewKeys = new ScriptEngineReduceStub();
            } else if(Function2.class.isAssignableFrom(stubClass)) {

                System.out.println("Using Reduce execution class " + stubClassName +
                        " for " + actor.getFullName());
                
                try {
                    stub = (Function2<?, ?, ?>) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                            "Error instantiating reduce class " + stubClass.getName());
                }

            } else if(PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                
                System.out.println("Using Reduce execution class " + stubClassName +
                        " for " + actor.getFullName());

                try {
                    stubNewKeys = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                            "Error instantiating reduce class " + stubClass.getName());
                }
                
            } else {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        Function2.class.getName() + " or " +
                        PairFlatMapFunction.class.getName());
            }
        
            if(stub != null) {
                operator = new ReduceOperator(stub, actorName);
            } else {
                operator = new ReduceOperator(stubNewKeys, actorName);
            }

        } else if(actor instanceof Cross) {
            
            PairFlatMapFunction stub;
            
            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stub = new KeplerCrossStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using Cross execution class " + stubClassName +
                        " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                            "Error instantiating cross class " + stubClass.getName());
                }
            }
            
            operator = new CrossOperator(stub, actorName);

        } else if(actor instanceof CoGroup) {
            
            PairFlatMapFunction stub;
            
            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stub = new KeplerCoGroupStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using CoGroup execution class " + stubClassName +
                        " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                            "Error instantiating cogroup class " + stubClass.getName());
                }
            }

            operator = new CoGroupOperator(stub, actorName);

        } else if(actor instanceof Match) {
            
            PairFlatMapFunction stub;
            
            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stub = new KeplerMatchStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using Match execution class " + stubClassName +
                        " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                            "Error instantiating match class " + stubClass.getName());
                }
            }

            operator = new JoinOperator(stub, actorName);
            
        } else if(actor instanceof DDPDataSink) {
            operator = _getSourceSinkContract((DDPDataSink)actor, false);
        } else {
            throw new IllegalActionException(_director, "Cannot determine Operator type for " +
                    actor.getFullName());
        }
        
        // set the number of parallel instances.
        int numInstances = actor.getDegreeOfParallelism();
        
        // if less than 1, use the director's value
        if(numInstances <= 0) {
            numInstances = _degreeOfParallelism;
        }
        // check if greater than Spark's parallelism
        else if(numInstances > _sparkParallelism) {
            System.err.println("WARNING: degree of parallelism for " +
                    actor.getName() + " is " + numInstances + ", which is " +
                    "greater than Spark's parallelism " +
                    "(" + _sparkParallelism + ").");
            if(_sameJVM) {
                System.err.println("Increase the number of local workers to achieve " +
                    "this degree of parallelism.");
            }
        }
        
        operator.setDegreeOfParallelism(numInstances);
        
        // set any job arguments if an execution class is used (not a sub-workflow)
        final Configuration configuration = operator.getParameters();
        if(!stubIsSubWorkflow) {
            for(java.util.Map.Entry<String,String> entry : _jobArgumentsMap.entrySet()) {
                // XXX this assumes they are all strings.
                configuration.set(entry.getKey(), entry.getValue());
            }
        }

        // add any actor parameters to the contract configuration
        final java.util.Map<String,String> contractParameters = actor.getParameters();
        final java.util.Map<String,String> paraNames = actor.getParaImplNames(_engineName);
        for(java.util.Map.Entry<String, String> entry : contractParameters.entrySet()) {
            String keplerParaName = entry.getKey();
            if (paraNames.get(keplerParaName) != null) {
                configuration.set(paraNames.get(keplerParaName), entry.getValue());
            } else { 
                configuration.set(Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::" + keplerParaName, entry.getValue());
            }
        }
        // set print execution info parameter
        configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, actor.getPrintExeInfo());

        // set the same JVM flag.
        if(_sameJVM) {
            configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, true);
        }
        
        if(stubCodeTypeName != null) {
                        
            String scriptEngineFactoryName = Utilities.getScriptEngineFactoryName(stubCodeTypeName);
            if(scriptEngineFactoryName == null) {
                throw new IllegalActionException(actor,
                        "No script engine has been configured for " + stubCodeTypeName);
            }
            
            if(!_sameJVM) {
                List<String> jars = Utilities.getJarsForLanguage(stubCodeTypeName);
                _additionalJars.addAll(jars);
            }
            
            configuration.set(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME, scriptEngineFactoryName);
            
            configuration.set(Utilities.CONFIGURATION_KEPLER_STUB_CODE, actor.getExecutionCode());
            
        }
        
        return operator;
    }
    
    /** Get a contract for a source or sink.
     *  @param actor the pattern actor.
     *  @param input if true, get a source contract. if false, get a sink contract.
     */
    private Operator _getSourceSinkContract(AtomicPatternActor actor, boolean input) throws IllegalActionException {
        
        final String actorName = actor.getName();
        
        String className = actor.getFormatClassName(_engineName);
        if(className == null) {
            throw new IllegalActionException(_director, "Could not find format class name " +
                    "for actor " + actorName);
        }
        
        // try to get the class
        Class<?> clazz;
        try {
            clazz = _classLoader.loadClass(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalActionException(actor, "Format type " + className +
                " was not found in the format type configurations and is not a class on the classpath.");
        }
        
        Operator operator = null;
        
        if(input) {
                        
            if(FileInputFormat.class.isAssignableFrom(clazz)) {
                operator = new FileDataSource((Class<? extends FileInputFormat<?,?>>) clazz,
                    ((DDPDataSource) actor).getPathAsURI(), actorName);
            } else if(TokenSource.class.isAssignableFrom(clazz)) {

                ArrayToken token = DDPDataSource.getToken(actor.getFullName());
                
                if(token == null) {
                    throw new IllegalActionException(actor, 
                        "No input token found for source actor " + actorName);
                }

                operator = new TokenSource(token, actor.getName());
                
            } else {
                throw new IllegalActionException(actor, "Unsupported type of format class: " +
                        clazz.getName());
            }
            
            ((DataSource) operator).setContext(_context);

            
        } else {
            
            if(FileOutputFormat.class.isAssignableFrom(clazz)) {
                
                operator = new FileDataSink((Class<? extends FileOutputFormat<?,?>>) clazz,
                    ((DDPDataSink)actor).getPathAsURI(), actorName);
            
            } else if(TokenSink.class.isAssignableFrom(clazz)) {
                
                operator = new TokenSink(actor.getFullName(), actorName);
                
            } else if(clazz == NullOutputFormat.class) {
                operator = new NullSink(actorName);
            } else {
                throw new IllegalActionException(actor, "Unsupported type of format class: " +
                        clazz.getName());
            }
        }
        
        return operator;

    }

    ///////////////////////////////////////////////////////////////////
    ////                      private fields                     //////

    /** The name of the Spark engine. */
    public final static String SPARK_ENGINE_NAME = "Spark";
    
    /** The Spark context. */
    private JavaSparkContext _context;

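    /** The operators whose inputs have already been added to the job. */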
    private Set<Operator> _inputsAlreadyDone = new HashSet<Operator>();
    
    /** A Spark job that can be cancelled. */
    private SparkJob _job;

    /**
     * A set of scripts to execute to stop each Spark server that is
     * started.
     */
    private final static Set<String> _stopScriptsForShutdown = Collections
            .synchronizedSet(new HashSet<String>());
    
    /** The number of workers for the local context. */
    private int _numSameJVMWorkers = SparkConnection.DEFAULT_NUM_LOCAL_WORKERS;
    
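    /** The amount of memory for the Spark driver. */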
    private String _driverMemoryStr = SparkConnection.DEFAULT_DRIVER_MEMORY;
    
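    /** The host and port of the Spark master. */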
    private String _masterHostAndPortStr = "";
    
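    /** The default parallelism of the Spark context. */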
    private int _sparkParallelism;
    
}