001/* A DDP engine than runs models in Hadoop.
002 * 
003 * Copyright (c) 2012-2013 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * 
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027package org.kepler.hadoop.director;
028
029import java.io.BufferedReader;
030import java.io.File;
031import java.io.FileReader;
032import java.io.FileWriter;
033import java.io.IOException;
034import java.io.InputStream;
035import java.io.InputStreamReader;
036import java.io.StringWriter;
037import java.net.InetSocketAddress;
038import java.net.MalformedURLException;
039import java.net.Socket;
040import java.net.URI;
041import java.net.URL;
042import java.net.URLClassLoader;
043import java.util.ArrayList;
044import java.util.Arrays;
045import java.util.Collection;
046import java.util.Collections;
047import java.util.HashMap;
048import java.util.LinkedList;
049import java.util.List;
050import java.util.Random;
051
052import org.apache.commons.io.FileUtils;
053import org.apache.commons.logging.Log;
054import org.apache.commons.logging.LogFactory;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.FileUtil;
058import org.apache.hadoop.fs.FsShell;
059import org.apache.hadoop.fs.Path;
060import org.apache.hadoop.io.RawComparator;
061import org.apache.hadoop.mapreduce.InputFormat;
062import org.apache.hadoop.mapreduce.Job;
063import org.apache.hadoop.mapreduce.Mapper;
064import org.apache.hadoop.mapreduce.OutputFormat;
065import org.apache.hadoop.mapreduce.Partitioner;
066import org.apache.hadoop.mapreduce.Reducer;
067import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
068import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
069import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
070import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
071import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
072import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
073import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
074import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
075import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
076import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
077import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
078import org.kepler.build.project.ProjectLocator;
079import org.kepler.ddp.Utilities;
080import org.kepler.ddp.actor.pattern.AtomicPatternActor;
081import org.kepler.ddp.actor.pattern.CoGroup;
082import org.kepler.ddp.actor.pattern.Cross;
083import org.kepler.ddp.actor.pattern.DDPDataSink;
084import org.kepler.ddp.actor.pattern.DDPDataSource;
085import org.kepler.ddp.actor.pattern.DDPPatternActor;
086import org.kepler.ddp.actor.pattern.DualInputPatternActor;
087import org.kepler.ddp.actor.pattern.Map;
088import org.kepler.ddp.actor.pattern.Match;
089import org.kepler.ddp.actor.pattern.Reduce;
090import org.kepler.ddp.actor.pattern.SingleInputPatternActor;
091import org.kepler.ddp.director.DDPDirector;
092import org.kepler.ddp.director.DDPEngine;
093import org.kepler.hadoop.io.input.TokenInputFormat;
094import org.kepler.hadoop.io.output.TokenOutputFormat;
095import org.kepler.hadoop.mapreduce.Combiner4TagValue;
096import org.kepler.hadoop.mapreduce.MapReduce4Kepler;
097import org.kepler.hadoop.mapreduce.Mapper4Cross;
098import org.kepler.hadoop.mapreduce.Mapper4DataTransform;
099import org.kepler.hadoop.mapreduce.Mapper4TagValue;
100import org.kepler.hadoop.mapreduce.Reducer4CoGroup;
101import org.kepler.hadoop.mapreduce.Reducer4Match;
102import org.kepler.hadoop.util.DDPPatternActorUtil;
103import org.kepler.hadoop.util.StubUtilities;
104
105import ptolemy.actor.Actor;
106import ptolemy.actor.IOPort;
107import ptolemy.actor.TypeAttribute;
108import ptolemy.actor.TypedIOPort;
109import ptolemy.data.BooleanToken;
110import ptolemy.data.RecordToken;
111import ptolemy.data.StringToken;
112import ptolemy.data.Token;
113import ptolemy.data.expr.Parameter;
114import ptolemy.data.type.ArrayType;
115import ptolemy.data.type.BaseType;
116import ptolemy.data.type.RecordType;
117import ptolemy.data.type.Type;
118import ptolemy.kernel.util.IllegalActionException;
119import ptolemy.kernel.util.InternalErrorException;
120import ptolemy.kernel.util.NameDuplicationException;
121import ptolemy.kernel.util.NamedObj;
122import ptolemy.kernel.util.Workspace;
123
124/** An engine than runs workflows in Hadoop. This engine
125 *  converts DDP pattern actors (Map, Reduce, Cross, CoGroup, and
126 *  Match) and I/O actors (DDPDataSink and DDPDataSource) into a
127 *  Hadoop job and runs it on the server.
128 *  <p>
129 *  <b>NOTE:</b> Only DDP pattern and I/O actors may be present in
130 *  the workflow. Other actors must placed inside the composite
131 *  pattern actors or in a different sub-workflow.
132 *  </p>
133 * 
134 * @author Jianwu Wang
135 * @version $Id: HadoopEngine.java 33628 2015-08-24 22:42:20Z crawl $
136 * 
137 */
138public class HadoopEngine extends DDPEngine {
139        
140    public static final String HDFS_NAME = "fs.default.name";
141    public static final String FS_NAME = "fs.defaultFS";
142    public static final String RES_MANAGER_URL = "0.0.0.0:8032";
143    public static final String CORE_SITE = "core-site.xml";
144    public static final String MAPRED_SITE = "mapred-site.xml";
145    public static final String YARN_SITE = "yarn-site.xml";
146    public static final String MAP_CHILD_ENV = "mapred.map.child.env";
147    public static final String REDUCE_CHILD_ENV = "mapred.reduce.child.env";
148    public static final String MR_JOBTRACKER_ADDRESS = "mapreduce.jobtracker.address";
149    public static final String MR_FRAMEWORK_NAME = "mapreduce.framework.name";
150    public static final String YARN_RM_ADDRESS = "yarn.resourcemanager.address";
151        
152        public HadoopEngine(DDPDirector director) throws IllegalActionException,
153                        NameDuplicationException {
154                super(director);
155                
156                // load the configuration file
157                //_configProp = ConfigurationManager.getInstance().getProperty(
158                                //ConfigurationManager.getModule("ddp-common"));
159
160                _engineName = HADOOP_ENGINE_NAME;
161                
162        }
163        
164    /** Clone the HadoopEngine into the specified workspace.
165     *  @param workspace The workspace for the cloned object.
166     *  @exception CloneNotSupportedException Not thrown in this base class
167     *  @return The new HadoopDirector.
168     */
169    @Override
170    public Object clone(Workspace workspace) throws CloneNotSupportedException {
171        HadoopEngine newObject = (HadoopEngine) super.clone(workspace);
172        newObject._autoHDFSStage = false;
173        newObject._jobControl = null;
174        newObject._overwriteOutput = false;
175        newObject._stageOutDirMap = null;
176        newObject._tmpDir = null;
177        newObject._tokenOutputFileMap = new HashMap<Path,DDPDataSink>();
178        newObject.conf = null;
179        //newObject._configProp = null;
180        return newObject;
181    }
182
183    @Override
184    protected void _executeDDPJob() throws IllegalActionException {
185                        
186                conf = new Configuration();
187                conf.addResource(new Path(_configDirStr + File.separator
188                                + CORE_SITE));
189                conf.addResource(new Path(_configDirStr + File.separator
190                                + MAPRED_SITE));
191                conf.addResource(new Path(_configDirStr + File.separator
192                                + YARN_SITE));          
193                //set environment variables for child processes.
194                conf.set(MAP_CHILD_ENV, "PATH="+System.getenv("PATH") + ",LD_LIBRARY_PATH="+System.getenv("LD_LIBRARY_PATH"));
195                conf.set(REDUCE_CHILD_ENV, "PATH="+System.getenv("PATH") + ",LD_LIBRARY_PATH="+System.getenv("LD_LIBRARY_PATH"));
196                //try to solve user.home error for child process. But the following two lines are not the correct solution.
197                //conf.set("mapred.child.java.opts", conf.get("mapred.child.java.opts") + " -Duser.home=" + System.getProperty("user.home"));
198                //conf.set("mapred.map.child.java.opts", conf.get("mapred.map.child.java.opts") + " -Duser.home=" + System.getProperty("user.home"));
199        conf.setBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, _sameJVM);        
200
201                if(_sameJVM) {
202                    //conf.set("mapred.job.tracker", "local");
203                    conf.set(MR_JOBTRACKER_ADDRESS, "local");
204                    conf.set(MR_FRAMEWORK_NAME, "local");
205                    conf.set(FS_NAME, "file:///");
206                    System.out.println("Running Hadoop server in Kepler JVM.");
207                    
208            if(_degreeOfParallelism > 1) {
209                System.err.println("WARNING: the default degree of parallelism for " +
210                        _director.getName() + " is " + _degreeOfParallelism +
211                        ", but Hadoop only uses 1 thread in sameJVM mode.");
212            }
213
214                } else {
215                        System.out.println("Checking Hadoop server in distributed mode.");
216                        String hdfsURL = conf.get(FS_NAME);
217                        _log.info("value of " + FS_NAME + " : " + hdfsURL);
218                        if (hdfsURL == null) {
219                                hdfsURL = conf.get(HDFS_NAME);
220                                if (hdfsURL == null)
221                                        throw new IllegalActionException(_director,
222                                                "both 'fs.default.name' and 'fs.defaultFS' of Hadoop properties are null.");
223                        }
224        
225                        String resManagerURL = conf.get(YARN_RM_ADDRESS);
226                        if (resManagerURL == null) {
227                                //use default value for it
228                                resManagerURL = RES_MANAGER_URL;
229                        }
230                        _log.info("value of " + YARN_RM_ADDRESS + " : " + resManagerURL);
231        
232                        String[] hdfsInfo = hdfsURL.split(":");
233                        String[] jobInfo = resManagerURL.split(":");
234        
235                        final InetSocketAddress hdfsAddress = new InetSocketAddress(
236                                        hdfsInfo[1].substring(2), new Integer(hdfsInfo[2]).intValue());
237                        
238                        final InetSocketAddress resManagerAddress = new InetSocketAddress(
239                                        jobInfo[0], new Integer(jobInfo[1]).intValue());
240        
241                        // see if we can connect to Hadoop; start if necessary
242                        synchronized (this) {
243                                _checkServer(hdfsAddress, resManagerAddress);
244                        }
245                }
246                
247                _jobDir = null;
248                
249                synchronized(_jobLock) {
250                _jobControl = new JobControl(_director.getFullName());
251                }
252                
253                URL[] jarArray;
254                try{
255                final List<URI> jarURIs = this._getJarList();
256                List<URL> jarURLs = new LinkedList<URL>();
257                for (URI jarURI : jarURIs) {
258                        jarURLs.add(jarURI.toURL());
259                }
260//              jarURLs.add(new File("").toURI().toURL());
261        
262                jarArray = jarURLs.toArray(new URL[jarURLs.size()]);
263                        
264                } catch (MalformedURLException e) {
265                        e.printStackTrace();
266                        throw new IllegalActionException(_director, e.getMessage());
267                }
268
269                //remove output path if needed
270                _overwriteOutput = ((BooleanToken) overwriteOutput.getToken()).booleanValue();          
271                _autoHDFSStage = ((BooleanToken) autoHDFSStage.getToken()).booleanValue();
272
273                final List<?> sinkActors = _container.entityList(DDPDataSink.class);
274
275                try {
276                        //create a tmp directory for each execution if HDFS is used
277                        if(_sameJVM)
278                                _tmpDir = _getARadomLocalDir();
279                        else
280                                _tmpDir = _getARadomHDFSDir(null, true);
281                        for (Object object : sinkActors) {
282                                final DDPDataSink sink = (DDPDataSink) object;
283                                Collection<ControlledJob> jobs = _getHadoopJobs(null, null, sink, null, false, false, null, false);
284                                synchronized(_jobLock) {
285                                    _jobControl.addJobCollection(jobs);
286                                }
287                        }                       
288                } catch (IOException e) {
289                        e.printStackTrace();
290                        throw new IllegalActionException(_director, e,
291                                        "Error when creating Hadoop job chains.");
292                } catch (ClassNotFoundException e) {
293                        e.printStackTrace();
294                        throw new IllegalActionException(_director, "Class: " + e.getMessage() + 
295                                        " cannot be found. Please make sure its jar is included.");
296                }
297
298        if(!_sameJVM) {
299                synchronized(_jobLock) {
300                        for (ControlledJob waitingJob : _jobControl.getWaitingJobList()) {
301                                _log.info("job:" + waitingJob);
302                            _setJars(waitingJob.getJob().getConfiguration(), jarArray);
303                        }    
304                }
305        }
306                
307        // create a thread to monitor the job control thread.
308        // when the jobs are all finished, it stops the thread.
309        Thread monitorThread = new Thread(new Runnable() {
310            @Override
311            public void run() {
312                while (true) {
313                    synchronized(_jobLock) {
314                        if(_jobControl == null) {
315                            return;
316                        }
317                        if(_jobControl.allFinished()) {
318                            _jobControl.stop();
319                            return;
320                        }
321                        try {
322                            _jobLock.wait(3000);
323                        } catch (InterruptedException e) {
324                            e.printStackTrace();
325                        }
326                    }
327                }
328            }
329        });
330        monitorThread.start();      
331
332        // run the job control thread
333                Thread jobRunner = new Thread(new Runnable() {
334                    @Override
335            public void run() {
336                        _jobControl.run();
337                    }
338                });
339                System.out.println("Submitting Hadoop job.");
340                jobRunner.start();
341                
342                try {
343            jobRunner.join();
344        } catch (InterruptedException e) {
345            throw new IllegalActionException(_director, e, "Error waiting for job.");
346        }
347                
348                /*
349                while(jobRunner.isAlive()) {
350                    try {
351                synchronized(_jobLock) {
352                    if(!_stopJob.get()) {
353                        _jobLock.wait(3000);
354                    }
355                }
356            } catch (InterruptedException e) {
357                throw new IllegalActionException(_director, e, "Error waiting for job.");
358            }
359                }
360                */
361                
362                System.out.println("Hadoop job finished.");
363                
364                
365                synchronized(_jobLock) {
366                _log.info("All hadoop jobs are finished.");
367                String[] failedJobArray = new String[_jobControl.getFailedJobList()
368                                .size()];
369                int i = 0;
370                for (ControlledJob job : _jobControl.getFailedJobList()) {
371                        _log.info("failed job:" + job);
372                        failedJobArray[i] = job.getMessage();
373                        i++;
374                }
375                                
376                if (i > 0) {
377                        String message = Arrays.toString(failedJobArray);
378                        System.err.println(message);
379                        throw new IllegalActionException(_director,
380                                "Some jobs failed during execution: " + message.substring(1, message.length()-1) + " You may have to check Hadoop job logs to know the reason of failure.");
381                }
382                
383            _jobControl = null;
384                }
385
386                
387                //stage data out
388                try {
389                        if (_stageOutDirMap != null){
390                                for (String hdfsPath : _stageOutDirMap.keySet()){
391                                        _stageOut(hdfsPath, _stageOutDirMap.get(hdfsPath));
392                                }
393                        }
394                } catch (IOException e) {
395                        
396                        e.printStackTrace();
397                        throw new IllegalActionException(_director, e,
398                                        "Error when staging data out from HDFS.");                      
399                } finally {
400                        _stageOutDirMap = null;
401                }
402
403                // see if there were sinks using token output, and we not using same jvm
404                if(!_tokenOutputFileMap.isEmpty()) {
405                    _transferTokenOutputsFromHDFS();
406                }                                       
407        }
408
409        // FIXME only run once
410    @Override
411        public boolean postfire() throws IllegalActionException {       
412                //remove the tmp path at HDFS
413                try {
414                        boolean _rmTmpHDFSDir = ((BooleanToken) removeTmpHDFSDir
415                                        .getToken()).booleanValue();
416                        if (_rmTmpHDFSDir)
417                                _removeOutput(_tmpDir, true);
418                } catch (IOException e) {
419                        
420                        e.printStackTrace();
421                        throw new IllegalActionException(_director, e,
422                                        "Error when removing tmp data at HDFS.");
423                }
424                fs = null; // reset fs.
425                return super.postfire();
426        }
427
428    @Override
429        public void preinitialize() throws IllegalActionException {
430
431                // call super class preinitialize to validate settables and
432                // preinitialize actors
433                super.preinitialize();
434        _checkModel();
435
436                // load the configuration and create a new client
437                if (_configDirStr.trim().isEmpty()) {
438                        // set the default location of the config directory
439                        String workflowDirStr = System.getProperty("hadoop.workflowdir");
440                        if (workflowDirStr == null) {
441                                throw new InternalErrorException(
442                                                "System property hadoop.workflowdir not set.");
443                        }
444                        _configDirStr = workflowDirStr + File.separator + "tools"
445                                        + File.separator + "etc" + File.separator + "hadoop"; //update conf dir for hadoop 2.0
446                }
447
448                // make sure conf dir exists
449                final File configDir = new File(_configDirStr);
450                if (!configDir.exists()) {
451                        throw new IllegalActionException(_director,
452                                        "Hadoop configuration directory " + _configDirStr
453                                                        + " does not exist.");
454                }
455                
456                //re-set variables
457                _stageOutDirMap = null;
458                _tmpDir = null;
459                _tokenOutputFileMap.clear();
460
461        }
462
463    /** Stop any running Hadoop jobs. */
464    @Override
465    public void stop() throws IllegalActionException {
466        super.stop();
467        
468        //System.out.println("stop");
469        synchronized(_jobLock) {
470            //System.out.println("stop got lock");
471            if(_jobControl != null) {
472                System.out.println("Stopping submitted jobs.");
473                _jobControl.stop();
474                List<ControlledJob> jobs = _jobControl.getReadyJobsList();
475                jobs.addAll(_jobControl.getWaitingJobList());
476                jobs.addAll(_jobControl.getRunningJobList());
477                for(ControlledJob job : jobs) {
478                    try {
479                        job.failJob("Job stopped by user");
480                    } catch (Exception e) {
481                        throw new IllegalActionException(_director, e, "Error stopping Hadoop job.");
482                    }
483                }
484                _jobLock.notifyAll();
485            }
486        }
487    }
488    
489        /** Shutdown the Hadoop server if one was started. */
490        public static void shutdownServer() throws IllegalActionException {
491                
492//              if(_sameJVM) //Hadoop Server is not started for the same JVM setting, so no shutdown is needed.
493//                      return;
494
495                for (String configDirStr : _startedConfigDirs) {
496
497                        System.out
498                                        .println("Stopping Hadoop server for configuration directory "
499                                                        + configDirStr);
500
501                        String hadoopDirStr = new File(configDirStr).getParentFile().getParent();
502                        String stopScriptStr = hadoopDirStr + File.separator + "bin"
503                                        + File.separator + "stop-hadoop.sh";
504                        ProcessBuilder stopBuilder = new ProcessBuilder(stopScriptStr);
505//                      String stopMRScriptStr = parentDirStr + File.separator + "bin"
506//                                      + File.separator + "stop-mapred.sh";
507//                      ProcessBuilder stopMRBuilder = new ProcessBuilder(stopMRScriptStr);
508
509                         // make sure JAVA_HOME is set
510                         java.util.Map<String,String> env = stopBuilder.environment();
511                         if(env.get("JAVA_HOME") == null) {
512                                 env.put("JAVA_HOME", System.getProperty("java.home"));
513                         }
514                         if (env.get("HADOOP_INSTALL") == null) {
515                                env.put("HADOOP_INSTALL", hadoopDirStr);
516                         }
517                         if (env.get("HADOOP_COMMON_HOME") == null) {
518                                env.put("HADOOP_COMMON_HOME", hadoopDirStr);
519                         }
520                         if (env.get("HADOOP_MAPRED_HOME") == null) {
521                                env.put("HADOOP_MAPRED_HOME", hadoopDirStr);
522                         }
523                         if (env.get("HADOOP_HDFS_HOME") == null) {
524                                env.put("HADOOP_HDFS_HOME", hadoopDirStr);
525                         }
526                         if (env.get("YARN_HOME") == null) {
527                                env.put("YARN_HOME", hadoopDirStr);
528                         }
529                        try {
530                                Process process = stopBuilder.start();
531                                process.waitFor();
532//                              process = stopMRBuilder.start();
533//                              process.waitFor();
534                        } catch (Exception e) {
535                                throw new IllegalActionException("Unable to stop Hadoop: "
536                                                + e.getMessage());
537                        }
538                }
539        }
540        
541    /** Free resources. */
542        @Override
543        public void wrapup() throws IllegalActionException {
544            super.wrapup();
545            fs = null;
546        synchronized(_jobLock) {
547            _jobControl = null;
548        }
549        }
550        
551        /** Add parameters to the containing director. */
552    @Override
553    protected void _addParameters() throws IllegalActionException, NameDuplicationException {
554
555        overwriteOutput = (Parameter) _director.getAttribute("overwriteOutput");
556        if(overwriteOutput == null) {
557                        overwriteOutput = new Parameter(_director, "overwriteOutput");
558                        overwriteOutput.setTypeEquals(BaseType.BOOLEAN);
559                        overwriteOutput.setToken(BooleanToken.TRUE);
560        }
561        
562        removeTmpHDFSDir = (Parameter) _director.getAttribute("removeTmpHDFSDir");
563        if(removeTmpHDFSDir == null) {
564                removeTmpHDFSDir = new Parameter(_director,     "removeTmpHDFSDir");
565                removeTmpHDFSDir.setTypeEquals(BaseType.BOOLEAN);
566                removeTmpHDFSDir.setToken(BooleanToken.TRUE);                   
567        }
568        
569        autoHDFSStage = (Parameter) _director.getAttribute("autoHDFSStage");
570        if(autoHDFSStage == null) {
571                autoHDFSStage = new Parameter(_director,        "autoHDFSStage");
572                autoHDFSStage.setTypeEquals(BaseType.BOOLEAN);
573                autoHDFSStage.setToken(BooleanToken.FALSE);             
574        }
575    }
576    
577    /** Remove engine-specific parameters from the director. */
578    @Override
579    protected void _removeParameters() throws IllegalActionException, NameDuplicationException {
580        overwriteOutput.setContainer(null);
581        removeTmpHDFSDir.setContainer(null);
582        autoHDFSStage.setContainer(null);
583    }
584    
585    /** Set the port types inside a cloned pattern actor.
586    *  TODO: merge this with _setPortTypes() in the parent class. 
587    *  
588    *  @param actor the cloned actor
589    */
590        @Override
591        protected java.util.Map<String, Type> _setPortTypes(DDPPatternActor actor)
592                throws IllegalActionException {
593                java.util.Map<String, Type> typeMap = new HashMap<String, Type>();
594                
595                // set the types of the ports of the input/output actors via
596                // TypeAttributes
597                final List<?> pactorPorts = actor.portList();
598                for (Object object : pactorPorts) {
599                        final TypedIOPort pactorPort = (TypedIOPort) object;
600                        _log.debug("pactorPort:" + pactorPort);
601                
602                        // get connected ports
603                        final List<?> connectedPorts = pactorPort.insidePortList();
604                        for (Object object2 : connectedPorts) {
605                                final TypedIOPort connectedPort = (TypedIOPort) object2;
606                                _log.debug("connectedPort:" + connectedPort);
607                
608                                // set the types of ports connected to the pactor port so their
609                                // types can be used to set the pact types in the stub
610                                TypeAttribute typeAttribute;
611                                try {
612                                        typeAttribute = new TypeAttribute(connectedPort,
613                                                        connectedPort.getName() + "Type");
614                                } catch (NameDuplicationException e) {
615                                        throw new IllegalActionException(_director, e,
616                                                        "Error creating type attribute for "
617                                                                        + connectedPort.getFullName());
618                                }
619                                typeAttribute.setExpression(connectedPort.getType().toString());
620                                _log.debug("set connectedPort " + connectedPort + "'type to be:" + connectedPort.getType());
621                
622                                // get the ports of the input/output actor
623                                List<?> inputOutputActorPorts;
624                                if (connectedPort.isInput()) {
625                                        inputOutputActorPorts = ((Actor) connectedPort
626                                                        .getContainer()).outputPortList();
627                                } else {
628                                        inputOutputActorPorts = ((Actor) connectedPort
629                                                        .getContainer()).inputPortList();
630                                }
631                
632                                // set the types for the input/output actor ports
633                                for (Object object3 : inputOutputActorPorts) {
634                                        final TypedIOPort inputOutputActorPort = (TypedIOPort) object3;
635                                        _log.debug("inputOutputActorPort:" + inputOutputActorPort);
636                                        TypeAttribute existingAtt = (TypeAttribute)inputOutputActorPort.getAttribute(inputOutputActorPort.getName() + "Type");
637                                        _log.debug("type attribute of inputOutputActorPort:" + existingAtt);
638                                        if (existingAtt == null) {
639                                                try {
640                                                        typeAttribute = new TypeAttribute(inputOutputActorPort,
641                                                                        inputOutputActorPort.getName() + "Type");
642                                                } catch (NameDuplicationException e) {
643                                                        throw new IllegalActionException(_director, e,
644                                                                        "Error creating type attribute for "
645                                                                                        + inputOutputActorPort.getFullName());
646                                                }
647                                                _log.debug("in actor: "+ actor.getName() + ", its port name:" + inputOutputActorPort.getName() + " , port type:" + inputOutputActorPort.getType());
648                                                typeMap.put(inputOutputActorPort.getName(), inputOutputActorPort.getType());
649                                                typeAttribute.setExpression(inputOutputActorPort.getType()
650                                                                .toString());
651                                        }
652                                }
653                
654                        }
655                
656                }
657                
658                return typeMap;
659        }
660
661        
662        
663        /** Check if Hadoop server is running. If not, try to start it. */
664        private void _checkServer(InetSocketAddress hdfsAddress, InetSocketAddress jobManagerAddress)
665                        throws IllegalActionException {
666                Socket socket1 = new Socket();
667                Socket socket2 = new Socket();
668                boolean hdfsConnected = false;
669                boolean jobManagerConnected = false;
670                try {
671                        socket1.connect(hdfsAddress, _CONNECT_TIMEOUT);
672                        hdfsConnected = true;
673                        socket2.connect(jobManagerAddress, _CONNECT_TIMEOUT);
674                        jobManagerConnected = true;
675                        
676                } catch (IOException e) {
677                        if (!hdfsConnected)
678                                System.out
679                                        .println("Hadoop HDFS server: " + hdfsAddress + " does not appear to be running.");
680                        if (!jobManagerConnected)
681                                System.out
682                                        .println("Hadoop Resource Manager server: " + jobManagerAddress + " does not appear to be running.");
683                        System.out
684                                .println("Try to start hadoop with configuration directory:" + _configDirStr);  
685                        // start hadoop
686                        File configDir = new File(_configDirStr);
687                        String hadoopDirStr = configDir.getParentFile().getParent();
688                        //format node could delete all data in HDFS. It is too dangerous to use.
689                        String startScriptStr = hadoopDirStr + File.separator + "bin"
690                                        + File.separator + "start-hadoop.sh";
691                        
692
693                        // see if the script is executable. kepler modules are zipped,
694            // which does not preserve the permissions.
695            File startScriptFile = new File(startScriptStr);
696            if(!startScriptFile.exists()) {
697                throw new IllegalActionException(_director, 
698                        "The script " + startScriptFile + " does not exist.\n" +
699                                "Give up on automatic Hadoop starting. \n" +
700                                        "You have to start Hadoop manually.");
701            }
702            if(!startScriptFile.canExecute()) {
703                throw new IllegalActionException(_director, 
704                        "The script " + startScriptFile + " is not executable.\n" +
705                                "You must change the permissions so that " +
706                                startScriptFile.getName() + 
707                                " and all the other scripts in \n" +
708                                startScriptFile.getParent() + " are executable.");
709            }
710
711                        ProcessBuilder startBuilder = new ProcessBuilder(startScriptStr);
712
713                        // make sure HADOOP_COMMON_HOME is set
714                        java.util.Map<String, String> env = startBuilder.environment();
715                        if (env.get("JAVA_HOME") == null) {
716                                env.put("JAVA_HOME", System.getProperty("java.home"));
717                        }
718                        if (env.get("HADOOP_INSTALL") == null) {
719                                env.put("HADOOP_INSTALL", hadoopDirStr);
720                        }
721                        if (env.get("HADOOP_COMMON_HOME") == null) {
722                                env.put("HADOOP_COMMON_HOME", hadoopDirStr);
723                        }
724                        if (env.get("HADOOP_MAPRED_HOME") == null) {
725                                env.put("HADOOP_MAPRED_HOME", hadoopDirStr);
726                        }
727                        if (env.get("HADOOP_HDFS_HOME") == null) {
728                                env.put("HADOOP_HDFS_HOME", hadoopDirStr);
729                        }
730                        if (env.get("YARN_HOME") == null) {
731                                env.put("YARN_HOME", hadoopDirStr);
732                        }
733                        int exitCode;
734                        StringBuilder errString = new StringBuilder();
735                        try {
736                                Process process = startBuilder.start();
737                                exitCode = process.waitFor();
738                                _log.debug("exitCode : " + exitCode);
739                                
740                                InputStream stderr = process.getErrorStream();
741                                InputStream stdout = process.getInputStream();
742
743                                BufferedReader stdReader = new BufferedReader (new InputStreamReader(stdout));
744                                BufferedReader errReader = new BufferedReader (new InputStreamReader(stderr));
745
746                                String line;
747
748                                while ((line = stdReader.readLine ()) != null) {
749                                        _log.debug ("Stdout: " + line);
750                                        System.out.println("Stdout: " + line);
751                                }               
752
753                                while ((line = errReader.readLine ()) != null) {
754                                        _log.debug ("Stderr: " + line);
755                                        errString.append(line).append("\n");
756                                }
757                                
758                                stdReader.close();
759                                errReader.close();
760                        } catch (Exception e1) {
761                                throw new IllegalActionException(_director, e1,
762                                                "Unable to start Hadoop.");
763                        }
764                        
765                        if (exitCode != 0) {
766                                throw new IllegalActionException(_director,
767                                                "Error when start Hadoop:" + errString);
768                        }
769
770                        _startedConfigDirs.add(_configDirStr);
771
772                        int tries = 0;
773                        while (tries < 5) {
774                                // wait for the server to start
775                                try {
776                                        Thread.sleep(5000);
777                                        tries++;
778                                        System.out.println("Connecting to Hadoop server port try #"
779                                                        + tries);
780                                        try {
781                                                if (!jobManagerConnected) {
782                                                        socket1 = new Socket();
783                                                        socket1.connect(jobManagerAddress, _CONNECT_TIMEOUT);
784                                                        jobManagerConnected = true;
785                                                } 
786                                                if (!hdfsConnected) {
787                                                        socket2 = new Socket();
788                                                        socket2.connect(hdfsAddress, _CONNECT_TIMEOUT);
789                                                        hdfsConnected = true;
790                                                }
791                                                break;
792                                        } catch (IOException e1) {
793                                                //e1.printStackTrace();
794                                        }
795                                } catch (InterruptedException e2) {
796                                        throw new IllegalActionException(_director, e2,
797                                                        "Error while sleeping.");
798                                }
799                        }
800
801                        // if we get here, we were able to connect to the hadoop job
802                        // manager port. however, hadoop may not be completely
803                        // initialized, so wait a few more seconds
804                        System.out
805                                        .println("Waiting 5 seconds for Hadoop server to initialize.");
806                        try {
807                                Thread.sleep(5000);
808                        } catch (InterruptedException e2) {
809                                throw new IllegalActionException(_director, e2,
810                                                "Error while waiting "
811                                                                + " for Hadoop server to initialize.");
812                        }
813
814                } finally {
815                        try {
816                                if (jobManagerConnected)
817                                        socket1.close();
818                                if (hdfsConnected)
819                                        socket2.close();
820                        } catch (IOException e) {
821                                throw new IllegalActionException(_director, e,
822                                                "Error closing socket.");
823                        }
824                }
825
826                if (!jobManagerConnected)
827                        throw new IllegalActionException(_director,
828                                "Could not connect to Hadoop server: " + jobManagerAddress);
829                else if (!hdfsConnected)
830                        throw new IllegalActionException(_director,
831                                        "Could not connect to HDFS server: " + "hdfs://" + hdfsAddress);
832        }
833
834        /**
835         * Clone and export actor.
836         * 
837         * @throws IllegalActionException
838         * @throws IOException 
839         * @throws ClassNotFoundException 
840         */
841        private DDPPatternActorUtil _cloneAndExport(SingleInputPatternActor actor)
842                        throws IllegalActionException, IOException, ClassNotFoundException {
843                // export actor for non-IO pactors.
844                SingleInputPatternActor clonePactor;
845                final Workspace workspace;
846
847                if(_sameJVM) {
848                    workspace = new Workspace(actor.getName());
849        } else {
850            workspace = _director.workspace();
851        }
852
853                try {
854                        clonePactor = (SingleInputPatternActor) actor.clone(workspace);
855                        java.util.Map<String, Type> portTypeMap = _setPortTypes(clonePactor);
856                        _copyParameters(actor, clonePactor);
857                        if (!actor.getExecutionClassName().equalsIgnoreCase("")) {
858                                DDPPatternActorUtil actorUtil = new DDPPatternActorUtil(clonePactor, clonePactor.getName(), actor.getExecutionClassName(), clonePactor.inKeyValueTypes.stringValue(), clonePactor.outKeyValueTypes.stringValue());
859                                return actorUtil;
860                        } else {
861                            
862                            String actorString = null;
863                        if(_sameJVM) {
864                            _subWorkflows.put(actor.getName(), clonePactor);
865                        } else {
866                                Utilities.removeModelPorts(clonePactor);
867                                if (_writeSubWorkflowsToFiles)
868                                        _exportSubWorkflow(clonePactor);
869                                // write model to a string
870                                final StringWriter writer = new StringWriter();
871                                try {
872                                        clonePactor.exportMoML(writer);
873                                } catch (IOException e) {
874                                        throw new IllegalActionException(_director, e,
875                                                        "Error serializing model.");
876                                }
877                                actorString = writer.toString();
878                        }
879                        
880                                conf.setBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT,
881                                        clonePactor.getRunWorkflowLifecyclePerInput());
882                                
883                                DDPPatternActorUtil actorUtil = new DDPPatternActorUtil(clonePactor, clonePactor.getName(), actorString, portTypeMap);
884                                return actorUtil;
885                        }
886                } catch (CloneNotSupportedException e) {
887                        throw new IllegalActionException(_director, e, "Unable to clone "
888                                        + actor.getName());
889                }
890        }
891
892        /**
893         * Get the class specified in a Pactor's parameter.
894         * 
895         * @actor the Pactor
896         * @parameterName the name of the parameter containing the class name.
897         */
898        private Class<?> _getClassFromParameter(DDPPatternActor actor,
899                        String parameterName) throws IllegalActionException {
900
901                Parameter parameter = (Parameter) ((DDPPatternActor) actor)
902                                .getAttribute(parameterName);
903                if (parameter == null) {
904                        throw new IllegalActionException(actor, "Missing parameter "
905                                        + parameterName + ".");
906                }
907
908                // make sure it's not empty.
909                String className = ((StringToken) parameter.getToken()).stringValue();
910                if (className.trim().isEmpty()) {
911                        throw new IllegalActionException(actor,
912                                        "Missing value for parameter " + parameterName + ".");
913                }
914
915                // try to get the class
916                Class<?> clazz;
917                try {
918                        clazz = Class.forName(className);
919                } catch (ClassNotFoundException e) {
920                        throw new IllegalActionException(actor, "Class " + className
921                                        + " is not on the classpath.");
922                }
923                
924                return clazz;
925        }
926
927        /** Get the hadoop Job for a PACTor. 
928         * @param container The container.
929     * @param hasSpecialMapper Whether a mapper is added for Match/CoGrooup/Cross.
930         * @throws ClassNotFoundException 
931         * @throws IllegalActionException 
932         * @throws IOException 
933         */
934        private Collection<ControlledJob> _getHadoopJobs(
935                        Collection<ControlledJob> jobList, ControlledJob job,
936                        DDPPatternActor actor, List<DDPPatternActorUtil> actorList,
937                        boolean hasReducer, boolean hasSpecialMapper, String tmpDir, 
938                        boolean tag) throws IOException, IllegalActionException, ClassNotFoundException {
939
940                if (actor instanceof DDPDataSink) {
941                        String formatClassName = null;
942                        ControlledJob controlJob = null;
943                        Collection<ControlledJob> ctrJobList = null;
944                        try {
945                                //Job currentJob = new Job(this.conf);
946                                Job currentJob = Job.getInstance(this.conf);
947//                              _classLoader = _loadActorJars(actor);
948                                formatClassName = ((DDPDataSink) actor).getFormatClassName(_engineName);
949                                //formatClassName = _getFormatClassName((DDPDataSink) actor,
950                                        //"formatType", false);
951                                Class<?> clazz = _classLoader.loadClass(formatClassName)
952                        .asSubclass(OutputFormat.class);
953                                currentJob.setOutputFormatClass((Class<? extends OutputFormat>) clazz);
954//                              currentJob.setOutputFormatClass(Class.forName(formatClassName)
955//                                              .asSubclass(OutputFormat.class));
956                                
957                                if(TokenOutputFormat.class.isAssignableFrom(clazz)) {
958                                    currentJob.getConfiguration().set(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME,
959                                actor.getFullName());
960                                    
961                                    // see if we're in distributed mode
962                                    if(!_sameJVM) {
963                                        
964                                        // TokenOutputFormat cannot access DDPDataSink since it's in a
965                                        // different JVM. instead, we use TextOutputFormat to write the
966                                        // results to HDFS, and once the job is finished, copy the results
967                                        // to DDPDataSink (see _transferTokenOutputsFromHDFS()).
968                                        
969                                        // create a temporary directory in HDFS for the token outputs
970                        String tokenOutputDir = _getARadomHDFSDir(_tmpDir, false);
971                        Path tokenOutputPath = new Path(tokenOutputDir + "/token-output.txt");
972                        _tokenOutputFileMap.put(tokenOutputPath, (DDPDataSink)actor);
973                            FileOutputFormat.setOutputPath(currentJob, tokenOutputPath);
974                            //System.out.println("Setting token output to be " + _tokenOutputFilePath);
975                            
976                            // use TextOutputFormat instead of TokenOutputFormat
977                            currentJob.setOutputFormatClass(TextOutputFormat.class);                        
978                                    }
979                                    
980                                } else if(clazz != NullOutputFormat.class) {
981                                String outputPath = _getParaValue((DDPDataSink) actor,
982                                                "path");
983                                _log.info("output path:" + outputPath);
984                                _verifyParaSetting(outputPath, actor);
985                                if (outputPath.substring(0, 4).equalsIgnoreCase("hdfs")) {
986                                        //if HDFS is used for sameJVM, throw an exception.
987                                        if (_sameJVM) {
988                                                throw new IllegalActionException(
989                                                                _director,
990                                                                "the path " + outputPath + " in actor "
991                                                                                + actor.getName()
992                                                                                + " cannot be HDFS url when sameJVM is chosen for parameter 'startServerType' in director " + _director.getName() + ".");
993                                        }
994                                        if (_overwriteOutput){
995//                                              _removeOutput(outputPath, false);
996                                                _removeOutputDir(outputPath, actor);
997                                        }
998                                        
999                                        
1000    //                          } else if (new File(outputPath).exists() || new File(outputPath).mkdirs()){
1001                                } else {//data is in local file system.
1002                                        System.out.println("outputPath:" + outputPath);
1003//                                      System.out.println("File.separator:" + File.separator);
1004                                        if (outputPath.substring(0, 8).equalsIgnoreCase("file:///"))
1005                                                outputPath = outputPath.substring(7);
1006                                        if (new File(outputPath).exists() && _overwriteOutput){
1007                                                if (new File(outputPath).isDirectory()) {
1008                                                        _removeOutputDir(outputPath, actor);
1009                                                }
1010                                                else if (!new File(outputPath).delete())
1011                                                        throw new IllegalActionException(
1012                                                        _director,
1013                                                        "the path " + outputPath + " in actor "
1014                                                                        + actor.getName() + " cannot be deleted. Please check file system." );
1015                                        } 
1016                                        if (_autoHDFSStage || _sameJVM) {//the outputs should be staged out from HDFS to local file system. If _sameJVM is true, always stage results to local file system to be consistent with other DDP engines.
1017                                                if (_stageOutDirMap == null)
1018                                                        _stageOutDirMap = new HashMap<String, String>();
1019                                                String randomOutPath = _getARadomHDFSDir(_tmpDir, false);
1020                                                _stageOutDirMap.put(randomOutPath, outputPath);
1021                                                outputPath = randomOutPath;
1022                                        } else {
1023                                                if (File.separator.equalsIgnoreCase("/")) //linux and mac
1024                                                        outputPath = "file://" + outputPath; //add file prefix back
1025                                                else {
1026                                                        System.out.println("In windows, outputPath:" + outputPath);
1027                                                        outputPath = "file:///" + outputPath;
1028                                                }
1029                                        }
1030                                }
1031    //                                  //do nothing because we will use local file as output directly.
1032    //                                  //the output path is local file, data will staged out and outputpath is set to be a random hdfs path.
1033    //                                  if (_stageOutDirMap == null)
1034    //                                          _stageOutDirMap = new HashMap<String, String>();
1035    //                                  String randomOutPath = _getARadomHDFSDir(_tmpDir, false);
1036    ////                                        _stageOutDirMap.put(randomOutPath, outputPath);
1037    ////                                        if (_overwriteOutput){
1038    ////                                                new File(outputPath).delete();
1039    ////                                        }
1040    ////                                        outputPath = randomOutPath;
1041    ////                                } else if (!new File(outputPath).getParentFile().exists() && !(new File(outputPath).mkdirs())) {
1042    //                          } else {
1043    //                                  throw new IllegalActionException(
1044    //                                                  _director,
1045    //                                                  "the path " + outputPath + " in actor "
1046    //                                                                  + actor.getName()
1047    //                                                                  + " does not exist and cannot be created. Maybe its file system is not supported yet.");
1048    //                          }
1049
1050                    FileOutputFormat.setOutputPath(currentJob, new Path(outputPath));
1051
1052                                }
1053                                _addActorParamsToJob(actor, currentJob.getConfiguration());
1054                                controlJob = new ControlledJob(currentJob, null);
1055                                ctrJobList = new LinkedList<ControlledJob>();
1056                                ctrJobList.add(controlJob);
1057                        } catch (ClassCastException e) {
1058                                e.printStackTrace();
1059                                throw (new IllegalActionException("The class name: "
1060                                                + formatClassName
1061                                                + " is not sub class of OutputFormat, please check it."));
1062                        } catch (ClassNotFoundException e) {
1063                                e.printStackTrace();
1064                                throw (new IllegalActionException(
1065                                                "No proper class can be found based on the name: "
1066                                                                + formatClassName + " , please check it."));
1067                        }
1068                        List<DDPPatternActor> upStreamActorList = this
1069                                        ._getUpStreamActorList(actor);
1070
1071                        return _getHadoopJobs(ctrJobList, controlJob,
1072                                        upStreamActorList.get(0), null, false, false, null, false);
1073
1074                } else if (actor instanceof DDPDataSource) {
1075                        if (actorList != null) {
1076                                //add map actor based on its reverse order
1077                                
1078                                for (int i = actorList.size()-1; i >= 0; i--) {
1079                                        DDPPatternActorUtil actorUtil = actorList.get(i);
1080                                        java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
1081                                        //                                      _classLoader = _loadActorJars(actor);
1082                                        Class impClass = null;
1083                                        String impClassName = actorUtil.getImpClassName();
1084                                        _setPartitonerClass(job.getJob(), actorUtil.getActor());
1085                                        if (impClassName != null) { //using implement class rather than sub-workflow
1086                                                impClass = _classLoader.loadClass(impClassName);
1087                                                if (!Mapper.class.isAssignableFrom(impClass)) {
1088                                                                throw new IllegalActionException(actor,
1089                                                                                "Execution class " +
1090                                                                                                impClassName + " must be a subclass of " +
1091                                                                                                Mapper.class.getName());
1092                                                }
1093                                                _addDirectorParamsToJob(conf);
1094                                                ChainMapper.addMapper(job.getJob(),
1095                                                                impClass,
1096                                                                StubUtilities.convertToHadoopType(typeMap.get("inKey")), 
1097                                                                StubUtilities.convertToHadoopType(typeMap.get("inValue")),
1098                                                                StubUtilities.convertToHadoopType(typeMap.get("outKey")),
1099                                                                StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf);
1100                                                
1101                                                if (i==0 && !hasReducer) { //if there is no reducer, we should set it to be default reducer based on the output data type of the last mapper.
1102                                                        final Class keyClass = StubUtilities.convertToHadoopType(typeMap.get("outKey"));
1103                                                        final Class valueClass = StubUtilities.convertToHadoopType(typeMap.get("outValue"));
1104                                                        ChainReducer.setReducer(job.getJob(),
1105                                                                        Reducer.class,
1106                                                                        keyClass, valueClass,
1107                                                                        keyClass, valueClass, conf);
1108                                                        setReduceNumber(actor, job.getJob());
1109                                                }
1110                                        } else {
1111                                                try {
1112                                                        _log.info("Working on actor: " + actorUtil.getName());
1113                                                        conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1114                                                        if(!_sameJVM) {
1115                                                            conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1116                                                        }
1117                                                        _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration());
1118                                                        ChainMapper.addMapper(job.getJob(),
1119                                                                        MapReduce4Kepler.Mapper4Kepler.class,
1120                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")), 
1121                                                                        StubUtilities.convertToHadoopType(typeMap.get("value")),
1122                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1123                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);
1124//                                                      if (i == actorList.size()-1 && tmpDir != null) { //set the output of the last map.
1125//                                                              job.getJob().setOutputFormatClass(SequenceFileOutputFormat.class);
1126//                                                              FileOutputFormat.setOutputPath(job.getJob(), new Path(tmpDir));
1127//                                                      }
1128                                                        if (i==0 && !hasReducer) { //if there is no reducer, we should set it to be default reducer based on the output data type of the last mapper.
1129                                                                final Class<?> keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key");
1130                                                                final Class<?> valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value");
1131                                                                ChainReducer.setReducer(job.getJob(),
1132                                                                                Reducer.class,
1133                                                                                keyClass, valueClass,
1134                                                                                keyClass, valueClass, conf);
1135                                                                setReduceNumber(actor, job.getJob());
1136                                                        }
1137                                                } catch (IOException ioe) {
1138                                                        ioe.printStackTrace();
1139                                                        throw new IllegalActionException(_director, ioe, "Error when transforming actor:" + actorUtil.getName() + " into Hadoop job.");
1140                                                } catch (IllegalArgumentException iae) {
1141                                                        iae.printStackTrace();
1142                                                        throw new IllegalActionException(_director, iae, "Error when transforming actor:" + actorUtil.getName() + " into Hadoop job.");
1143                                                }
1144                                        }
1145                                }
1146                                
1147                        } else if (!hasSpecialMapper){ //actor List is empty and there is no mapper in this job. Then add an identical mapper based on type of 'out' port of DDPSource. 
1148                                _log.debug("processing with no mapper cases for the job connected with DDPSource");
1149                                ArrayType outType = (ArrayType)((TypedIOPort)actor.getPort("out")).getType();
1150                                Class keyClass = StubUtilities.convertRecordArrayToHadoopType(outType, "key");
1151                                Class valueClass = StubUtilities.convertRecordArrayToHadoopType(outType, "value");
1152//                              job.getJob().setMapOutputKeyClass(keyClass);
1153//                              job.getJob().setMapOutputValueClass(valueClass);
1154//                              job.getJob().setMapperClass(Mapper.class);
1155                                ChainMapper.addMapper(job.getJob(),
1156                                        Mapper.class,
1157                                        keyClass, valueClass,
1158                                        keyClass, valueClass, conf);
1159//                              job.getJob().setOutputKeyClass(keyClass);
1160//                              job.getJob().setOutputValueClass(valueClass);
1161                                if(!hasReducer) { //if there is no reducer, we should set it to be default reducer based on the output data type of the last mapper.
1162                                        ChainReducer.setReducer(job.getJob(),
1163                                                        Reducer.class,
1164                                                        keyClass, valueClass,
1165                                                        keyClass, valueClass, conf);
1166                                }
1167                                setReduceNumber(actor, job.getJob());
1168                        }
1169
1170                        String formatClassName = null;
1171                        try {
1172                            
1173                                formatClassName = ((DDPDataSource) actor).getFormatClassName(_engineName);
1174                                
1175                        // try to get the class
1176                        Class<?> clazz;
1177                        try {
1178                            clazz = _classLoader.loadClass(formatClassName);
1179                        } catch (ClassNotFoundException e) {
1180                            throw new IllegalActionException(actor, "Format type " + formatClassName +
1181                                " was not found in the format types configurations or is a class not on the classpath.");
1182                        }
1183
1184                                //formatClassName = _getFormatClassName((DDPDataSource) actor,
1185                                        //"formatType", true);
1186//                              job.getJob().setInputFormatClass(
1187//                                              Class.forName(formatClassName).asSubclass(
1188//                                                              InputFormat.class));
1189                                if(TokenInputFormat.class.isAssignableFrom(clazz)) {
1190                                    job.getJob().setInputFormatClass((Class<? extends InputFormat>) clazz);
1191                                    
1192                        if(_sameJVM) {
1193                            job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_SOURCE_ACTOR_NAME,
1194                                actor.getFullName());
1195                        } else {
1196                            
1197                            Token token = DDPDataSource.getToken(actor.getFullName());
1198                            
1199                            if(token == null) {
1200                                throw new IllegalActionException(actor, 
1201                                    "No input token found for source actor " + actor.getName());
1202                            }
1203                            
1204                            job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_INPUT_TOKEN, token.toString());
1205                        }
1206
1207                                } else {
1208                                    String inputPath = _getParaValue((DDPDataSource) actor,
1209                                                "path");
1210                                _verifyParaSetting(inputPath, actor);
1211                                _log.debug("input path:" + inputPath);
1212                                        if (!_sameJVM) {
1213                                                if (!inputPath.startsWith("hdfs://")) {
1214                                                        if (inputPath.substring(0, 8).equalsIgnoreCase(
1215                                                                        "file:///"))
1216                                                                inputPath = inputPath.substring(7);
1217                                                        if (!new File(inputPath).exists())
1218                                                                new IllegalActionException(_director,
1219                                                                                "the path " + inputPath + " in actor "
1220                                                                                                + actor.getName()
1221                                                                                                + " does not exist.");
1222                                                        if (_autoHDFSStage) {
1223                                                                // the input path is local file. then stage data
1224                                                                // in and set the hdfs path as inputPath.
1225                                                                inputPath = this._stageIn(_tmpDir, inputPath);
1226                                                                // } else {
1227                                                                // new IllegalActionException(
1228                                                                // _director,
1229                                                                // "the path " + inputPath + " in actor "
1230                                                                // + actor.getName()
1231                                                                // +
1232                                                                // " is not correct or its file system is not supported yet");
1233                                                        } else {
1234                                                                // add file prefix back
1235                                                                if (File.separator.equalsIgnoreCase("/")) // linux
1236                                                                                                                                                        // and
1237                                                                                                                                                        // mac
1238                                                                        inputPath = "file://" + inputPath;
1239                                                                else {
1240                                                                        System.out.println("In windows, inputPath:"
1241                                                                                        + inputPath);
1242                                                                        inputPath = "file:///" + inputPath;
1243                                                                        ;
1244                                                                }
1245                                                        }
1246                                                }
1247                                        } else {
1248                                                // if HDFS is used for sameJVM, throw an exception.
1249                                                if (inputPath.substring(0, 4).equalsIgnoreCase("hdfs")) {
1250                                                        new IllegalActionException(
1251                                                                        _director,
1252                                                                        "the path "
1253                                                                                        + inputPath
1254                                                                                        + " in actor "
1255                                                                                        + actor.getName()
1256                                                                                        + " cannot be HDFS url when sameJVM is chosen for parameter 'startServerType' in director "
1257                                                                                        + _director.getName() + ".");
1258                                                }
1259                                        }
1260    //                          FileInputFormat
1261    //                                          .setInputPaths(job.getJob(), new Path(inputPath));
1262                                //                              _classLoader = _loadActorJars(actor);
1263                                MultipleInputs.addInputPath(job.getJob(), new Path(inputPath), _classLoader.loadClass(formatClassName).asSubclass(
1264                                InputFormat.class));
1265                                if (tag)
1266                                        job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_TAG, inputPath);
1267    //                          MultipleInputs.addInputPath(job.getJob(), new Path(inputPath), Class.forName(formatClassName).asSubclass(
1268    //                                          InputFormat.class));
1269                                }
1270
1271                _addActorParamsToJob(actor, job.getJob().getConfiguration());
1272
1273                                job.getJob().setJobName(actor.getName());
1274                        } catch (ClassCastException e) {
1275                                e.printStackTrace();
1276                                throw (new IllegalActionException("The class name: "
1277                                                + formatClassName
1278                                                + " is not sub class of OutputFormat, please check it."));
1279                        } catch (ClassNotFoundException e) {
1280                                e.printStackTrace();
1281                                throw (new IllegalActionException(
1282                                                "No proper class can be found based on the name: "
1283                                                                + formatClassName + " , please check it."));
1284                        }
1285
1286                        return jobList;
1287                } else {
1288
1289                        if (actor instanceof Map) {
1290                                if (actorList == null)
1291                                        actorList = new LinkedList<DDPPatternActorUtil>();
1292                                try {
1293                                        actorList.add(_cloneAndExport((Map)actor));
1294                                } catch (ClassNotFoundException e) {
1295                                        throw new IllegalActionException(actor, e,
1296                                                        "Execution class " +
1297                                                                        actor.getExecutionClassName() + " is not found.");
1298                                }
1299                                return _getHadoopJobs(jobList, job,
1300                                                _getUpStreamActorList(actor).get(0), actorList,
1301                                                hasReducer, hasSpecialMapper, null, false);
1302
1303                        } else if (actor instanceof Reduce) {
1304                                DDPPatternActorUtil actorUtil;
1305                                try {
1306                                        actorUtil = _cloneAndExport((Reduce)actor);
1307                                } catch (ClassNotFoundException e) {
1308                                        throw new IllegalActionException(actor, e,
1309                                                        "Execution class " +
1310                                                                        actor.getExecutionClassName() + " is not found.");
1311                                }
1312                                // already have reducer in this job
1313                                if (hasReducer) {
1314                                        if (actorList != null) {
1315                                                for (int i = actorList.size()-1; i >= 0; i--) {
1316                                                        actorUtil = actorList.get(i);
1317                                                        //                                                      _classLoader = _loadActorJars(actor);
1318                                                        Class impClass = null;
1319                                                        String impClassName = actorUtil.getImpClassName();
1320                                                        final java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
1321                                                        _setPartitonerClass(job.getJob(), actorUtil.getActor());
1322                                                        if (impClassName != null) { //using implement class rather than sub-workflow
1323                                                                impClass = _classLoader.loadClass(impClassName);
1324                                                                if (!Mapper.class.isAssignableFrom(impClass)) {
1325                                                                                throw new IllegalActionException(actor,
1326                                                                                                "Execution class " +
1327                                                                                                                impClassName + " must be a subclass of " +
1328                                                                                                                Mapper.class.getName());
1329                                                                }
1330                                                                _addDirectorParamsToJob(conf);
1331                                                                ChainMapper.addMapper(job.getJob(),
1332                                                                                impClass,
1333                                                                                StubUtilities.convertToHadoopType(typeMap.get("inKey")), 
1334                                                                                StubUtilities.convertToHadoopType(typeMap.get("inValue")),
1335                                                                                StubUtilities.convertToHadoopType(typeMap.get("outKey")),
1336                                                                                StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf);
1337                                                                                
1338                                                        } else {
1339                                                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1340                                    if(!_sameJVM) {
1341                                        conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1342                                    }
1343                                                                _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration());
1344                                                                ChainMapper.addMapper(job.getJob(),
1345                                                                                MapReduce4Kepler.Mapper4Kepler.class,
1346                                                                                StubUtilities.convertToHadoopType(typeMap.get("key")), 
1347                                                                                StubUtilities.convertToHadoopType(typeMap.get("value")),
1348                                                                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1349                                                                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);
1350                                                        }
1351                                                        //special input set for the first map.
1352                                                        if (i == actorList.size()-1) {
1353                                                                job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1354                                                                tmpDir = _getARadomHDFSDir(_tmpDir, false);
1355                                                                FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1356                                                        }
1357                                                }
1358                                        } else { // actor List is empty. add an identical mapper based on upstream reducer output key value.
1359                                                //                                      _classLoader = _loadActorJars(actor);
1360                                                Class<?> impClass = null;
1361                                                String impClassName = actorUtil.getImpClassName();
1362                                                final java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
1363                                                final Class<?> keyClass, valueClass;
1364                                                if (impClassName != null) { //using implement class rather than sub-workflow, set the key value type based on the reducer's output port type.
1365                                                        keyClass = StubUtilities.convertToHadoopType(typeMap.get("outKey"));
1366                                                        valueClass = StubUtilities.convertToHadoopType(typeMap.get("outValue"));
1367                                                }
1368                                                else {
1369                                                        keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key");
1370                                                        valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value");
1371                                                }
1372                                                ChainMapper.addMapper(job.getJob(),
1373                                                                Mapper.class,
1374                                                                keyClass, valueClass,
1375                                                                keyClass, valueClass, conf);
1376                                                //set input for the job.
1377                                                job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1378                                                tmpDir = _getARadomHDFSDir(_tmpDir, false);
1379                                                FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1380                                        }
1381                                        job.getJob().setJobName(actor.getName());
1382                                        String formatClassName = null;
1383                                        ControlledJob controlJob = null;
1384                                        Job newJob = new Job(this.conf);
1385                                        _addActorParamsToJob((Reduce)actor, newJob.getConfiguration());
1386                                        newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1387                                        controlJob = new ControlledJob(newJob, null);
1388                                        job.addDependingJob(controlJob);
1389                                        jobList.add(controlJob);
1390                                        return _getHadoopJobs(jobList, controlJob, actor, null,
1391                                                        false, hasSpecialMapper, tmpDir, false);
1392
1393                                } else { // no reducer in this job
1394                                                job.getJob().setJobName(actor.getName());
1395                                                //                      _classLoader = _loadActorJars(actor);
1396                                                java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
1397                                                Class impClass = null;
1398                                                String impClassName = actorUtil.getImpClassName();
1399                                final String groupingClassName = ((Reduce)actor).groupingComparatorClass.stringValue();
1400                                if (!groupingClassName.isEmpty()) {
1401                                        final Class groupingClass = _classLoader.loadClass(groupingClassName);
1402                                                        if (!RawComparator.class.isAssignableFrom(groupingClass)) {
1403                                                                throw new IllegalActionException(actor,
1404                                                                                "GroupingComparator class " +
1405                                                                                                groupingClass + " must be a subclass of " +
1406                                                                                                RawComparator.class.getName());
1407                                                        }
1408                                        job.getJob().setGroupingComparatorClass(groupingClass);
1409                                }
1410                                                if (impClassName != null) { //using implement class rather than sub-workflow
1411                                                        impClass = _classLoader.loadClass(impClassName);                                                        
1412                                                        if (!Reducer.class.isAssignableFrom(impClass)) {
1413                                                                        throw new IllegalActionException(actor,
1414                                                                                        "Execution class " +
1415                                                                                                        actor.getExecutionClassName() + " must be a subclass of " +
1416                                                                                                        Reducer.class.getName());
1417                                                        }
1418                                                        _addDirectorParamsToJob(conf);
1419                                                        ChainReducer.setReducer(job.getJob(),
1420                                                                        impClass,
1421                                                                        StubUtilities.convertToHadoopType(typeMap.get("inKey")), 
1422                                                                        StubUtilities.convertToHadoopType(typeMap.get("inValue")),
1423                                                                        StubUtilities.convertToHadoopType(typeMap.get("outKey")),
1424                                                                        StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf);
1425                                                        setReduceNumber(actor, job.getJob());
1426                                                        // see if reducer is also a combiner
1427                                        if(((BooleanToken)((Reduce)actor).useAsCombiner.getToken()).booleanValue()) {
1428                                                final String combinerClassName = ((Reduce)actor).combineExecutionClass.stringValue();
1429                                                if (combinerClassName.isEmpty()) 
1430                                                        job.getJob().setCombinerClass(impClass);
1431                                                else {
1432                                                        final Class combineClass = _classLoader.loadClass(combinerClassName);
1433                                                                        if (!Reducer.class.isAssignableFrom(combineClass)) {
1434                                                                                throw new IllegalActionException(actor,
1435                                                                                                "Combiner class " +
1436                                                                                                                combinerClassName + " must be a subclass of " +
1437                                                                                                                Reducer.class.getName());
1438                                                                        }
1439                                                        job.getJob().setCombinerClass(combineClass);
1440                                                }
1441                                        }
1442                                                } else {
1443        //                                              DDPPatternActorUtil actorUtil = _cloneAndExport((Reduce)actor);
1444                                                        conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1445                            if(!_sameJVM) {
1446                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1447                            }
1448                                                        _addActorParamsToJob((Reduce)actor, job.getJob().getConfiguration());
1449                                                        ChainReducer.setReducer(job.getJob(),
1450                                                                        MapReduce4Kepler.Reducer4Kepler.class,
1451                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")), 
1452                                                                        StubUtilities.convertToHadoopType(typeMap.get("values")),
1453                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1454                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);
1455                                                        setReduceNumber(actor, job.getJob());
1456                                                        // see if reducer is also a combiner
1457                                        if(((BooleanToken)((Reduce)actor).useAsCombiner.getToken()).booleanValue()) {
1458                                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1459                                                if(!_sameJVM) {
1460                                                    job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1461                                                }
1462                                                job.getJob().setCombinerClass(MapReduce4Kepler.Reducer4Kepler.class);
1463                                        }
1464                                                }
1465                                                if (actorList != null) {
1466                                                        for (int i = actorList.size()-1; i >= 0; i--) {
1467                                                                actorUtil = actorList.get(i);
1468                                                                typeMap = actorUtil.getTypeMap();
1469                                                                //                                                              _classLoader = _loadActorJars(actor);
1470                                                                impClassName = actorUtil.getImpClassName();
1471                                                                _setPartitonerClass(job.getJob(), actorUtil.getActor());
1472                                                                if (impClassName != null) { //using implement class rather than sub-workflow
1473                                                                        impClass = _classLoader.loadClass(impClassName);
1474                                                                        if (!Mapper.class.isAssignableFrom(impClass)) {
1475                                                                                        throw new IllegalActionException(actor,
1476                                                                                                        "Execution class " +
1477                                                                                                                        impClassName + " must be a subclass of " +
1478                                                                                                                        Mapper.class.getName());
1479                                                                        }
1480                                                                        _addDirectorParamsToJob(conf);
1481                                                                        ChainReducer.addMapper(job.getJob(),
1482                                                                                        impClass,
1483                                                                                        StubUtilities.convertToHadoopType(typeMap.get("inKey")), 
1484                                                                                        StubUtilities.convertToHadoopType(typeMap.get("inValue")),
1485                                                                                        StubUtilities.convertToHadoopType(typeMap.get("outKey")),
1486                                                                                        StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf);
1487                                                                                        
1488                                                                }  else {
1489                                                                        conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1490                                            if(!_sameJVM) {
1491                                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1492                                            }
1493                                                                        _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration());
1494                                                                        ChainReducer.addMapper(job.getJob(),
1495                                                                                        MapReduce4Kepler.Mapper4Kepler.class,
1496                                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")), 
1497                                                                                        StubUtilities.convertToHadoopType(typeMap.get("value")),
1498                                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1499                                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);
1500                                                                }
1501                                                        }
1502                                                } 
1503                                                
1504//                                              else { // actor List is empty. add an identical mapper based on reducer input key value.
1505//                                                      ChainMapper.addMapper(job.getJob(),
1506//                                                                      Mapper.class,
1507//                                                                      Utilities.convertToHadoopType(typeMap.get("key")), 
1508//                                                                      Utilities.convertToHadoopType(typeMap.get("values")),
1509//                                                                      Utilities.convertToHadoopType(typeMap.get("key")), 
1510//                                                                      Utilities.convertToHadoopType(typeMap.get("values")), conf);                                                    
1511//                                              }
1512                                                //if tmpDir != null, it means this reducer is followed by a map.
1513                                                //so we need to set its output info.
1514                                                //TODO: double check whether this condition works with Match/Cross/CoGroup.
1515                                                if (tmpDir != null) {
1516                                                        job.getJob().setOutputFormatClass(SequenceFileOutputFormat.class);
1517                                                        FileOutputFormat.setOutputPath(job.getJob(), new Path(tmpDir));
1518                                                }
1519                                        return _getHadoopJobs(jobList, job,
1520                                                        _getUpStreamActorList(actor).get(0), null, true, hasSpecialMapper, null, false);
1521                                }
1522
1523                        } else if (actor instanceof DualInputPatternActor) {
1524                                job.getJob().setJobName(actor.getName());
1525                                DDPPatternActorUtil actorUtil = null;
1526                                try {
1527                                        actorUtil = _cloneAndExport((DualInputPatternActor)actor);
1528                                } catch (ClassNotFoundException e) {
1529                                        //do nothing, because the actorUtil.getImpClass() will be null.
1530                                }
1531                                if (actorUtil != null && actorUtil.getImpClassName() != null) { //Hadoop doesn't support Match/Cross/CoGroup classes.
1532                                        throw new IllegalActionException(actor,
1533                                                        "You cannot set executionClass parameter for HadoopDirector because Hadoop doesn't support it for Match/Cross/CoGroup actor." +
1534                                                        "You can only use sub-workflows for these actors");
1535                                }
1536                                // already have reducer in this job
1537                                if (hasReducer) {
1538//                                      DDPPatternActorUtil actorUtil = null;
1539//                                      try {
1540//                                              actorUtil = _cloneAndExport((DualInputPatternActor)actor);
1541//                                      } catch (ClassNotFoundException e) {
1542//                                              throw new IllegalActionException(actor, e,
1543//                                                              "Execution class " +
1544//                                                                              actor.getExecutionClassName() + " must be a subclass of " +
1545//                                                                              Map.class.getName());
1546//                                      }
1547
1548                                        
1549                                        if (actorList != null) {
1550                                                for (int i = actorList.size()-1; i >= 0; i--) {
1551                                                        actorUtil = actorList.get(i);
1552                                                        conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1553                            if(!_sameJVM) {
1554                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1555                            }
1556                                                        _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration());
1557                                                        java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
1558                                                        ChainMapper.addMapper(job.getJob(),
1559                                                                        MapReduce4Kepler.Mapper4Kepler.class,
1560                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")), 
1561                                                                        StubUtilities.convertToHadoopType(typeMap.get("value")),
1562                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1563                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);
1564                                                        //special input set for the first map.
1565                                                        if (i == actorList.size()-1) {
1566                                                                job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1567                                                                tmpDir = _getARadomHDFSDir(_tmpDir, false);
1568                                                                FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1569                                                        }
1570                                                }
1571                                        } else { // actor List is empty. add an identical mapper based on upstream reducer output key value.
1572                                                java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
1573                                                Class keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key");
1574                                                Class valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value");
1575                                                ChainMapper.addMapper(job.getJob(),
1576                                                        Mapper.class,
1577                                                        keyClass, valueClass,
1578                                                        keyClass, valueClass, conf);                                            
1579                                                //set input for the job.
1580                                                job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1581                                                tmpDir = _getARadomHDFSDir(_tmpDir, false);
1582                                                FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1583                                        }
1584                                        Job newJob = new Job(this.conf);
1585                                        _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
1586                                        newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1587                                        ControlledJob controlJob = new ControlledJob(newJob, null);
1588                                        job.addDependingJob(controlJob);
1589                                        jobList.add(controlJob);
1590                                        return _getHadoopJobs(jobList, controlJob, actor, null,
1591                                                        false, false, tmpDir, false);
1592
1593                                } else { // no reducer in this job
1594                                                String tmpCacheDir = null;
1595                                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1596                        if(!_sameJVM) {
1597                            conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1598                        }
1599                                                // FIXME: update classes
1600                                                java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
1601                                                if (actor instanceof Match) {
1602                                                        //FIXME: it works only if the value1 and value2 has the same datatype.
1603                                                        ChainReducer.setReducer(job.getJob(), Reducer4Match.class,
1604                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")), StubUtilities.convertToTaggedType(typeMap.get("value1")),
1605                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1606                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"),
1607                                                                        conf);
1608                                                        //FIXME: it works only if the value1 and value2 has the same datatype
1609                                                        ChainMapper.addMapper(job.getJob(), Mapper4TagValue.class,
1610                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")),
1611                                                                        //Utilities.convertToTaggedType(typeMap.get("value1")),
1612                                                                        StubUtilities.convertToHadoopType(typeMap.get("value1")),
1613                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")),
1614                                                                        StubUtilities.convertToTaggedType(typeMap.get("value1")), conf);
1615                                                        job.getJob().setCombinerClass(Combiner4TagValue.class);
1616                                                } else if (actor instanceof CoGroup) { //actor instanceof CoGroup
1617                                                        //FIXME: it works only if the values1 and values2 has the same datatype.
1618                                                        ChainReducer.setReducer(job.getJob(), Reducer4CoGroup.class,
1619                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")), StubUtilities.convertToTaggedType(typeMap.get("values1")),
1620                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1621                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"),
1622                                                                        conf);
1623                                                        //FIXME: it works only if the values1 and values2 has the same datatype. 
1624                                                        ChainMapper.addMapper(job.getJob(), Mapper4TagValue.class,
1625                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")),
1626                                                                        //Utilities.convertToTaggedType(typeMap.get("values1")),
1627                                                                        StubUtilities.convertToHadoopType(typeMap.get("values1")),
1628                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")),
1629                                                                        StubUtilities.convertToTaggedType(typeMap.get("values1")), conf);
1630                                                        job.getJob().setCombinerClass(Combiner4TagValue.class); 
1631                                                } else if (actor instanceof Cross) { //actor instanceof Cross
1632                                                        //the two mappers for Cross have to be in two Hadoop jobs, otherwise the outputs of the first map will be sent to input of the second map.
1633                                                        tmpCacheDir = _getARadomHDFSDir(_tmpDir, false);
1634                                                        _log.debug(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH + ":" + tmpCacheDir);
1635                                                        conf.set(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH, tmpCacheDir);
1636//                                                      final Class keyClass = Utilities.convertToHadoopType(typeMap.get("key"));
1637//                                                      final Class valueClass = Utilities.convertToHadoopType(typeMap.get("value"));
1638                                                        ChainMapper.addMapper(job.getJob(), Mapper4Cross.class,
1639                                                                        StubUtilities.convertToHadoopType(typeMap.get("key")),
1640                                                                        StubUtilities.convertToHadoopType(typeMap.get("value")),
1641                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
1642                                                                        StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);
1643
1644                                                }
1645                                                if (actorList != null) {
1646                                                        for (int i = actorList.size()-1; i >= 0; i--) {
1647                                                                actorUtil = actorList.get(i);
1648                                                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
1649                                    if(!_sameJVM) {
1650                                        conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
1651                                    }
1652                                                                java.util.Map<String, Type> typeMap2 = actorUtil.getTypeMap();
1653                                                                ChainMapper.addMapper(job.getJob(),
1654                                                                                MapReduce4Kepler.Mapper4Kepler.class,
1655                                                                                StubUtilities.convertToHadoopType(typeMap2.get("key")), 
1656                                                                                StubUtilities.convertToHadoopType(typeMap2.get("value")),
1657                                                                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap2.get("keysvalues"), "key"),
1658                                                                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap2.get("keysvalues"), "value"), conf);
1659                                                        }
1660                                                }
1661                                                if (actor instanceof Cross) {
1662//                                                      System.out.println("Working on actor: " + actor.getFullName());
1663                                                        //set reducer of the job based on the output data type of the actor.
1664                                                        //TODO: is this identical reducer really necessary? if we do not have this reducer, the output could be multiple part-m-*** files, reducer will merge them into one.
1665                                                        final Class keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key");
1666                                                        final Class valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value");
1667                                                        ChainReducer.setReducer(job.getJob(),
1668                                                                        Reducer.class,
1669                                                                        keyClass, valueClass,
1670                                                                        keyClass, valueClass, conf);
1671//                                                      hasReducer = true;
1672                                                }
1673                                                setReduceNumber(actor, job.getJob());
1674                                                if (tmpDir != null) {
1675                                                        job.getJob().setOutputFormatClass(SequenceFileOutputFormat.class);
1676                                                        FileOutputFormat.setOutputPath(job.getJob(), new Path(tmpDir));
1677                                                }
1678                                                if (actor instanceof Match || actor instanceof CoGroup) {
1679                                                        if (!(_getUpStreamActorList(actor).get(0) instanceof DDPDataSource))  { 
1680                                                                //there are non DDPDataSource as upstream actors, so a new job should be created.
1681                                                                Job newJob = new Job(this.conf);
1682                                                                newJob.setJobName("taggingJob");
1683                                                                _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
1684                                                                newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1685        
1686                                                                job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1687                                                                tmpDir = _getARadomHDFSDir(_tmpDir, false);
1688                                                                FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1689                                                                
1690                                                                //set tag for the actor connected with 'in' port.
1691                                                                job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_TAG, tmpDir);
1692                                                                
1693                                                                newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1694                                                                FileOutputFormat.setOutputPath(newJob, new Path(tmpDir));
1695                                                                
1696                                                                ControlledJob controlJob = new ControlledJob(newJob, null);
1697                                                                job.addDependingJob(controlJob);
1698                                                                jobList.add(controlJob);
1699                                                                jobList = _getHadoopJobs(jobList, controlJob,
1700                                                                                _getUpStreamActorList(actor).get(0), null, false, true, tmpDir, false);
1701                                                        } else {
1702                                                                jobList = _getHadoopJobs(jobList, job,
1703                                                                                _getUpStreamActorList(actor).get(0), null, false, true, null, true);
1704                                                        }
1705                                                        
1706                                                        if (!(_getUpStreamActorList(actor).get(1) instanceof DDPDataSource))  {
1707                                                                //there are non DDPDataSource as upstream actors, so a new job should be created.
1708                                                                Job newJob = new Job(this.conf);
1709                                                                _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
1710                                                                newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1711                                                                job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1712                                                                tmpDir = _getARadomHDFSDir(_tmpDir, false);
1713                                                                FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1714                                                                newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1715                                                                FileOutputFormat.setOutputPath(newJob, new Path(tmpDir));
1716                                                                ControlledJob controlJob = new ControlledJob(newJob, null);
1717                                                                job.addDependingJob(controlJob);
1718                                                                jobList.add(controlJob);
1719                                                                return _getHadoopJobs(jobList, controlJob,
1720                                                                                _getUpStreamActorList(actor).get(1), null, false, true, tmpDir, false);
1721                                                        } else {
1722                                                                return _getHadoopJobs(jobList, job,
1723                                                                                _getUpStreamActorList(actor).get(1), null, false, true, null, false);
1724                                                        }
1725                                                        
1726                                                } else { //actor is cross
1727                                                        // check 
1728                                                        job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1729                                                        tmpDir = _getARadomHDFSDir(_tmpDir, false);
1730                                                        FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1731                                                        //create new job for the actor whose output is connected the 'in' port of this cross actor.
1732                                                        //and connect new job's output to the input of the job for the cross actor.
1733                                                        Job newJob = new Job(this.conf);
1734                                                        newJob.setJobName(actor.getName() + "_in");
1735                                                        _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
1736                                                        newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1737//                                                      final String tmpOutputDir = _getARadomHDFSDir(_tmpDir, false);
1738                                                        FileOutputFormat.setOutputPath(newJob, new Path(tmpDir));
1739                                                        
1740                                                        ControlledJob controlJob = new ControlledJob(newJob, null);
1741                                                        job.addDependingJob(controlJob);
1742                                                        jobList.add(controlJob);
1743                                                        
1744                                                        
1745//                                                      if (!(_getUpStreamActorList(actor).get(0) instanceof DDPDataSource))  { 
1746//                                                              newJob.setInputFormatClass(SequenceFileInputFormat.class);
1747//                                                              tmpDir = _getARadomHDFSDir(_tmpDir, false);
1748//                                                              FileInputFormat.addInputPath(newJob, new Path(tmpDir));
1749//                                                              jobList = _getHadoopJobs(jobList, controlJob,
1750//                                                                              _getUpStreamActorList(actor).get(0), null, hasReducer, true, tmpDir);
1751//                                                      } else 
1752                                                                jobList = _getHadoopJobs(jobList, controlJob,
1753                                                                                _getUpStreamActorList(actor).get(0), null, false, hasSpecialMapper, null, false);
1754                                                        
1755                                                        //_getUpStreamActorList(actor).get(1) is connected with in1 port, so this mapper's datatype should be based on key2 and value2
1756                                                        //the output of _getUpStreamActorList(actor).get(1) will be put to hadoop HDFS at one input for cross sub-workflow.
1757                                                        final Class     keyClass = StubUtilities.convertToHadoopType(typeMap.get("key2"));
1758                                                        final Class valueClass = StubUtilities.convertToHadoopType(typeMap.get("value2"));
1759                                                        conf.set(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH, tmpCacheDir);
1760                                                        conf.set(Utilities.CONFIGURATION_KEPLER_JOB_KEY_CLASS, keyClass.getName());
1761                                                        conf.set(Utilities.CONFIGURATION_KEPLER_JOB_VALUE_CLASS, valueClass.getName());
1762                                                        
1763                                                        Job dataTransJob = new Job(this.conf);
1764                                                        dataTransJob.setJobName("dataTransform");
1765                                                        _addActorParamsToJob((DualInputPatternActor)actor, dataTransJob.getConfiguration());
1766                                                        //the output of this job is not really useful.
1767                                                        dataTransJob.setOutputFormatClass(SequenceFileOutputFormat.class);
1768                                                        final String tmpOutDir4DataTrans = _getARadomHDFSDir(_tmpDir, false);
1769                                                        FileOutputFormat.setOutputPath(dataTransJob, new Path(tmpOutDir4DataTrans));
1770                                                        final String tmpInDir4DataTrans = _getARadomHDFSDir(_tmpDir, false);
1771                                                        FileInputFormat.addInputPath(dataTransJob, new Path(tmpInDir4DataTrans));
1772                                                        dataTransJob.setInputFormatClass(SequenceFileInputFormat.class);
1773                                                        
1774                                                        ControlledJob controlJob4DataTrans = new ControlledJob(dataTransJob, null);
1775                                                        controlJob.addDependingJob(controlJob4DataTrans);
1776                                                        jobList.add(controlJob4DataTrans);
1777
1778                                                        ChainMapper.addMapper(dataTransJob, Mapper4DataTransform.class,
1779                                                                        keyClass, valueClass,
1780                                                                        keyClass, valueClass, conf);
1781                                                        
1782                                                        Job newJob2 = new Job(this.conf);
1783                                                        newJob2.setJobName("prepareDataTransform");
1784                                                        _addActorParamsToJob((DualInputPatternActor)actor, newJob2.getConfiguration());
1785                                                        //the output of this job will be consumed by job dataTransJob.
1786                                                        newJob2.setOutputFormatClass(SequenceFileOutputFormat.class);
1787                                                        FileOutputFormat.setOutputPath(newJob2, new Path(tmpInDir4DataTrans));
1788                                                        
1789                                                        ControlledJob controlJob2 = new ControlledJob(newJob2, null);
1790                                                        controlJob4DataTrans.addDependingJob(controlJob2);
1791                                                        jobList.add(controlJob2);
1792                                                        
1793//                                                      if (!(_getUpStreamActorList(actor).get(1) instanceof DDPDataSource))  {
1794//                                                              controlJob2.getJob().setInputFormatClass(SequenceFileInputFormat.class);
1795//                                                              tmpDir = _getARadomHDFSDir(_tmpDir, false);
1796//                                                              FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
1797//                                                              return _getHadoopJobs(jobList, controlJob2,
1798//                                                                              _getUpStreamActorList(actor).get(1), null, hasReducer, true, tmpDir);
1799//                                                      }
1800                                                        return _getHadoopJobs(jobList, controlJob2,
1801                                                                        _getUpStreamActorList(actor).get(1), null, false, hasSpecialMapper, null, false);
1802                                                }
1803                                }
1804
1805                        }
1806                }
1807
1808                return null;
1809
1810        }
1811
1812        /** Get the list of upstream actors from actor. */
1813        private List<DDPPatternActor> _getUpStreamActorList(DDPPatternActor actor)
1814                        throws IllegalActionException {
1815
1816                List<DDPPatternActor> upStreamActorList = null;
1817
1818                // see how many inputs are required
1819                int numRequiredInputs = -1;
1820                if (actor instanceof DualInputPatternActor) {
1821                        numRequiredInputs = 2;
1822                } else if (actor instanceof SingleInputPatternActor
1823                                || actor instanceof DDPDataSink) {
1824                        numRequiredInputs = 1;
1825                } else if (actor instanceof DDPDataSource) {
1826                        numRequiredInputs = 0;
1827                } else {
1828                        throw new IllegalActionException(_director,
1829                                        "Unsupported actor for Hadoop Director: "
1830                                                        + actor.getClass());
1831                }
1832
1833                // see if there's at least one input
1834                if (numRequiredInputs > 0) {
1835                        upStreamActorList = new LinkedList<DDPPatternActor>();
1836
1837                        IOPort port = (IOPort) actor.getPort("in");
1838                        // get the connected actor
1839                        List<?> outputPorts = port.sourcePortList();
1840                        if (outputPorts.isEmpty()) {
1841                                throw new IllegalActionException(_director,
1842                                                "DDPPatternActor input port " + port.getName()
1843                                                                + " must be connected.");
1844                        } else if (outputPorts.size() > 1) {
1845                                throw new IllegalActionException(_director,
1846                                                "DDPPatternActor input port " + port.getName()
1847                                                                + " may only be connected to one port.");
1848                        }
1849
1850                        IOPort outputPort1 = (IOPort) outputPorts.get(0);
1851                        NamedObj outputNamedObj = outputPort1.getContainer();
1852
1853                        // FIXME
1854                        if (!(outputNamedObj instanceof DDPPatternActor)) {
1855                                throw new IllegalActionException(_director, "Actor "
1856                                                + actor.getFullName()
1857                                                + " is connected to a non-DDPPatternActor: "
1858                                                + outputNamedObj.getFullName());
1859                        }
1860
1861                        upStreamActorList.add((DDPPatternActor) outputNamedObj);
1862
1863                        if (numRequiredInputs > 1) {
1864                                port = (IOPort) actor.getPort("in2");
1865                                // get the connected actor
1866                                outputPorts = port.sourcePortList();
1867                                if (outputPorts.isEmpty()) {
1868                                        throw new IllegalActionException(_director,
1869                                                        "DDPPatternActor input port " + port.getName()
1870                                                                        + " must be connected.");
1871                                } else if (outputPorts.size() > 1) {
1872                                        throw new IllegalActionException(_director,
1873                                                        "DDPPatternActor input port " + port.getName()
1874                                                                        + " may only be connected to one port.");
1875                                }
1876
1877                                outputPort1 = (IOPort) outputPorts.get(0);
1878                                outputNamedObj = outputPort1.getContainer();
1879
1880                                // FIXME
1881                                if (!(outputNamedObj instanceof DDPPatternActor)) {
1882                                        throw new IllegalActionException(_director, "Actor "
1883                                                        + actor.getFullName()
1884                                                        + " is connected to a non-DDPPatternActor: "
1885                                                        + outputNamedObj.getFullName());
1886                                }
1887
1888                                upStreamActorList.add((DDPPatternActor) outputNamedObj);
1889                        }
1890                }
1891
1892                return upStreamActorList;
1893        }
1894
1895        /**
1896         * Get the Input/Output Format class name for a source or sink.
1897         * 
1898         * @param actor
1899         *            the Pactor
1900         * @param parameterName
1901         *            the name of the parameter containing the class name.
1902         * @param input
1903         *            if true, get a source contract. if false, get a sink contract.
1904         */
1905        /*
1906        private String _getFormatClassName(AtomicPatternActor actor,
1907                        String parameterName, boolean input) throws IllegalActionException {
1908                final Parameter parameter = (Parameter) actor
1909                                .getAttribute(parameterName);
1910                if (parameter == null) {
1911                        throw new IllegalActionException(actor, "Missing parameter "
1912                                        + parameterName + ".");
1913                }
1914
1915                // make sure it's not empty.
1916                final String formatName = ((StringToken) parameter.getToken())
1917                                .stringValue();
1918                if (formatName.trim().isEmpty()) {
1919                        throw new IllegalActionException(actor,
1920                                        "Missing value for parameter " + parameterName + ".");
1921                }
1922
1923                // try to find the format name in the config properties
1924
1925                String formatType;
1926                if (input) {
1927                        formatType = "InputFormats.Format";
1928                } else {
1929                        formatType = "OutputFormats.Format";
1930                }
1931
1932                final List<ConfigurationProperty> formatList = _configProp
1933                                .getProperties(formatType);
1934                if (formatList == null || formatList.isEmpty()) {
1935                        throw new IllegalActionException(_director,
1936                                        "No formats found in configuration.xml for type "
1937                                                        + formatType);
1938                }
1939
1940                java.util.Map<String, String> parametersMap = new HashMap<String, String>();
1941                String className = null;
1942
1943                for (ConfigurationProperty format : formatList) {
1944            if(format.getProperty("Name").getValue().equalsIgnoreCase(formatName) &&
1945                    format.getProperty("ImplementationClass").getProperty("Hadoop") != null){
1946                className = format.getProperty("ImplementationClass").getProperty("Hadoop").getValue();
1947
1948                                // make sure class is specified for hadoop
1949                                if (className == null) {
1950                                        throw new IllegalActionException(actor, "Format "
1951                                                        + formatName + " does not have a class "
1952                                                        + " for Hadoop.");
1953                                }
1954
1955                                ConfigurationProperty parameterProps = format
1956                                                .getProperty("Parameters");
1957                                if (parameterProps != null) {
1958                                        parametersMap = ConfigurationUtilities
1959                                                        .getPairsMap(parameterProps);
1960                                }
1961
1962                                break;
1963                        }
1964                }
1965
1966                if (className != null)
1967                        return className;
1968                //if the classname is null, it could be because the format name is the class name. return formatname.
1969                else 
1970                        return formatName;
1971
1972        }
1973         */
1974        
1975        /**
1976         * Get the value of a parameter for an actor.
1977         * 
1978         * @param actor
1979         *            the Pactor
1980         * @param parameterName
1981         *            the name of the parameter containing the class name.
1982         */
1983        private String _getParaValue(AtomicPatternActor actor, String parameterName)
1984                        throws IllegalActionException {
1985                final Parameter parameter = (Parameter) actor
1986                                .getAttribute(parameterName);
1987                if (parameter == null) {
1988                        throw new IllegalActionException(actor, "Missing parameter "
1989                                        + parameterName + ".");
1990                }
1991
1992                // make sure it's not empty.
1993                final String value = ((StringToken) parameter.getToken()).stringValue();
1994                if (value.trim().isEmpty()) {
1995                        throw new IllegalActionException(actor,
1996                                        "Missing value for parameter " + parameterName + ".");
1997                }
1998
1999                return value;
2000
2001        }
2002
2003        private void _exportSubWorkflow(SingleInputPatternActor actor)
2004                        throws IllegalActionException, IOException {
2005                if (_jobDir == null) {
2006                        _createJobDirectory();
2007                }
2008                final String modelPath = _jobDir + actor.getName() + ".xml";
2009                FileWriter writer = null;
2010                try {
2011                        writer = new FileWriter(modelPath);
2012                        actor.exportMoML(writer);
2013                } catch (IOException e) {
2014                        throw new IllegalActionException(_director, e, "Error writing model to "
2015                                        + modelPath);
2016                } finally {
2017                        if (writer != null)
2018                                writer.close();
2019                }
2020        }
2021        
2022        // get a random local dir to store output during Map-Reduce computing. 
2023        private String _getARadomLocalDir() throws IllegalActionException {
2024                String tmpDir = System.getProperty("java.io.tmpdir");
2025                Random ran = new Random();
2026                Long ranLong = null;
2027                String ranStr = null;
2028                boolean res = false;
2029                int index = 0;
2030                do {
2031                        ranLong = ran.nextLong();
2032                        if (ranLong < 0)
2033                                ranLong = 0 - ranLong;
2034                        ranStr = tmpDir + System.getProperty("file.separator") + ranLong.toString();
2035                        _log.info("the random directory to be created in _getARadomLocalDir(): " + ranStr);
2036                        res = new File(ranStr).mkdir();
2037                        index++;
2038                } while (res != true && index < 10);
2039                if (index == 10) {
2040                        throw new IllegalActionException(
2041                                        _director,
2042                                        "the random directory creation in folder " + tmpDir + " failed for 10 times, please check local file system.");
2043                }
2044                return ranStr;
2045        }
2046        
2047        // get a random HDFS dir to store output during Map-Reduce computing. The dir will not be actually created since it will be done in Hadoop job.
2048        private String _getARadomHDFSDir(String parentPath, boolean mkDir) throws IllegalActionException, IOException {
2049                _log.info("enter into _getARadomHDFSDir()");
2050                FsShell shell = new FsShell(conf);
2051                Random ran = new Random();
2052                Long ranLong = null;
2053                String ranStr = null;
2054                String[] arg4Mkdir = { "-ls", "" };
2055                int stopSignal = 1;
2056                if (mkDir) {
2057                        arg4Mkdir[0] = "-mkdir";
2058                        stopSignal = 0;
2059                }
2060                // String[] arg4Mkdir = null;
2061                int res = -1;
2062                int index = 0;
2063                try {
2064                        do {
2065                                ranLong = ran.nextLong();
2066                                if (ranLong < 0)
2067                                        ranLong = 0 - ranLong;
2068                                ranStr = ranLong.toString();
2069                                if (parentPath != null)
2070                                        ranStr = parentPath + "/" + ranStr;
2071                                else // added for hadoop 2.0, 
2072                                        ranStr = "/" + ranStr;
2073                                arg4Mkdir[1] = ranStr;
2074                                _log.info("the args in _getARadomHDFSDir(): " + arg4Mkdir[0] + ", "
2075                                                + arg4Mkdir[1]);
2076                                res = shell.run(arg4Mkdir);
2077                                index++;
2078                                // res = ToolRunner.run(conf, shell, arg4Mkdir);
2079                                final String message = Arrays.toString(arg4Mkdir).replace(",", "");
2080                                _log.info("the exit code of 'hadoop fs " + message.substring(1, message.length()-1) + " ' is: " + res);
2081                        } while (res != stopSignal && index < 10);
2082                        if (index == 10) {
2083                                final String message = Arrays.toString(arg4Mkdir).replace(",", "");
2084                                throw new IllegalActionException(
2085                                                _director,
2086                                                "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed for 10 times, please check whether hadoop is correctly setup.");
2087                        }
2088                        
2089                        if(mkDir) { //test staging in files
2090                                _log.info("staging tmp file to check whether DataNode is working ");
2091                                _stageIn(ranStr, _configDirStr + File.separator + "slaves");
2092                        }
2093                                
2094                        return ranStr;
2095                } catch (Exception e) {
2096                        
2097                        e.printStackTrace();
2098                        final String message = Arrays.toString(arg4Mkdir).replace(",", "");
2099                        throw new IllegalActionException(
2100                                        _director, e,
2101                                        "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed, please check whether hadoop is correctly setup.");
2102                } finally {
2103                        shell.close();
2104                        _log.debug("enter into finally of _getARadomHDFSDir() with ranStr:" + ranStr);
2105                }
2106                        
2107        }
2108        
2109        private void _verifyParaSetting(String path, DDPPatternActor actor) throws IllegalActionException, IOException {
2110                if (path.substring(0, 4).equalsIgnoreCase("hdfs") && _autoHDFSStage)
2111                        throw new IllegalActionException(
2112                                        _director,
2113                                        "Since the parameter autoHDFSStage of director " + _director.getName() + " is true, actor " + actor.getFullName() + "'s path cannot be HDFS url.");
2114
2115        }
2116        
2117        
2118        private void _removeOutputDir(String outputPathStr, DDPPatternActor actor) throws IllegalActionException, IOException {
2119                boolean deleteDecision = true;
2120                if (!outputPathStr.startsWith("hdfs")) {
2121                        String[] filesInOutputPath = new File(outputPathStr).list();
2122                        for(int i = 0; i < filesInOutputPath.length; i++){
2123                                //make the behavior the same with Spark
2124                                if (!filesInOutputPath[i].startsWith("part-") && !filesInOutputPath[i].startsWith(".part-")
2125                                                && !filesInOutputPath[i].equalsIgnoreCase("_SUCCESS") 
2126                                                && !filesInOutputPath[i].equalsIgnoreCase("._SUCCESS.crc")) {
2127                                        deleteDecision = false;
2128                                        break;
2129                                }
2130                                        
2131                        }
2132                        if(deleteDecision)
2133                                FileUtils.deleteDirectory(new File(outputPathStr)); //TODO: make this delete more cautious and consistent with HDFS. 
2134                        else
2135                                throw new IllegalActionException(
2136                                                _director,
2137                                                "the path " + outputPathStr + " in actor "
2138                                                                + actor.getName() + " cannot be deleted because it has folders or other files. Please check file system." );
2139                } else {
2140                        Path outPath = new Path(outputPathStr);
2141                if (fs == null)
2142                        fs = FileSystem.get(conf);
2143                        if (fs.exists(outPath))
2144                                fs.delete(outPath, true);
2145                }
2146                        
2147        }
2148        
2149        private void _removeOutput(String strPath, boolean tmpDir) throws IllegalActionException, IOException {
2150                
2151                if (fs == null)
2152                        fs = FileSystem.get(conf);
2153                Path hadoopPath = new Path(strPath);
2154                
2155                //if the output file doesn't exist, return directly.
2156                if (!fs.exists(hadoopPath))
2157                        return;
2158                
2159                boolean exitCode = false;
2160        
2161                //if (fs.isFile(hadoopPath))
2162                System.out.println("tmpDir:" + tmpDir);
2163                exitCode = fs.delete(hadoopPath, tmpDir);
2164                
2165                if (!exitCode)
2166                        throw new IllegalActionException(
2167                                        _director,
2168                                        "Failed to remove output " + strPath  + ".\n Please check console for detailed information and whether hadoop is correctly setup.");
2169
2170                /*
2171                fs.listFiles(new Path(hdfsPath), false);
2172                
2173                //remove intermediate data in HDFS
2174                FsShell shell = new FsShell(conf);
2175                int res = -1;
2176                String[] argv1 = { "-rm", "-r", hdfsPathStr};
2177                try {                           
2178                                res = shell.run(argv1);
2179                                if (res == -1) {
2180                                        final String message = Arrays.toString(argv1).replace(",", "");
2181                                        throw new IllegalActionException(
2182                                                        _director,
2183                                                        "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed, please check whether hadoop is correctly setup.");
2184                                }
2185
2186                } catch (Exception e) {
2187                        
2188                        e.printStackTrace();
2189                        final String message = Arrays.toString(argv1).replace(",", "");
2190                        throw new IllegalActionException(
2191                                        _director, e,
2192                                        "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed, please check whether hadoop is correctly setup.");
2193
2194                } finally {
2195                                shell.close();
2196                } */            
2197        }
2198
2199        //set reduce number based on actor parallel degree. It is more useful for downstream actors' parallelization.
2200        private void setReduceNumber(DDPPatternActor actor, Job job) throws IllegalActionException{
2201        int numInstances = actor.getDegreeOfParallelism();
2202        if(numInstances <= 0) {
2203            numInstances = _degreeOfParallelism;
2204        }
2205        else if(_sameJVM && numInstances > 1) {
2206            System.err.println("WARNING: degree of parallelism for " +
2207                    actor.getName() + " is " + numInstances +
2208                    ", but Hadoop only uses 1 thread in sameJVM mode.");
2209        }
2210
2211                job.setNumReduceTasks(numInstances);
2212        }
2213        
2214        private void _addActorParamsToJob(DDPPatternActor actor, Configuration conf)
2215                        throws IllegalActionException { 
2216            // add any actor parameters to the job configuration
2217            final java.util.Map<String,String> actorParameters = actor.getParameters();
2218            final java.util.Map<String,String> paraNames = actor.getParaImplNames(_engineName);
2219            for(java.util.Map.Entry<String, String> entry : actorParameters.entrySet()) {
2220                String keplerParaName = entry.getKey();
2221                if (paraNames.get(keplerParaName) != null)
2222                        conf.set(paraNames.get(keplerParaName), entry.getValue());
2223                else 
2224                        conf.set(Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::" + keplerParaName, entry.getValue());
2225            }
2226
2227            //set data parallel degree
2228        int numInstances = actor.getDegreeOfParallelism();
2229        if(numInstances <= 0) {
2230            numInstances = _degreeOfParallelism;
2231        }
2232        else if(_sameJVM && numInstances > 1) {
2233            System.err.println("WARNING: degree of parallelism for " +
2234                    actor.getName() + " is " + numInstances +
2235                    ", but Hadoop only uses 1 thread in sameJVM mode.");
2236        }
2237
2238            conf.set(Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL, String.valueOf(numInstances));
2239            
2240            // the following parameters are only useful for SingleInputPatternActor instances.
2241//          if (actor instanceof SingleInputPatternActor) {
2242            //set print execution info parameter
2243            conf.setBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, actor.getPrintExeInfo());
2244        //set display redirect path
2245        String directDir = actor.getDisplayRedirectDir();
2246        boolean dirFromActor = true;
2247        if(directDir.isEmpty()) {
2248                dirFromActor = false;
2249                directDir = _displayRedirectDir;
2250        }
2251        if(!directDir.isEmpty()) {
2252                final File file = new File (directDir);
2253                if (!file.exists()) {
2254                        boolean result = file.mkdirs();
2255                        if (result==false)
2256                                throw new IllegalActionException(dirFromActor ? actor : _director, "Cannot create directory based on Parameter displayRedirectDir's value '" + _displayRedirectDir + "'. Please check your file system.");
2257                }
2258                if (!file.isDirectory() || !file.canWrite())
2259                        throw new IllegalActionException(dirFromActor ? actor : _director, "Parameter displayRedirectDir's value '" + _displayRedirectDir + "' must be a directory and writable.");
2260                conf.set(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, directDir);
2261        }
2262//          }
2263            
2264        // set the location of the kepler installation directory.
2265        // NOTE: this is done so that the stub can initialize kepler and set
2266        // the java properties for each module's workflow directory, e.g.:
2267        // property("hadoop.workflowdir")
2268        // if the modules directory does not exist on the stub, e.g., the file
2269        // system is not shared, then initialization is not done and the stub 
2270        // workflow cannot use the module workflow directory properties.
2271            conf.set(Utilities.CONFIGURATION_KEPLER_MODULES_DIR,
2272                ProjectLocator.getKeplerModulesDir().getAbsolutePath());
2273        }
2274        
2275        /*
2276        private URLClassLoader _loadActorJars(DDPPatternActor actor)
2277                        throws IllegalActionException, MalformedURLException {  
2278            final String jarsStr = actor.getJars();
2279            if(!jarsStr.isEmpty()) {
2280                ArrayList<URL> jarList = new ArrayList<URL>();
2281                final String[] jars = jarsStr.split(",");
2282                for(String jar : jars) {
2283                    final File jarFile = new File(jar);
2284                    if(!jarFile.exists() || !jarFile.canRead()) {
2285                        throw new IllegalActionException(actor,
2286                                "Jar does not exist or cannot be read: " + jarFile.getAbsolutePath());
2287                    }
2288                    System.out.println("Adding jar for " + actor.getFullName() + ": " +
2289                            jarFile.getAbsolutePath());
2290                    jarList.add(new File(jar).toURI().toURL());
2291                }
2292                        URL[] jarArray = jarList.toArray(new URL[jarList.size()]);
2293                        if (jarArray != null && jarArray.length > 0) {
2294                                return new URLClassLoader(jarArray, Thread.currentThread()
2295                                                .getContextClassLoader());
2296                        }  
2297            }
2298            //by default, return the class loader of this director;
2299            return (URLClassLoader)_classLoader;
2300        } */
2301        
2302        
2303        private void _addDirectorParamsToJob(Configuration conf)
2304                        throws IllegalActionException {
2305            for(java.util.Map.Entry<String, String> entry : _jobArgumentsMap.entrySet()) {
2306                conf.set(entry.getKey(), entry.getValue());
2307            }
2308        }
2309
2310        // set PartitonerClass of Map actor to Hadoop job.
2311        private void _setPartitonerClass(Job job, Actor actor) throws ClassNotFoundException, IllegalActionException{
2312        final String partitionerClassName = ((Map)actor).partitionerClass.stringValue();
2313        if (!partitionerClassName.isEmpty()) {
2314                final Class partitionerClass = _classLoader.loadClass(partitionerClassName);
2315                        if (!Partitioner.class.isAssignableFrom(partitionerClass)) {
2316                                throw new IllegalActionException(actor,
2317                                                "Partitioner class " +
2318                                                                partitionerClass + " must be a subclass of " +
2319                                                                Partitioner.class.getName());
2320                        }
2321                job.setPartitionerClass(partitionerClass);
2322        }
2323        }
2324        
2325        // set jars of Kepler to Hadoop job.
2326        private void _setJars(Configuration conf, URL[] jarArray) throws IllegalActionException {
2327                if (jarArray != null && jarArray.length > 0) {
2328                    StringBuffer jarStringBuffer = new StringBuffer();
2329                    for (URL jarURL : jarArray) {
2330                            jarStringBuffer.append(jarURL);
2331                            jarStringBuffer.append(",");
2332                    }
2333                    String jarString = jarStringBuffer.toString().substring(0,
2334                                jarStringBuffer.length() - 1);
2335
2336                    _log.info("kepler jars to be added to hadoop: " + jarString);
2337                    //tmpjars is the parameter in Hadoop for third-party jars.
2338                    conf.set("tmpjars", jarString);
2339
2340                        conf.setClassLoader(new URLClassLoader(jarArray, conf
2341                                        .getClassLoader()));
2342                }
2343        }
2344        
2345        // stage data in from local file system to HDFS
2346        private String _stageIn(String parentPath, String localInput) throws IllegalActionException, IOException {
2347                _log.debug("enter into stageIn. localInput is:" + localInput);
2348                File localInputFile = new File(localInput);
2349                String localInputStr = localInputFile.getName();
2350                FsShell shell = new FsShell(conf);
2351                Random ran = new Random();
2352                Long ranLong = null;
2353                String ranStr = null;
2354                //int ranInt;
2355                // String[] arg4Mkdir = null;
2356                int res = -1;
2357                int index = 0;
2358                String[] arg4Put = { "-put", localInput, ""};
2359                String[] arg4Mkdir = { "-mkdir", "" };
2360                try {
2361                        do {
2362                                ranLong = ran.nextLong();
2363                                if (ranLong < 0)
2364                                        ranLong = 0 - ranLong;
2365                                ranStr = String.valueOf(ranLong);
2366                                if (parentPath != null)
2367                                        ranStr = parentPath + "/" + ranStr;
2368                                else // added for hadoop 2.0, 
2369                                        ranStr = "/" + ranStr;
2370                                arg4Mkdir[1] = ranStr;
2371                                _log.debug("the arg4Mkdir: " + arg4Mkdir[0] + ", "
2372                                                + arg4Mkdir[1]);
2373                                res = shell.run(arg4Mkdir);
2374                                // res = ToolRunner.run(conf, shell, arg4Mkdir);
2375                                _log.debug("the exit code of 'hadoop fs -mkdir " + ranStr + " ' is: "
2376                                                + res);
2377                                index++;
2378                        } while (res != 0 && index < 10);
2379                        if (index == 10)
2380                                throw new IllegalActionException(
2381                                                _director,
2382                                                "command 'hadoop fs -mkdir' failed for 10 times, please check whether hadoop is correctly setup.");
2383                        arg4Put[2] = ranStr;
2384                        _log.debug("the arg4Put: " + arg4Put[0] + ", " + arg4Put[1] + ", "
2385                                        + arg4Put[2]);
2386                        // res = ToolRunner.run(shell, arg4Put);
2387                        res = shell.run(arg4Put);
2388                        final String message = Arrays.toString(arg4Put).replace(",", "");
2389                        _log.debug("the exit code of 'hadoop fs " + message.substring(1, message.length()-1) + " ' is: " + res);
2390                        if (res == -1) {
2391                                throw new IllegalActionException(
2392                                                _director,
2393                                                "command 'hadoop fs "
2394                                                                + message.substring(1, message.length()-1)
2395                                                                + "' was not successully executed, please check whether hadoop is correctly setup.");
2396                        }
2397                        return ranStr + "/" + localInputStr;
2398                } catch (Exception e) {
2399                        final String message = Arrays.toString(arg4Put).replace(",", "");
2400                        throw new IllegalActionException(
2401                                        _director,
2402                                        "command 'hadoop fs "
2403                                                        + message.substring(1, message.length()-1)
2404                                                        + "' was not successully executed, please check whether hadoop is correctly setup.");
2405                } finally {
2406                        shell.close();
2407                        _log.debug("enter into finally with ranStr:" + ranStr);                 
2408                }
2409        }
2410        
2411        // stage data out from HDFS to local file system
2412        private int _stageOut(String hdfsOutput, String localOuput)
2413                        throws IllegalActionException, IOException {
2414                _log.debug("enter into stageOut");
2415        if (fs == null)
2416                        fs = FileSystem.get(conf);
2417        Path hdfsOutputPath = new Path(hdfsOutput);
2418                if (fs.exists(hdfsOutputPath)) {
2419                        fs.copyToLocalFile(true, hdfsOutputPath, new Path(localOuput));
2420                }
2421                return 0; //output file doesn't exsit, return 0.
2422
2423        }
2424        
2425        /** Transfer token outputs from HDFS to DDPDataSink actors. */
2426        private void _transferTokenOutputsFromHDFS() throws IllegalActionException {
2427            
2428            // transfer the output for each sink actor using TokenOutputFormat
2429            for(java.util.Map.Entry<Path, DDPDataSink> entry : _tokenOutputFileMap.entrySet()) {
2430            
2431                // determine the key and value types
2432                
2433            final DDPDataSink sinkActor = entry.getValue();
2434            Type inType = sinkActor.in.getType();
2435            if(!(inType instanceof ArrayType)) {
2436                throw new IllegalActionException(_director,
2437                        "Expected array input type for " + sinkActor.getFullName() +
2438                        " ; found: " + inType);
2439            }
2440            Type elementType = ((ArrayType)inType).getElementType();
2441            if(!(elementType instanceof RecordType)) {
2442                throw new IllegalActionException(_director,
2443                        "Expected array of records input type for " + sinkActor.getFullName() +
2444                        " ; found: array of " + elementType);
2445            }
2446            
2447            // TODO check for null
2448            final Type keyType = ((RecordType)elementType).get("key");
2449            // TODO check for null
2450            final Type valueType = ((RecordType)elementType).get("value");
2451
2452            // transfer the file from HDFS to the local file system.
2453            final Path tokenPath = entry.getKey(); 
2454
2455            try {
2456                
2457                File tmpFile = File.createTempFile("tokenOutput", ".txt");
2458                if(!tmpFile.delete()) {
2459                    System.err.println("WARNING: could not delete " + tmpFile);
2460                }
2461                FileUtil.copyMerge(tokenPath.getFileSystem(conf),
2462                        tokenPath,
2463                        FileSystem.getLocal(conf),
2464                        new Path(tmpFile.getAbsolutePath()),
2465                        true,
2466                        conf,
2467                        ""
2468                        );
2469
2470                // parse the file and create a list of record tokens
2471                FileReader fileReader = null;
2472                BufferedReader reader = null;
2473                try {
2474                    fileReader = new FileReader(tmpFile);
2475                    reader = new BufferedReader(fileReader);
2476                    List<Token> tokenList = new ArrayList<Token>();
2477                    String line = null;
2478                    while((line = reader.readLine()) != null) {
2479                        //System.out.println("token output: " + line);
2480                        String[] parts = line.split("\t");
2481                        if(parts.length != 2) {
2482                            throw new IllegalActionException(_director,
2483                                    "Mal-formed output in token output file: " + line);
2484                        }
2485                        
2486                        // create the key and value tokens based on the types used by the
2487                        // sink actor's input port.
2488                        RecordToken token = new RecordToken(new String[] {"key", "value"},
2489                                new Token[] {
2490                                    Utilities.createTokenFromString(parts[0], keyType),
2491                                    Utilities.createTokenFromString(parts[1], valueType)});
2492                        tokenList.add(token);
2493                    }
2494                    DDPDataSink.addTokens(sinkActor.getFullName(), tokenList);
2495                } finally {
2496                    if(fileReader != null) {
2497                        fileReader.close();
2498                    }
2499                    if(reader != null) {
2500                        reader.close();
2501                    }
2502                    if(tmpFile != null) {
2503                        tmpFile.delete();
2504                    }
2505                    // TODO clean up HDFS
2506                }
2507            } catch(IOException e) {
2508                throw new IllegalActionException(_director, e,
2509                        "Error reading token output file in HDFS: " + tokenPath);
2510            }
2511        }       
2512        }
2513        
2514        /**
2515         * If true, the output directory will be deleted first.
2516         */
2517        public Parameter overwriteOutput;
2518        
2519        /**
2520         * If true, the temporary dir created on HDFS during its execution will be removed after workflow execution. 
2521         */
2522        public Parameter removeTmpHDFSDir;
2523        
2524        /**
2525         * If true, input and output in local file system is automatically staged into or out from HDFS. 
2526         */
2527        public Parameter autoHDFSStage; 
2528        
2529        /** Hadoop configuration. */
2530        private Configuration conf;
2531
2532        /** Configuration properties for ddp-common module. */
2533        //private ConfigurationProperty _configProp;
2534        
2535        /**
2536         * If true, the output directory will be deleted first.
2537         */
2538        public boolean _overwriteOutput = false;
2539        
2540        /**
2541         * If true, input and output in local file system is automatically staged into or out from HDFS. 
2542         */
2543        public boolean _autoHDFSStage = false;
2544        
2545        /**
2546         * The temporary directory for each workflow execution.
2547         */
2548        private String _tmpDir;
2549        
2550        /**
2551         * If _stageOutDirMap is not empty, the data in HDFS will be staged out to local file system at the end of execution of the Hadoop director.
2552         * Each element of the map is composed of <tmpDirOnHDFS, localDir>.
2553         */
2554        private java.util.Map<String, String> _stageOutDirMap = null;
2555        
2556        /**
2557         * A list of configuration directories for each hadoop server that is
2558         * started.
2559         */
2560        private final static List<String> _startedConfigDirs = Collections
2561                        .synchronizedList(new LinkedList<String>());
2562
2563        /** Logging. */
2564    private final static Log _log = LogFactory.getLog(HadoopEngine.class);
2565
2566    /** The name of the hadoop engine. */
2567    private final static String HADOOP_ENGINE_NAME = "Hadoop";
2568            
2569    /** The Hadoop job manager. */
2570    private JobControl _jobControl;
2571    
2572    /** The Hadoop file system */
2573    private FileSystem fs = null;
2574    
2575    /** Mapping of token output results in HDFS to their destination DDPDataSink actor. */
2576    private java.util.Map<Path,DDPDataSink> _tokenOutputFileMap = new HashMap<Path,DDPDataSink>();
2577}