/* An engine that runs models in Stratosphere.
002 * 
003 * Copyright (c) 2011-2013 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.director;
031
032import java.io.File;
033import java.io.FileWriter;
034import java.io.IOException;
035import java.io.StringWriter;
036import java.net.InetSocketAddress;
037import java.net.URI;
038import java.util.Collections;
039import java.util.HashSet;
040import java.util.LinkedList;
041import java.util.List;
042import java.util.Set;
043
044import org.kepler.build.project.ProjectLocator;
045import org.kepler.ddp.Utilities;
046import org.kepler.ddp.actor.pattern.AtomicPatternActor;
047import org.kepler.ddp.actor.pattern.CoGroup;
048import org.kepler.ddp.actor.pattern.Cross;
049import org.kepler.ddp.actor.pattern.DDPDataSink;
050import org.kepler.ddp.actor.pattern.DDPDataSource;
051import org.kepler.ddp.actor.pattern.DDPPatternActor;
052import org.kepler.ddp.actor.pattern.Map;
053import org.kepler.ddp.actor.pattern.Match;
054import org.kepler.ddp.actor.pattern.Reduce;
055import org.kepler.ddp.actor.pattern.SingleInputPatternActor;
056import org.kepler.ddp.actor.pattern.Types;
057import org.kepler.ddp.director.DDPDirector;
058import org.kepler.ddp.director.DDPEngine;
059import org.kepler.stratosphere.io.input.TokenDataSource;
060import org.kepler.stratosphere.io.input.TokenInputFormat;
061import org.kepler.stratosphere.io.output.NullOutputFormat;
062import org.kepler.stratosphere.io.output.TokenOutputFormat;
063import org.kepler.stratosphere.stub.KeplerCoGroupStub;
064import org.kepler.stratosphere.stub.KeplerCombineAndReduceStub;
065import org.kepler.stratosphere.stub.KeplerCrossStub;
066import org.kepler.stratosphere.stub.KeplerMapStub;
067import org.kepler.stratosphere.stub.KeplerMatchStub;
068import org.kepler.stratosphere.stub.KeplerReduceStub;
069import org.kepler.stratosphere.stub.ScriptEngineMapStub;
070import org.kepler.stratosphere.stub.ScriptEngineReduceStub;
071import org.kepler.stratosphere.type.TypeUtilities;
072
073import eu.stratosphere.api.common.Plan;
074import eu.stratosphere.api.common.io.FileInputFormat;
075import eu.stratosphere.api.common.io.InputFormat;
076import eu.stratosphere.api.common.io.OutputFormat;
077import eu.stratosphere.api.common.operators.DualInputOperator;
078import eu.stratosphere.api.common.operators.FileDataSink;
079import eu.stratosphere.api.common.operators.FileDataSource;
080import eu.stratosphere.api.common.operators.GenericDataSink;
081import eu.stratosphere.api.common.operators.GenericDataSource;
082import eu.stratosphere.api.common.operators.Operator;
083import eu.stratosphere.api.common.operators.SingleInputOperator;
084import eu.stratosphere.api.java.record.functions.CoGroupFunction;
085import eu.stratosphere.api.java.record.functions.CrossFunction;
086import eu.stratosphere.api.java.record.functions.JoinFunction;
087import eu.stratosphere.api.java.record.functions.MapFunction;
088import eu.stratosphere.api.java.record.functions.ReduceFunction;
089import eu.stratosphere.api.java.record.io.CsvOutputFormat;
090import eu.stratosphere.api.java.record.io.FileOutputFormat;
091import eu.stratosphere.api.java.record.operators.CoGroupOperator;
092import eu.stratosphere.api.java.record.operators.CrossOperator;
093import eu.stratosphere.api.java.record.operators.JoinOperator;
094import eu.stratosphere.api.java.record.operators.MapOperator;
095import eu.stratosphere.api.java.record.operators.ReduceOperator;
096import eu.stratosphere.client.minicluster.NepheleMiniCluster;
097import eu.stratosphere.client.program.JobWithJars;
098import eu.stratosphere.configuration.ConfigConstants;
099import eu.stratosphere.configuration.Configuration;
100import eu.stratosphere.configuration.GlobalConfiguration;
101import eu.stratosphere.nephele.client.JobExecutionResult;
102import ptolemy.actor.CompositeActor;
103import ptolemy.actor.IOPort;
104import ptolemy.data.BooleanToken;
105import ptolemy.data.Token;
106import ptolemy.data.type.Type;
107import ptolemy.kernel.util.IllegalActionException;
108import ptolemy.kernel.util.NameDuplicationException;
109import ptolemy.kernel.util.NamedObj;
110import ptolemy.kernel.util.Workspace;
111
/** An engine that runs workflows in Stratosphere. This engine
113 *  converts DDP pattern actors (Map, Reduce, Cross, CoGroup, and
114 *  Match) and I/O actors (DDPDataSink and DDPDataSource) into a
115 *  Stratosphere job and runs it on the server.
116 *  <p>
117 *  <b>NOTE:</b> Only DDP pattern and I/O actors may be present in
 *  the workflow. Other actors must be placed inside the composite
119 *  pattern actors or in a different sub-workflow.
120 *  </p>
121 * 
122 *  @author Daniel Crawl
123 *  @version $Id: StratosphereEngine.java 33628 2015-08-24 22:42:20Z crawl $
124 *
125 */
126public class StratosphereEngine extends DDPEngine {
127
128        /** Create a new StratosphereEngine.
129         *  @param director The director containing this engine.
130         */
131    public StratosphereEngine(DDPDirector director)
132            throws IllegalActionException, NameDuplicationException {
133        super(director);
134        
135        _engineName = STRATOSPHERE_ENGINE_NAME;
136        
137    }
138       
    /** Clone the StratosphereEngine into the specified workspace.
     *  Engine-local state (the loaded Stratosphere configuration, the set
     *  of operators whose inputs have already been wired, and any running
     *  job) is reset in the clone so it shares no state with this engine.
     *  @param workspace The workspace for the cloned object.
     *  @exception CloneNotSupportedException Not thrown in this base class
     *  @return The new StratosphereEngine.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        StratosphereEngine newObject = (StratosphereEngine) super.clone(workspace);
        // the clone must load its own configuration
        newObject._globalConfig = null;
        // the clone gets fresh bookkeeping of wired operator inputs
        newObject._inputsAlreadyDone = new HashSet<Operator>();
        // the clone is not associated with any running job
        newObject._job = null;
        return newObject;
    }
152
    /** Execute the engine: load the Stratosphere configuration and make
     *  sure a server is reachable (starting one if necessary), convert the
     *  model into a Stratosphere Plan, submit the plan as a job, and wait
     *  for the job to finish.
     *  @exception IllegalActionException If the job manager address/port
     *  cannot be found, the server cannot be started, or the job fails.
     */
    @Override
    protected void _executeDDPJob() throws IllegalActionException {

        // when connecting to an external server, load its configuration once
        if(!_sameJVM && _globalConfig == null) {

                GlobalConfiguration.loadConfiguration(_configDirStr);
                _globalConfig = GlobalConfiguration.getConfiguration();

                // instantiate the address to the job manager
                final String address = _globalConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
                if (address == null) {
                    throw new IllegalActionException(_director,
                                "Cannot find address to job manager's RPC service in the global configuration.");
                }

                final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
                if (port < 0) {
                    throw new IllegalActionException(_director,
                                "Cannot find port to job manager's RPC service in the global configuration.");
                }

                final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port);

                // see if we can connect to stratosphere; start if necessary
                // this is synchronized so that if there are multiple
                // StratosphereDirectors in the same workflow, only one will
                // start the stratosphere server
            File configDirFile = new File(_configDirStr);
            String parentDirStr = configDirFile.getParent();
            String startServerTypeStr = _director.startServerType.stringValue();
            // map the director's server-type parameter to a start script suffix
            if(startServerTypeStr.isEmpty() || startServerTypeStr.equals("default")) {
                startServerTypeStr = DDPDirector.SAME_JVM_STRING;
            } else if(startServerTypeStr.equals(DDPDirector.DISTRIBUTED_STRING)) {
                startServerTypeStr = "cluster";
            }

            // e.g. <parent>/bin/start-cluster.sh
            String startScriptStr = parentDirStr + File.separator + "bin" +
                File.separator + "start-" + startServerTypeStr + ".sh";

            // remember which config dirs we started servers for so
            // shutdownServer() can stop them later
            if(_checkServer(jobManagerAddress, startScriptStr)) {
                _startedConfigDirs.add(_configDirStr);
            }

        }

        // see what jars are necessary. call this before creating the job
        // since a new classloader may be created for external classes.
        final List<URI> jarURIs = _getJarList();

        final List<String> jarPaths = new LinkedList<String>();
        for (URI jarURI : jarURIs) {
            jarPaths.add(jarURI.getPath());
        }

        // now that types have been set, we can convert the model into a plan
        final Plan plan = _getModelPlan();

        if(_sameJVM) {

            // lazily start a mini cluster inside this JVM; the lock ensures
            // only one director starts it
            synchronized(_localClusterLock) {
                if(_localCluster == null) {
                    _localCluster = new NepheleMiniCluster();
                    try {
                        System.out.println("Starting Stratosphere server in Kepler JVM.");
                        _localCluster.start();
                    } catch (Exception e) {
                        throw new IllegalActionException(_director, e,
                                "Error starting Stratosphere server in Kepler JVM.");
                    }
                }
            }

            System.out.println("Starting Stratosphere job.");

            synchronized(_jobLock) {
                try {
                    _job = new StratosphereJob(new JobWithJars(plan, jarPaths), _localCluster);
                } catch (Exception  e) {
                    throw new IllegalActionException(_director, e, "Error running job.");
                }
            }

        } else {

            // submit the job to the external server described by _globalConfig
            System.out.println("Starting Stratosphere job.");
            synchronized(_jobLock) {
                try {
                    _job = new StratosphereJob(new JobWithJars(plan, jarPaths), _globalConfig);
                } catch (Exception e) {
                    throw new IllegalActionException(_director, e,
                            "Error submitting Stratosphere job.");
                }
            }
        }

        // block until the job completes; a null result means it was cancelled
        JobExecutionResult result;
        try {
            result = _job.waitForFinish();
        } catch (Exception e) {
            throw new IllegalActionException(_director, e, "Error in Stratosphere job.");
        }

        if(result != null) {
            System.out.println("Finished Stratosphere job, took " +
                result.getNetRuntime() + " ms.");
        } else {
            System.out.println("Stratosphere job cancelled.");
        } 
    }
264    
265    /** Initialize the engine. */
266    @Override
267    public void preinitialize() throws IllegalActionException {
268     
269        // call super class preinitialize to validate settables and 
270        // preinitialize actors
271        super.preinitialize();
272        
273        _checkModel();
274        _checkModelForReduceWithCombineClass();
275        
276        // load the default config dir if necessary
277        _checkConfigDir();
278        
279    }
280
281    /** Stop any running Stratosphere jobs. */
282    @Override
283    public void stop() throws IllegalActionException {
284        super.stop();
285        
286        synchronized(_jobLock) {
287            if(_job != null) {
288                try {
289                    _job.stop();
290                } catch (IOException e) {
291                    if(_director != null) {
292                        throw new IllegalActionException(_director, e, "Error stopping DDP job.");
293                    } else {
294                        throw new IllegalActionException("Error stopping DDP job: " + e.getMessage());
295                    }
296                }
297            }
298        }
299    }
300
301    /** Shutdown the Stratosphere server if one was started. */
302    public static void shutdownServer() throws IllegalActionException {
303        
304        // shut down the server in this jvm if one is running.
305        synchronized(_localClusterLock) {
306            if(_localCluster != null) {
307                System.out.println("Stopping Local Stratosphere server.");
308    
309                try {
310                    _localCluster.stop();
311                    _localCluster = null;
312                } catch (Exception e) {
313                    throw new IllegalActionException("Error shutting down local Stratosphere server: " +
314                            e.getMessage());
315                }
316            }
317        }
318
319        // shut down any servers we started.
320        synchronized(_serverStartStopLock) {
321                for(String configDirStr : _startedConfigDirs) {
322                    
323                    System.out.println("Stopping Stratosphere server for configuration directory " +
324                            configDirStr);
325                    
326                    String parentDirStr = new File(configDirStr).getParent();
327                    String stopScriptStr = parentDirStr + File.separator + "bin" +
328                        File.separator + "stop-cluster.sh";
329                    ProcessBuilder builder = new ProcessBuilder(stopScriptStr);
330                    
331                    // make sure JAVA_HOME is set
332                    java.util.Map<String,String> env = builder.environment();
333                    if(env.get("JAVA_HOME") == null) {
334                        env.put("JAVA_HOME", System.getProperty("java.home"));
335                    }
336                    
337                    try {
338                        Process process = builder.start();
339                        process.waitFor();
340                    } catch (Exception e) {
341                        throw new IllegalActionException("Unable to stop Stratosphere: " + e.getMessage());
342                    }            
343                }
344        }
345    }
346
347    /** Free resources. */
348    @Override
349    public void wrapup() throws IllegalActionException {
350        
351        super.wrapup();
352        
353        _globalConfig = null;
354        
355        synchronized(_jobLock) {
356            _job = null;
357        }
358    }
359
360    ///////////////////////////////////////////////////////////////////
361    ////                      public fields                      //////
362
363    ///////////////////////////////////////////////////////////////////
364    ////                      protected methods                  //////
365
366    ///////////////////////////////////////////////////////////////////
367    ////                      private methods                    //////
368
    /** Write the sub-workflow of a SingleInputPatternActor either to a parameter
     *  in the PACT plan configuration or to a file.
     *  A clone of the pactor is made so port types and inherited parameters
     *  can be set on it without modifying the original model. In same-JVM
     *  mode the clone is kept in memory; otherwise it is serialized (to a
     *  file or to a configuration string) for the remote stub to load.
     *  @param pactor the pattern actor whose sub-workflow is being added.
     *  @param contract the Stratosphere operator receiving the configuration.
     *  @exception IllegalActionException If the pactor cannot be cloned or
     *  serialized, or if a display redirect directory cannot be used.
     */
    protected void _addSubModelToContract(SingleInputPatternActor pactor, Operator contract) throws IllegalActionException {

        final String name = pactor.getName();

        final Configuration contractConfiguration = contract.getParameters();
        contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, name);

        contractConfiguration.setBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT,
                pactor.getRunWorkflowLifecyclePerInput());
        
        // make a clone of the pactor so we can add TypeAttributes to ports
        // and add parameters from the hierarchy
        
        // in same-JVM mode each sub-workflow needs its own workspace so the
        // stub can execute it independently; otherwise reuse the director's
        Workspace workspaceForClone;
        if(_sameJVM) {
            workspaceForClone = new Workspace(name);
        } else {
            workspaceForClone = _director.workspace();
        }
        
        SingleInputPatternActor clonePactor;
        try {
            clonePactor = (SingleInputPatternActor)pactor.clone(workspaceForClone);
        } catch (CloneNotSupportedException e) {
            throw new IllegalActionException(_director, e, "Unable to clone " + name);
        }
        
        // copy the port types to the clone
        _setPortTypes(clonePactor);
        
        // copy the parameters to the clone
        _copyParameters(pactor, clonePactor);
        
        // set display redirect path: the pactor's own setting takes
        // precedence; fall back to the director-level setting
        String directDir = pactor.getDisplayRedirectDir();
        NamedObj redirectSpecifier = pactor;
        if(directDir.isEmpty()) {
            redirectSpecifier = _director;
            directDir = _displayRedirectDir;
        }
        if(!directDir.isEmpty()) {
            
            // display redirection when running in same jvm is not supported
            if(_sameJVM) {
                throw new IllegalActionException(redirectSpecifier,
                        "Redirecting display actors is not supported for"
                        + " Stratosphere server running in the same JVM as Kepler.");
            }
            
            // create the redirect directory if needed and verify it is usable
            final File file = new File (directDir);
            if (!file.exists() && !file.mkdirs()) {
                throw new IllegalActionException(_director, "Could not create directories " + file);
            }
            if (!file.isDirectory() || !file.canWrite()) {
                throw new IllegalActionException(redirectSpecifier,
                        "Parameter '" + _displayRedirectDir + "' must be a directory and writable.");
            }
            contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, directDir);
        }

        if(_sameJVM) {
            // same JVM: the stub can read the clone directly from memory
            _subWorkflows.put(name, clonePactor);
        } else {
        
            // remove top-level ports and relations
                Utilities.removeModelPorts(clonePactor);
            
            // serialize the clone pactor
            
            if(_writeSubWorkflowsToFiles) {
                
                if(_jobDir == null) {
                    _createJobDirectory();
                }
                
                // write the MoML to <jobDir><name>.xml
                final String modelPath = _jobDir + name + ".xml";
                FileWriter writer = null;
                try {
                    writer = new FileWriter(modelPath);
                    clonePactor.exportMoML(writer);
                } catch(IOException e) {
                    throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                } finally {
                    if(writer != null) {
                        try {
                            writer.close();
                        } catch (IOException e) {
                            throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                        }
                    }
                }
                
                // add model file to plan configuration
                contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, modelPath);
                
            } else {
             
                // write model to a string
                final StringWriter writer = new StringWriter();
                try {
                    clonePactor.exportMoML(writer);
                } catch (IOException e) {
                    throw new IllegalActionException(_director, e, "Error serializing model.");
                }
                
                // add string to configuration
                contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODEL, writer.toString());
            }            
            
            // set the location of the kepler installation directory.
            // NOTE: this is done so that the stub can initialize kepler and set
            // the java properties for each module's workflow directory, e.g.:
            // property("stratosphere.workflowdir")
            // if the modules directory does not exist on the stub, e.g., the file
            // system is not shared, then initialization is not done and the stub 
            // workflow cannot use the module workflow directory properties.
            contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODULES_DIR,
                    ProjectLocator.getKeplerModulesDir().getAbsolutePath());
        
        }
    }    
493
494    /** Recursively add inputs for a contract by traversing the model graph.
495     * 
496     *  @param pactor the current pactor in the model.
497     *  @param contract the contract for the current pactor. 
498     */
499    private void _addInputsForContract(DDPPatternActor pactor, Operator contract) throws IllegalActionException {
500        
501        // see if we've already done this contract
502        if(!_inputsAlreadyDone.contains(contract)) {
503        
504            // see if the execution class name or execution code is set
505            final String executionClassName = pactor.getExecutionClassName();
506            final String executionCodeType = pactor.getExecutionCodeType();
507            if(executionClassName.isEmpty() &&
508                    executionCodeType == null &&
509                    (pactor instanceof SingleInputPatternActor)) {
510                // add sub-wf to plan configuration
511                _addSubModelToContract((SingleInputPatternActor)pactor, contract);
512            }
513            
514            // see how many inputs are required
515            int numRequiredInputs;
516            if(contract instanceof DualInputOperator) {
517                numRequiredInputs = 2;
518            } else if((contract instanceof SingleInputOperator) ||
519                (contract instanceof GenericDataSink)) {
520                numRequiredInputs = 1;
521            } else if(contract instanceof GenericDataSource) {
522                numRequiredInputs = 0;
523            } else {
524                throw new IllegalActionException(_director, "Unknown type of contract: " + contract.getClass());
525            }
526            
527            // see if there's at least one input
528            if(numRequiredInputs > 0) {
529                
530                // get the first input port
531                final IOPort inputPort1 = (IOPort) pactor.getPort("in");
532                if(inputPort1 == null) {
533                    throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
534                        "is missing input port \"in\".");
535                }
536                
537                _addInputForPactorPort(inputPort1, pactor, contract);
538
539                // see if there's a second input
540                if(numRequiredInputs > 1) {
541                    
542                    final IOPort inputPort2 = (IOPort) pactor.getPort("in2");
543                    if(inputPort2 == null) {
544                        throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
545                            "is missing input port \"in2\".");
546                    }
547    
548                    _addInputForPactorPort(inputPort2, pactor, contract);
549                }
550            }
551        }
552    }
553
    /** Set the input contract for a single input port and traverse the model
     *  graph to recursively add input contracts.
     *  
     *  @param port the input port whose upstream contract is being wired.
     *  @param pactor the current pactor in the model.
     *  @param contract the contract for the current pactor. 
     *  @exception IllegalActionException if the port is unconnected, has more
     *  than one connection, or is connected to a non-DDPPatternActor.
     */
    private void _addInputForPactorPort(IOPort port, DDPPatternActor pactor, Operator contract) 
        throws IllegalActionException {
        
        // get the connected actor; exactly one upstream connection is required
        final List<?> outputPorts = port.sourcePortList();
        if(outputPorts.isEmpty()) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                    port.getName() + " must be connected.");
        } else if(outputPorts.size() > 1) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                    port.getName() + " may only be connected to one port.");                
        }
        
        final IOPort outputPort1 = (IOPort) outputPorts.get(0);
        final NamedObj outputNamedObj = outputPort1.getContainer();
                
        // FIXME
        // only DDP pattern actors may feed a pattern actor's input
        if(!(outputNamedObj instanceof DDPPatternActor)) {
            throw new IllegalActionException(_director, "Actor " + pactor.getFullName() +
                " is connected to a non-DDPPatternActor: " + outputNamedObj.getFullName());
        }
        
        final Operator outputContract = _getContract((DDPPatternActor)outputNamedObj);
        
        // attach the upstream contract; which setter to use depends on the
        // contract's type and, for dual-input operators, on the port name
        if(contract instanceof SingleInputOperator) {
            ((SingleInputOperator<?>)contract).setInput(outputContract);
        } else if(contract instanceof DualInputOperator) {
            if(port.getName().equals("in")) {
                ((DualInputOperator<?>)contract).setFirstInput(outputContract);
            } else if(port.getName().equals("in2")) {
                ((DualInputOperator<?>)contract).setSecondInput(outputContract);                
            } else {
                throw new IllegalActionException(port, "Input port must be named either \"in\" or \"in2\".");
            }
        } else { // if (contract instanceof GenericDataSink))
            ((GenericDataSink)contract).setInput(outputContract);
        }
        
        // mark before recursing so cycles/diamonds do not rewire this contract
        _inputsAlreadyDone.add(contract);
        
        // recursively add the inputs for output pactor
        _addInputsForContract((DDPPatternActor)outputNamedObj, outputContract);
    }
604    
605    /** Make sure there no with Reduce pattern actors in this sub-workflow
606     *  that specify a separate combiner class.
607     */
608    private void _checkModelForReduceWithCombineClass() throws IllegalActionException {
609        
610        final CompositeActor container = _container;
611        for(Reduce reduce : container.entityList(Reduce.class)) {
612            if(!reduce.combineExecutionClass.stringValue().trim().isEmpty()) {
613                throw new IllegalActionException(reduce,
614                        "The combiner execution class cannot be specified with " +
615                        "Stratosphere; instead implement the combine() in the" +
616                        "reduce class.");
617            }
618        }
619    }
620
621    /** Get a contract for a source or sink.
622     *  @param actor the Pactor
623     *  @param parameterName the name of the parameter containing the class name.
624     *  @param input if true, get a source contract. if false, get a sink contract.
625     */
626    private Operator _getSourceSinkContract(AtomicPatternActor actor, boolean input) throws IllegalActionException {
627        
628        String className = actor.getFormatClassName(_engineName);
629        if(className == null) {
630            throw new IllegalActionException(_director, "Could not find format class name for " +
631                    " actor " + actor.getName());
632        }
633        
634        java.util.Map<String,String> parametersMap = actor.getParameters();
635        
636        // first check for classes that were in 0.1 and 0.2 WordCount
637        if(className.equals("eu.stratosphere.pact.example.wordcount.WordCount$LineInFormat")) {
638            System.out.println("class WordCount$LineInFormat no longer exists; using TextInputFormat");
639            className = "eu.stratosphere.api.java.record.io.TextInputFormat";
640        } else if(className.equals("eu.stratosphere.pact.common.io.TextInputFormat")) {
641            System.out.println("TextInputFormat moved to package eu.stratosphere.api.java.record.io");
642            className = "eu.stratosphere.api.java.record.io.TextInputFormat";           
643        } else if(className.equals("eu.stratosphere.pact.example.wordcount.WordCount$WordCountOutFormat")) {
644            System.out.println("class WordCount$WordCountOutFormat no longer exists; using CsvOutputFormat");
645            className = "eu.stratosphere.api.java.record.io.CsvOutputFormat";
646            parametersMap.put("recordDelimiter", "\n");
647            parametersMap.put("fieldDelimiter", " ");
648        } else if(className.equals("eu.stratosphere.pact.common.io.RecordOutputFormat")) {
649                System.out.println("class RecordOutputFormat no longer exists; using CsvOutputFormat.");
650            className = "eu.stratosphere.api.java.record.io.CsvOutputFormat";
651            parametersMap.put("recordDelimiter", "\n");
652            parametersMap.put("fieldDelimiter", " ");
653        }
654
655        // try to get the class
656        Class<?> clazz;
657        try {
658            clazz = _classLoader.loadClass(className);
659        } catch (ClassNotFoundException e) {
660            throw new IllegalActionException(actor, "Format type " + className +
661                " was not found in the format types configurations or is a class not on the classpath.");
662        }
663        
664        Operator contract;
665        if(input) {            
666            if(!InputFormat.class.isAssignableFrom(clazz)) {
667                throw new IllegalActionException(actor, "Class " + clazz.getName() +
668                    " must implement Stratosphere's InputFormat interface.");
669            }
670            
671            if(FileInputFormat.class.isAssignableFrom(clazz)) {
672                contract = new FileDataSource((Class<? extends FileInputFormat<?>>) clazz,
673                    ((DDPDataSource) actor).getPathAsURI().toString(), actor.getName());
674            } else if(TokenInputFormat.class.isAssignableFrom(clazz)) {
675
676                contract = new TokenDataSource((Class<? extends TokenInputFormat>)clazz,
677                                actor.getName());
678                
679                if(_sameJVM) {
680                    contract.setParameter(Utilities.CONFIGURATION_KEPLER_SOURCE_ACTOR_NAME,
681                                actor.getFullName());
682                } else {
683                    
684                    Token token = DDPDataSource.getToken(actor.getFullName());
685                    
686                    if(token == null) {
687                        throw new IllegalActionException(actor, 
688                            "No input token found for source actor " + actor.getName());
689                    }
690                    
691                    contract.setParameter(Utilities.CONFIGURATION_KEPLER_INPUT_TOKEN, token.toString());
692                }
693
694            } else {
695                throw new IllegalActionException(actor, "Unsupported type of format class: " +
696                                clazz.getName());
697            }
698            
699        } else {
700
701            if(!OutputFormat.class.isAssignableFrom(clazz)) {
702                throw new IllegalActionException(actor, "Class " + clazz.getName() +
703                    " must implement Stratosphere's OutputFormat interface.");
704            }
705            
706            if(FileOutputFormat.class.isAssignableFrom(clazz)) {
707                
708                contract = new FileDataSink((Class<? extends FileOutputFormat>) clazz,
709                    ((DDPDataSink)actor).getPathAsURI().toString(), actor.getName());
710            
711                if(clazz == CsvOutputFormat.class) {
712                
713                    CsvOutputFormat.ConfigBuilder builder = 
714                                CsvOutputFormat.configureRecordFormat((FileDataSink) contract)
715                        .lenient(true)
716                        .field(TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(
717                            ((DDPDataSink) actor).in, "key"), TypeUtilities.KEY_FIELD)
718                        .field(TypeUtilities.getPactValueTypeForFieldInKeyValuePort(
719                            ((DDPDataSink) actor).in, "value"), TypeUtilities.VALUE_FIELD);
720                
721                
722                    String field = parametersMap.get("fieldDelimiter");
723                    // if no field delimiter is set, set to space
724                    if(field == null) {
725                        field = " ";
726                    }
727                    builder = builder.fieldDelimiter(field.charAt(0));
728                    String record = parametersMap.get("recordDelimiter");
729                    if(record != null) {
730                        builder = builder.recordDelimiter(record);
731                    }
732                }
733            } else if(clazz == TokenOutputFormat.class) {
734                
735                contract = new GenericDataSink((Class<? extends TokenOutputFormat>) clazz, actor.getName());
736                
737                contract.setParameter(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME,
738                        actor.getFullName());
739            
740                TokenOutputFormat.ConfigBuilder builder = 
741                        TokenOutputFormat.configureRecordFormat((GenericDataSink) contract)
742                    .field(TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(
743                        ((DDPDataSink) actor).in, "key"), TypeUtilities.KEY_FIELD)
744                    .field(TypeUtilities.getPactValueTypeForFieldInKeyValuePort(
745                        ((DDPDataSink) actor).in, "value"), TypeUtilities.VALUE_FIELD);                    
746            } else if(clazz == NullOutputFormat.class) {
747                
748                contract = new GenericDataSink(new NullOutputFormat());
749                
750            } else {
751                throw new IllegalActionException(actor, "Unsupported type of format class: " +
752                        clazz.getName());
753            }
754        }
755        
756        return contract;
757
758    }
759    
    /** Get the contract (Stratosphere Operator) for a DDPPatternActor.
     *
     *  The operator wraps a "stub" class that performs the actor's work.
     *  The stub is chosen with the following precedence: an explicit
     *  execution class named by the actor; a script-engine stub when the
     *  actor specifies execution code; otherwise a Kepler*Stub that runs
     *  the actor's sub-workflow. After the operator is created, the degree
     *  of parallelism, job arguments, actor parameters, and (when code is
     *  used) script-engine settings are copied into its Configuration.
     *
     *  @param actor the DDP pattern actor (data source/sink, Map, Reduce,
     *          Cross, CoGroup, or Match).
     *  @return the Stratosphere Operator for the actor; never null.
     *  @throws IllegalActionException if the execution class cannot be
     *          loaded or is not a subclass of the required stub interface,
     *          or the actor's pattern type is unsupported.
     */
    private Operator _getContract(DDPPatternActor actor) throws IllegalActionException {

        Operator contract = null;
        
        final String actorName = actor.getName();
        
        final String stubClassName = actor.getExecutionClassName();
        Class<?> stubClass = null;
        
        // see if an execution (stub) class was explicitly set on the actor
        if(!stubClassName.isEmpty()) {
            try {
                stubClass = _classLoader.loadClass(stubClassName);
            } catch (ClassNotFoundException e) {
                throw new IllegalActionException(actor, e,
                        "Could not find execution class " + stubClassName); 
            }
        }
        
        final String stubCodeTypeName = actor.getExecutionCodeType();
        if(stubCodeTypeName != null) {
            
            // TODO: support execution code for the remaining pattern types.
            if(!(actor instanceof Map) && !(actor instanceof Reduce)) {
                throw new IllegalActionException(actor, "code execution not yet supported for this pattern.");
            }
        }
                
        // with neither an execution class nor execution code, the actor's
        // sub-workflow is executed as the stub
        final boolean stubIsSubWorkflow = (stubClass == null) && (stubCodeTypeName == null);
        
        if(actor instanceof DDPDataSource) {
            contract = _getSourceSinkContract((DDPDataSource)actor, true);            
        } else if(actor instanceof Map) {
            
            if(stubIsSubWorkflow) {
                // execute sub-workflow as stub
                stubClass = KeplerMapStub.class;
            } else if(stubCodeTypeName != null) {
                // execution code takes precedence over any execution class
                stubClass = ScriptEngineMapStub.class;
            } else if(!MapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        MapFunction.class.getName());
            } else {
                System.out.println("Using Map execution class " + stubClassName +
                        " for " + actor.getFullName());
            }
                        
            contract = MapOperator.builder((Class<? extends MapFunction>)stubClass)
                    .name(actorName)
                    .build();
            
        } else if (actor instanceof Reduce) {
            
            if(stubIsSubWorkflow) {
                // execute sub-workflow as stub
                
                // see if reducer is also a combiner
                if(((BooleanToken)((Reduce)actor).useAsCombiner.getToken()).booleanValue()) {
                    stubClass = KeplerCombineAndReduceStub.class;
                } else {          
                    stubClass = KeplerReduceStub.class;
                }
                
            } else if(stubCodeTypeName != null) {
                stubClass = ScriptEngineReduceStub.class;
            } else if(!ReduceFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        ReduceFunction.class.getName());
            } else {
                System.out.println("Using Reduce execution class " + stubClassName +
                        " for " + actor.getFullName());
            }
            
            // reduce groups records by the key field of the actor's input port
            contract = ReduceOperator
                    .builder((Class<? extends ReduceFunction>)stubClass)
                    .keyField(
                            TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(
                                    ((Reduce) actor).in, "key"),
                            TypeUtilities.KEY_FIELD)
                    .name(actorName)
                    .build();

        } else if(actor instanceof Cross) {
            
            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stubClass = KeplerCrossStub.class;
            } else if(!CrossFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        CrossFunction.class.getName());
            } else {
                System.out.println("Using Cross execution class " + stubClassName +
                        " for " + actor.getFullName());
            }

            contract = CrossOperator.builder((Class<? extends CrossFunction>)stubClass)
                    .name(actorName)
                    .build();

        } else if(actor instanceof CoGroup) {
            
            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stubClass = KeplerCoGroupStub.class;
            } else if(!CoGroupFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        CoGroupFunction.class.getName());
            } else {
                System.out.println("Using CoGroup execution class " + stubClassName +
                        " for " + actor.getFullName());
            }

            // co-group joins both inputs on the key field of the input port
            contract = CoGroupOperator.builder((Class<? extends CoGroupFunction>) stubClass,
                        TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(((CoGroup)actor).in, "key"),
                        TypeUtilities.KEY_FIELD, TypeUtilities.KEY_FIELD)
                    .name(actorName)
                    .build();

        } else if(actor instanceof Match) {
            
            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stubClass = KeplerMatchStub.class;
            } else if(!JoinFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                        stubClassName + " must be a subclass of " +
                        JoinFunction.class.getName());
            } else {
                System.out.println("Using Match execution class " + stubClassName +
                        " for " + actor.getFullName());
            }

            // match is an equi-join on the key field of the input port
            contract = JoinOperator.builder((Class<? extends JoinFunction>) stubClass,
                        TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(((Match)actor).in, "key"),
                        TypeUtilities.KEY_FIELD, TypeUtilities.KEY_FIELD)
                    .name(actorName)
                    .build();
            
        } else if(actor instanceof DDPDataSink) {
            contract = _getSourceSinkContract((DDPDataSink)actor, false);                        
        } else {
            throw new IllegalActionException(_director, "Cannot determine Contract type for " +
                    actor.getFullName());
        }
        
        // set the number of parallel instances; fall back to the director-wide
        // degree of parallelism when the actor does not specify one.
        int numInstances = actor.getDegreeOfParallelism();
        if(numInstances <= 0) {
            numInstances = _degreeOfParallelism;
        }
        contract.setDegreeOfParallelism(numInstances);
        
        // set any job arguments if an execution class is used (not a sub-workflow)
        final Configuration configuration = contract.getParameters();
        if(!stubIsSubWorkflow) {
            for(java.util.Map.Entry<String,String> entry : _jobArgumentsMap.entrySet()) {
                // XXX this assumes they are all strings.
                configuration.setString(entry.getKey(), entry.getValue());
            }
        }

        // add any actor parameters to the contract configuration; parameters
        // with an engine-specific implementation name use that name, the rest
        // are stored under the Kepler parameter prefix.
        final java.util.Map<String,String> contractParameters = actor.getParameters();
        final java.util.Map<String,String> paraNames = actor.getParaImplNames(_engineName);
        for(java.util.Map.Entry<String, String> entry : contractParameters.entrySet()) {
            String keplerParaName = entry.getKey();
            if (paraNames.get(keplerParaName) != null) {
                configuration.setString(paraNames.get(keplerParaName), entry.getValue());
            } else {
                configuration.setString(Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::" + keplerParaName, entry.getValue());
            }
        }
        //set print execution info parameter
        configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, actor.getPrintExeInfo());

        // set the same JVM flag.
        if(_sameJVM) {
            configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, true);        
        }

        // when execution code is used, pass the script engine, the code, and
        // the input key/value types to the stub via the configuration.
        if(stubCodeTypeName != null) {
                        
            String scriptEngineFactoryName = Utilities.getScriptEngineFactoryName(stubCodeTypeName);
            if(scriptEngineFactoryName == null) {
                throw new IllegalActionException(actor,
                        "No script engine has been configured for " + stubCodeTypeName);
            }
            
            // in distributed mode the language's jars must be shipped with the job
            if(!_sameJVM) {
                List<String> jars = Utilities.getJarsForLanguage(stubCodeTypeName);
                _additionalJars.addAll(jars);
            }
            
            configuration.setString(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME, scriptEngineFactoryName);
            
            configuration.setString(Utilities.CONFIGURATION_KEPLER_STUB_CODE, actor.getExecutionCode());
            
            String inTypesStr = ((SingleInputPatternActor)actor).inKeyValueTypes.stringValue();
            Type[] types = Types.getKeyValueTypes(((SingleInputPatternActor)actor).inKeyValueTypes, inTypesStr);
     
            configuration.setString(Utilities.CONFIGURATION_KEPLER_INPUT_KEY_TYPE, types[0].toString());
            configuration.setString(Utilities.CONFIGURATION_KEPLER_INPUT_VALUE_TYPE,  types[1].toString());
            
        }

        
        return contract;
    }
976    
977    /** Constructs a PACT Plan from the model. */
978    protected Plan _getModelPlan() throws IllegalActionException {
979     
980        _inputsAlreadyDone.clear();
981
982        Plan plan = null;
983        
984        // find all the sinks in the model
985        final List<DDPDataSink> sinks = _container.entityList(DDPDataSink.class);
986        
987        if(sinks.isEmpty()) {
988            throw new IllegalActionException(_director, "No data sinks found.");
989        }
990        
991        // for each sink, traverse the graph to the source, adding to the plan
992        for(DDPDataSink sink : sinks) {
993            
994            // get the PACT contract for this sink
995            GenericDataSink contract = (GenericDataSink)_getContract(sink);
996            
997            // add sink to the plan
998            if(plan == null) {
999                plan = new Plan(contract, "Kepler PACT Job " + _container.getName());
1000            } else {
1001                plan.addDataSink(contract);
1002            }
1003            
1004            // traverse graph for this sink
1005            _addInputsForContract(sink, contract);
1006        }
1007                
1008        return plan;
1009    }
1010    
1011    ///////////////////////////////////////////////////////////////////
1012    ////                      private fields                     //////
1013
1014    /** Nephele global configuration. */
1015    private Configuration _globalConfig;
1016    
1017    private Set<Operator> _inputsAlreadyDone = new HashSet<Operator>();
1018                        
1019        /**
1020         * A list of configuration directories for each stratosphere server that is
1021         * started.
1022         */
1023        private final static Set<String> _startedConfigDirs = Collections
1024                        .synchronizedSet(new HashSet<String>());
1025        
1026        /** Logging. */
1027    //private final static Log _log = LogFactory.getLog(StratosphereEngine.class);
1028    
1029        /** The name of the stratosphere engine. */
1030    private final static String STRATOSPHERE_ENGINE_NAME = "Stratosphere";
1031    
1032    /** Cluster to execute stratosphere jobs in the Kepler JVM. */
1033    private static NepheleMiniCluster _localCluster;
1034    
1035    /** A lock for _localCluster. */
1036    private final static Object _localClusterLock = new Object();
1037    
1038    /** A Stratosphere job that can be cancelled. */
1039    private StratosphereJob _job;
1040}