001/* A DDP engine than runs models in Hadoop. 002 * 003 * Copyright (c) 2012-2013 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * 007 * Permission is hereby granted, without written agreement and without 008 * license or royalty fees, to use, copy, modify, and distribute this 009 * software and its documentation for any purpose, provided that the above 010 * copyright notice and the following two paragraphs appear in all copies 011 * of this software. 012 * 013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 017 * SUCH DAMAGE. 018 * 019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 024 * ENHANCEMENTS, OR MODIFICATIONS. 025 * 026 */ 027package org.kepler.hadoop.director; 028 029import java.io.BufferedReader; 030import java.io.File; 031import java.io.FileReader; 032import java.io.FileWriter; 033import java.io.IOException; 034import java.io.InputStream; 035import java.io.InputStreamReader; 036import java.io.StringWriter; 037import java.net.InetSocketAddress; 038import java.net.MalformedURLException; 039import java.net.Socket; 040import java.net.URI; 041import java.net.URL; 042import java.net.URLClassLoader; 043import java.util.ArrayList; 044import java.util.Arrays; 045import java.util.Collection; 046import java.util.Collections; 047import java.util.HashMap; 048import java.util.LinkedList; 049import java.util.List; 050import java.util.Random; 051 052import org.apache.commons.io.FileUtils; 053import org.apache.commons.logging.Log; 054import org.apache.commons.logging.LogFactory; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.FileUtil; 058import org.apache.hadoop.fs.FsShell; 059import org.apache.hadoop.fs.Path; 060import org.apache.hadoop.io.RawComparator; 061import org.apache.hadoop.mapreduce.InputFormat; 062import org.apache.hadoop.mapreduce.Job; 063import org.apache.hadoop.mapreduce.Mapper; 064import org.apache.hadoop.mapreduce.OutputFormat; 065import org.apache.hadoop.mapreduce.Partitioner; 066import org.apache.hadoop.mapreduce.Reducer; 067import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; 068import org.apache.hadoop.mapreduce.lib.chain.ChainReducer; 069import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 070import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 071import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 072import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; 073import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; 074import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 075import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 076import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 077import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 078import org.kepler.build.project.ProjectLocator; 079import org.kepler.ddp.Utilities; 080import org.kepler.ddp.actor.pattern.AtomicPatternActor; 081import 
org.kepler.ddp.actor.pattern.CoGroup; 082import org.kepler.ddp.actor.pattern.Cross; 083import org.kepler.ddp.actor.pattern.DDPDataSink; 084import org.kepler.ddp.actor.pattern.DDPDataSource; 085import org.kepler.ddp.actor.pattern.DDPPatternActor; 086import org.kepler.ddp.actor.pattern.DualInputPatternActor; 087import org.kepler.ddp.actor.pattern.Map; 088import org.kepler.ddp.actor.pattern.Match; 089import org.kepler.ddp.actor.pattern.Reduce; 090import org.kepler.ddp.actor.pattern.SingleInputPatternActor; 091import org.kepler.ddp.director.DDPDirector; 092import org.kepler.ddp.director.DDPEngine; 093import org.kepler.hadoop.io.input.TokenInputFormat; 094import org.kepler.hadoop.io.output.TokenOutputFormat; 095import org.kepler.hadoop.mapreduce.Combiner4TagValue; 096import org.kepler.hadoop.mapreduce.MapReduce4Kepler; 097import org.kepler.hadoop.mapreduce.Mapper4Cross; 098import org.kepler.hadoop.mapreduce.Mapper4DataTransform; 099import org.kepler.hadoop.mapreduce.Mapper4TagValue; 100import org.kepler.hadoop.mapreduce.Reducer4CoGroup; 101import org.kepler.hadoop.mapreduce.Reducer4Match; 102import org.kepler.hadoop.util.DDPPatternActorUtil; 103import org.kepler.hadoop.util.StubUtilities; 104 105import ptolemy.actor.Actor; 106import ptolemy.actor.IOPort; 107import ptolemy.actor.TypeAttribute; 108import ptolemy.actor.TypedIOPort; 109import ptolemy.data.BooleanToken; 110import ptolemy.data.RecordToken; 111import ptolemy.data.StringToken; 112import ptolemy.data.Token; 113import ptolemy.data.expr.Parameter; 114import ptolemy.data.type.ArrayType; 115import ptolemy.data.type.BaseType; 116import ptolemy.data.type.RecordType; 117import ptolemy.data.type.Type; 118import ptolemy.kernel.util.IllegalActionException; 119import ptolemy.kernel.util.InternalErrorException; 120import ptolemy.kernel.util.NameDuplicationException; 121import ptolemy.kernel.util.NamedObj; 122import ptolemy.kernel.util.Workspace; 123 124/** An engine than runs workflows in Hadoop. This engine 125 * converts DDP pattern actors (Map, Reduce, Cross, CoGroup, and 126 * Match) and I/O actors (DDPDataSink and DDPDataSource) into a 127 * Hadoop job and runs it on the server. 128 * <p> 129 * <b>NOTE:</b> Only DDP pattern and I/O actors may be present in 130 * the workflow. Other actors must placed inside the composite 131 * pattern actors or in a different sub-workflow. 
132 * </p> 133 * 134 * @author Jianwu Wang 135 * @version $Id: HadoopEngine.java 33628 2015-08-24 22:42:20Z crawl $ 136 * 137 */ 138public class HadoopEngine extends DDPEngine { 139 140 public static final String HDFS_NAME = "fs.default.name"; 141 public static final String FS_NAME = "fs.defaultFS"; 142 public static final String RES_MANAGER_URL = "0.0.0.0:8032"; 143 public static final String CORE_SITE = "core-site.xml"; 144 public static final String MAPRED_SITE = "mapred-site.xml"; 145 public static final String YARN_SITE = "yarn-site.xml"; 146 public static final String MAP_CHILD_ENV = "mapred.map.child.env"; 147 public static final String REDUCE_CHILD_ENV = "mapred.reduce.child.env"; 148 public static final String MR_JOBTRACKER_ADDRESS = "mapreduce.jobtracker.address"; 149 public static final String MR_FRAMEWORK_NAME = "mapreduce.framework.name"; 150 public static final String YARN_RM_ADDRESS = "yarn.resourcemanager.address"; 151 152 public HadoopEngine(DDPDirector director) throws IllegalActionException, 153 NameDuplicationException { 154 super(director); 155 156 // load the configuration file 157 //_configProp = ConfigurationManager.getInstance().getProperty( 158 //ConfigurationManager.getModule("ddp-common")); 159 160 _engineName = HADOOP_ENGINE_NAME; 161 162 } 163 164 /** Clone the HadoopEngine into the specified workspace. 165 * @param workspace The workspace for the cloned object. 166 * @exception CloneNotSupportedException Not thrown in this base class 167 * @return The new HadoopDirector. 168 */ 169 @Override 170 public Object clone(Workspace workspace) throws CloneNotSupportedException { 171 HadoopEngine newObject = (HadoopEngine) super.clone(workspace); 172 newObject._autoHDFSStage = false; 173 newObject._jobControl = null; 174 newObject._overwriteOutput = false; 175 newObject._stageOutDirMap = null; 176 newObject._tmpDir = null; 177 newObject._tokenOutputFileMap = new HashMap<Path,DDPDataSink>(); 178 newObject.conf = null; 179 //newObject._configProp = null; 180 return newObject; 181 } 182 183 @Override 184 protected void _executeDDPJob() throws IllegalActionException { 185 186 conf = new Configuration(); 187 conf.addResource(new Path(_configDirStr + File.separator 188 + CORE_SITE)); 189 conf.addResource(new Path(_configDirStr + File.separator 190 + MAPRED_SITE)); 191 conf.addResource(new Path(_configDirStr + File.separator 192 + YARN_SITE)); 193 //set environment variables for child processes. 194 conf.set(MAP_CHILD_ENV, "PATH="+System.getenv("PATH") + ",LD_LIBRARY_PATH="+System.getenv("LD_LIBRARY_PATH")); 195 conf.set(REDUCE_CHILD_ENV, "PATH="+System.getenv("PATH") + ",LD_LIBRARY_PATH="+System.getenv("LD_LIBRARY_PATH")); 196 //try to solve user.home error for child process. But the following two lines are not the correct solution. 
197 //conf.set("mapred.child.java.opts", conf.get("mapred.child.java.opts") + " -Duser.home=" + System.getProperty("user.home")); 198 //conf.set("mapred.map.child.java.opts", conf.get("mapred.map.child.java.opts") + " -Duser.home=" + System.getProperty("user.home")); 199 conf.setBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, _sameJVM); 200 201 if(_sameJVM) { 202 //conf.set("mapred.job.tracker", "local"); 203 conf.set(MR_JOBTRACKER_ADDRESS, "local"); 204 conf.set(MR_FRAMEWORK_NAME, "local"); 205 conf.set(FS_NAME, "file:///"); 206 System.out.println("Running Hadoop server in Kepler JVM."); 207 208 if(_degreeOfParallelism > 1) { 209 System.err.println("WARNING: the default degree of parallelism for " + 210 _director.getName() + " is " + _degreeOfParallelism + 211 ", but Hadoop only uses 1 thread in sameJVM mode."); 212 } 213 214 } else { 215 System.out.println("Checking Hadoop server in distributed mode."); 216 String hdfsURL = conf.get(FS_NAME); 217 _log.info("value of " + FS_NAME + " : " + hdfsURL); 218 if (hdfsURL == null) { 219 hdfsURL = conf.get(HDFS_NAME); 220 if (hdfsURL == null) 221 throw new IllegalActionException(_director, 222 "both 'fs.default.name' and 'fs.defaultFS' of Hadoop properties are null."); 223 } 224 225 String resManagerURL = conf.get(YARN_RM_ADDRESS); 226 if (resManagerURL == null) { 227 //use default value for it 228 resManagerURL = RES_MANAGER_URL; 229 } 230 _log.info("value of " + YARN_RM_ADDRESS + " : " + resManagerURL); 231 232 String[] hdfsInfo = hdfsURL.split(":"); 233 String[] jobInfo = resManagerURL.split(":"); 234 235 final InetSocketAddress hdfsAddress = new InetSocketAddress( 236 hdfsInfo[1].substring(2), new Integer(hdfsInfo[2]).intValue()); 237 238 final InetSocketAddress resManagerAddress = new InetSocketAddress( 239 jobInfo[0], new Integer(jobInfo[1]).intValue()); 240 241 // see if we can connect to Hadoop; start if necessary 242 synchronized (this) { 243 _checkServer(hdfsAddress, resManagerAddress); 244 } 245 } 246 247 _jobDir = null; 248 249 synchronized(_jobLock) { 250 _jobControl = new JobControl(_director.getFullName()); 251 } 252 253 URL[] jarArray; 254 try{ 255 final List<URI> jarURIs = this._getJarList(); 256 List<URL> jarURLs = new LinkedList<URL>(); 257 for (URI jarURI : jarURIs) { 258 jarURLs.add(jarURI.toURL()); 259 } 260// jarURLs.add(new File("").toURI().toURL()); 261 262 jarArray = jarURLs.toArray(new URL[jarURLs.size()]); 263 264 } catch (MalformedURLException e) { 265 e.printStackTrace(); 266 throw new IllegalActionException(_director, e.getMessage()); 267 } 268 269 //remove output path if needed 270 _overwriteOutput = ((BooleanToken) overwriteOutput.getToken()).booleanValue(); 271 _autoHDFSStage = ((BooleanToken) autoHDFSStage.getToken()).booleanValue(); 272 273 final List<?> sinkActors = _container.entityList(DDPDataSink.class); 274 275 try { 276 //create a tmp directory for each execution if HDFS is used 277 if(_sameJVM) 278 _tmpDir = _getARadomLocalDir(); 279 else 280 _tmpDir = _getARadomHDFSDir(null, true); 281 for (Object object : sinkActors) { 282 final DDPDataSink sink = (DDPDataSink) object; 283 Collection<ControlledJob> jobs = _getHadoopJobs(null, null, sink, null, false, false, null, false); 284 synchronized(_jobLock) { 285 _jobControl.addJobCollection(jobs); 286 } 287 } 288 } catch (IOException e) { 289 e.printStackTrace(); 290 throw new IllegalActionException(_director, e, 291 "Error when creating Hadoop job chains."); 292 } catch (ClassNotFoundException e) { 293 e.printStackTrace(); 294 throw new 
IllegalActionException(_director, "Class: " + e.getMessage() + 295 " cannot be found. Please make sure its jar is included."); 296 } 297 298 if(!_sameJVM) { 299 synchronized(_jobLock) { 300 for (ControlledJob waitingJob : _jobControl.getWaitingJobList()) { 301 _log.info("job:" + waitingJob); 302 _setJars(waitingJob.getJob().getConfiguration(), jarArray); 303 } 304 } 305 } 306 307 // create a thread to monitor the job control thread. 308 // when the jobs are all finished, it stops the thread. 309 Thread monitorThread = new Thread(new Runnable() { 310 @Override 311 public void run() { 312 while (true) { 313 synchronized(_jobLock) { 314 if(_jobControl == null) { 315 return; 316 } 317 if(_jobControl.allFinished()) { 318 _jobControl.stop(); 319 return; 320 } 321 try { 322 _jobLock.wait(3000); 323 } catch (InterruptedException e) { 324 e.printStackTrace(); 325 } 326 } 327 } 328 } 329 }); 330 monitorThread.start(); 331 332 // run the job control thread 333 Thread jobRunner = new Thread(new Runnable() { 334 @Override 335 public void run() { 336 _jobControl.run(); 337 } 338 }); 339 System.out.println("Submitting Hadoop job."); 340 jobRunner.start(); 341 342 try { 343 jobRunner.join(); 344 } catch (InterruptedException e) { 345 throw new IllegalActionException(_director, e, "Error waiting for job."); 346 } 347 348 /* 349 while(jobRunner.isAlive()) { 350 try { 351 synchronized(_jobLock) { 352 if(!_stopJob.get()) { 353 _jobLock.wait(3000); 354 } 355 } 356 } catch (InterruptedException e) { 357 throw new IllegalActionException(_director, e, "Error waiting for job."); 358 } 359 } 360 */ 361 362 System.out.println("Hadoop job finished."); 363 364 365 synchronized(_jobLock) { 366 _log.info("All hadoop jobs are finished."); 367 String[] failedJobArray = new String[_jobControl.getFailedJobList() 368 .size()]; 369 int i = 0; 370 for (ControlledJob job : _jobControl.getFailedJobList()) { 371 _log.info("failed job:" + job); 372 failedJobArray[i] = job.getMessage(); 373 i++; 374 } 375 376 if (i > 0) { 377 String message = Arrays.toString(failedJobArray); 378 System.err.println(message); 379 throw new IllegalActionException(_director, 380 "Some jobs failed during execution: " + message.substring(1, message.length()-1) + " You may have to check Hadoop job logs to know the reason of failure."); 381 } 382 383 _jobControl = null; 384 } 385 386 387 //stage data out 388 try { 389 if (_stageOutDirMap != null){ 390 for (String hdfsPath : _stageOutDirMap.keySet()){ 391 _stageOut(hdfsPath, _stageOutDirMap.get(hdfsPath)); 392 } 393 } 394 } catch (IOException e) { 395 396 e.printStackTrace(); 397 throw new IllegalActionException(_director, e, 398 "Error when staging data out from HDFS."); 399 } finally { 400 _stageOutDirMap = null; 401 } 402 403 // see if there were sinks using token output, and we not using same jvm 404 if(!_tokenOutputFileMap.isEmpty()) { 405 _transferTokenOutputsFromHDFS(); 406 } 407 } 408 409 // FIXME only run once 410 @Override 411 public boolean postfire() throws IllegalActionException { 412 //remove the tmp path at HDFS 413 try { 414 boolean _rmTmpHDFSDir = ((BooleanToken) removeTmpHDFSDir 415 .getToken()).booleanValue(); 416 if (_rmTmpHDFSDir) 417 _removeOutput(_tmpDir, true); 418 } catch (IOException e) { 419 420 e.printStackTrace(); 421 throw new IllegalActionException(_director, e, 422 "Error when removing tmp data at HDFS."); 423 } 424 fs = null; // reset fs. 
425 return super.postfire(); 426 } 427 428 @Override 429 public void preinitialize() throws IllegalActionException { 430 431 // call super class preinitialize to validate settables and 432 // preinitialize actors 433 super.preinitialize(); 434 _checkModel(); 435 436 // load the configuration and create a new client 437 if (_configDirStr.trim().isEmpty()) { 438 // set the default location of the config directory 439 String workflowDirStr = System.getProperty("hadoop.workflowdir"); 440 if (workflowDirStr == null) { 441 throw new InternalErrorException( 442 "System property hadoop.workflowdir not set."); 443 } 444 _configDirStr = workflowDirStr + File.separator + "tools" 445 + File.separator + "etc" + File.separator + "hadoop"; //update conf dir for hadoop 2.0 446 } 447 448 // make sure conf dir exists 449 final File configDir = new File(_configDirStr); 450 if (!configDir.exists()) { 451 throw new IllegalActionException(_director, 452 "Hadoop configuration directory " + _configDirStr 453 + " does not exist."); 454 } 455 456 //re-set variables 457 _stageOutDirMap = null; 458 _tmpDir = null; 459 _tokenOutputFileMap.clear(); 460 461 } 462 463 /** Stop any running Hadoop jobs. */ 464 @Override 465 public void stop() throws IllegalActionException { 466 super.stop(); 467 468 //System.out.println("stop"); 469 synchronized(_jobLock) { 470 //System.out.println("stop got lock"); 471 if(_jobControl != null) { 472 System.out.println("Stopping submitted jobs."); 473 _jobControl.stop(); 474 List<ControlledJob> jobs = _jobControl.getReadyJobsList(); 475 jobs.addAll(_jobControl.getWaitingJobList()); 476 jobs.addAll(_jobControl.getRunningJobList()); 477 for(ControlledJob job : jobs) { 478 try { 479 job.failJob("Job stopped by user"); 480 } catch (Exception e) { 481 throw new IllegalActionException(_director, e, "Error stopping Hadoop job."); 482 } 483 } 484 _jobLock.notifyAll(); 485 } 486 } 487 } 488 489 /** Shutdown the Hadoop server if one was started. */ 490 public static void shutdownServer() throws IllegalActionException { 491 492// if(_sameJVM) //Hadoop Server is not started for the same JVM setting, so no shutdown is needed. 
493// return; 494 495 for (String configDirStr : _startedConfigDirs) { 496 497 System.out 498 .println("Stopping Hadoop server for configuration directory " 499 + configDirStr); 500 501 String hadoopDirStr = new File(configDirStr).getParentFile().getParent(); 502 String stopScriptStr = hadoopDirStr + File.separator + "bin" 503 + File.separator + "stop-hadoop.sh"; 504 ProcessBuilder stopBuilder = new ProcessBuilder(stopScriptStr); 505// String stopMRScriptStr = parentDirStr + File.separator + "bin" 506// + File.separator + "stop-mapred.sh"; 507// ProcessBuilder stopMRBuilder = new ProcessBuilder(stopMRScriptStr); 508 509 // make sure JAVA_HOME is set 510 java.util.Map<String,String> env = stopBuilder.environment(); 511 if(env.get("JAVA_HOME") == null) { 512 env.put("JAVA_HOME", System.getProperty("java.home")); 513 } 514 if (env.get("HADOOP_INSTALL") == null) { 515 env.put("HADOOP_INSTALL", hadoopDirStr); 516 } 517 if (env.get("HADOOP_COMMON_HOME") == null) { 518 env.put("HADOOP_COMMON_HOME", hadoopDirStr); 519 } 520 if (env.get("HADOOP_MAPRED_HOME") == null) { 521 env.put("HADOOP_MAPRED_HOME", hadoopDirStr); 522 } 523 if (env.get("HADOOP_HDFS_HOME") == null) { 524 env.put("HADOOP_HDFS_HOME", hadoopDirStr); 525 } 526 if (env.get("YARN_HOME") == null) { 527 env.put("YARN_HOME", hadoopDirStr); 528 } 529 try { 530 Process process = stopBuilder.start(); 531 process.waitFor(); 532// process = stopMRBuilder.start(); 533// process.waitFor(); 534 } catch (Exception e) { 535 throw new IllegalActionException("Unable to stop Hadoop: " 536 + e.getMessage()); 537 } 538 } 539 } 540 541 /** Free resources. */ 542 @Override 543 public void wrapup() throws IllegalActionException { 544 super.wrapup(); 545 fs = null; 546 synchronized(_jobLock) { 547 _jobControl = null; 548 } 549 } 550 551 /** Add parameters to the containing director. */ 552 @Override 553 protected void _addParameters() throws IllegalActionException, NameDuplicationException { 554 555 overwriteOutput = (Parameter) _director.getAttribute("overwriteOutput"); 556 if(overwriteOutput == null) { 557 overwriteOutput = new Parameter(_director, "overwriteOutput"); 558 overwriteOutput.setTypeEquals(BaseType.BOOLEAN); 559 overwriteOutput.setToken(BooleanToken.TRUE); 560 } 561 562 removeTmpHDFSDir = (Parameter) _director.getAttribute("removeTmpHDFSDir"); 563 if(removeTmpHDFSDir == null) { 564 removeTmpHDFSDir = new Parameter(_director, "removeTmpHDFSDir"); 565 removeTmpHDFSDir.setTypeEquals(BaseType.BOOLEAN); 566 removeTmpHDFSDir.setToken(BooleanToken.TRUE); 567 } 568 569 autoHDFSStage = (Parameter) _director.getAttribute("autoHDFSStage"); 570 if(autoHDFSStage == null) { 571 autoHDFSStage = new Parameter(_director, "autoHDFSStage"); 572 autoHDFSStage.setTypeEquals(BaseType.BOOLEAN); 573 autoHDFSStage.setToken(BooleanToken.FALSE); 574 } 575 } 576 577 /** Remove engine-specific parameters from the director. */ 578 @Override 579 protected void _removeParameters() throws IllegalActionException, NameDuplicationException { 580 overwriteOutput.setContainer(null); 581 removeTmpHDFSDir.setContainer(null); 582 autoHDFSStage.setContainer(null); 583 } 584 585 /** Set the port types inside a cloned pattern actor. 586 * TODO: merge this with _setPortTypes() in the parent class. 
587 * 588 * @param actor the cloned actor 589 */ 590 @Override 591 protected java.util.Map<String, Type> _setPortTypes(DDPPatternActor actor) 592 throws IllegalActionException { 593 java.util.Map<String, Type> typeMap = new HashMap<String, Type>(); 594 595 // set the types of the ports of the input/output actors via 596 // TypeAttributes 597 final List<?> pactorPorts = actor.portList(); 598 for (Object object : pactorPorts) { 599 final TypedIOPort pactorPort = (TypedIOPort) object; 600 _log.debug("pactorPort:" + pactorPort); 601 602 // get connected ports 603 final List<?> connectedPorts = pactorPort.insidePortList(); 604 for (Object object2 : connectedPorts) { 605 final TypedIOPort connectedPort = (TypedIOPort) object2; 606 _log.debug("connectedPort:" + connectedPort); 607 608 // set the types of ports connected to the pactor port so their 609 // types can be used to set the pact types in the stub 610 TypeAttribute typeAttribute; 611 try { 612 typeAttribute = new TypeAttribute(connectedPort, 613 connectedPort.getName() + "Type"); 614 } catch (NameDuplicationException e) { 615 throw new IllegalActionException(_director, e, 616 "Error creating type attribute for " 617 + connectedPort.getFullName()); 618 } 619 typeAttribute.setExpression(connectedPort.getType().toString()); 620 _log.debug("set connectedPort " + connectedPort + "'type to be:" + connectedPort.getType()); 621 622 // get the ports of the input/output actor 623 List<?> inputOutputActorPorts; 624 if (connectedPort.isInput()) { 625 inputOutputActorPorts = ((Actor) connectedPort 626 .getContainer()).outputPortList(); 627 } else { 628 inputOutputActorPorts = ((Actor) connectedPort 629 .getContainer()).inputPortList(); 630 } 631 632 // set the types for the input/output actor ports 633 for (Object object3 : inputOutputActorPorts) { 634 final TypedIOPort inputOutputActorPort = (TypedIOPort) object3; 635 _log.debug("inputOutputActorPort:" + inputOutputActorPort); 636 TypeAttribute existingAtt = (TypeAttribute)inputOutputActorPort.getAttribute(inputOutputActorPort.getName() + "Type"); 637 _log.debug("type attribute of inputOutputActorPort:" + existingAtt); 638 if (existingAtt == null) { 639 try { 640 typeAttribute = new TypeAttribute(inputOutputActorPort, 641 inputOutputActorPort.getName() + "Type"); 642 } catch (NameDuplicationException e) { 643 throw new IllegalActionException(_director, e, 644 "Error creating type attribute for " 645 + inputOutputActorPort.getFullName()); 646 } 647 _log.debug("in actor: "+ actor.getName() + ", its port name:" + inputOutputActorPort.getName() + " , port type:" + inputOutputActorPort.getType()); 648 typeMap.put(inputOutputActorPort.getName(), inputOutputActorPort.getType()); 649 typeAttribute.setExpression(inputOutputActorPort.getType() 650 .toString()); 651 } 652 } 653 654 } 655 656 } 657 658 return typeMap; 659 } 660 661 662 663 /** Check if Hadoop server is running. If not, try to start it. 
*/ 664 private void _checkServer(InetSocketAddress hdfsAddress, InetSocketAddress jobManagerAddress) 665 throws IllegalActionException { 666 Socket socket1 = new Socket(); 667 Socket socket2 = new Socket(); 668 boolean hdfsConnected = false; 669 boolean jobManagerConnected = false; 670 try { 671 socket1.connect(hdfsAddress, _CONNECT_TIMEOUT); 672 hdfsConnected = true; 673 socket2.connect(jobManagerAddress, _CONNECT_TIMEOUT); 674 jobManagerConnected = true; 675 676 } catch (IOException e) { 677 if (!hdfsConnected) 678 System.out 679 .println("Hadoop HDFS server: " + hdfsAddress + " does not appear to be running."); 680 if (!jobManagerConnected) 681 System.out 682 .println("Hadoop Resource Manager server: " + jobManagerAddress + " does not appear to be running."); 683 System.out 684 .println("Try to start hadoop with configuration directory:" + _configDirStr); 685 // start hadoop 686 File configDir = new File(_configDirStr); 687 String hadoopDirStr = configDir.getParentFile().getParent(); 688 //format node could delete all data in HDFS. It is too dangerous to use. 689 String startScriptStr = hadoopDirStr + File.separator + "bin" 690 + File.separator + "start-hadoop.sh"; 691 692 693 // see if the script is executable. kepler modules are zipped, 694 // which does not preserve the permissions. 695 File startScriptFile = new File(startScriptStr); 696 if(!startScriptFile.exists()) { 697 throw new IllegalActionException(_director, 698 "The script " + startScriptFile + " does not exist.\n" + 699 "Give up on automatic Hadoop starting. \n" + 700 "You have to start Hadoop manually."); 701 } 702 if(!startScriptFile.canExecute()) { 703 throw new IllegalActionException(_director, 704 "The script " + startScriptFile + " is not executable.\n" + 705 "You must change the permissions so that " + 706 startScriptFile.getName() + 707 " and all the other scripts in \n" + 708 startScriptFile.getParent() + " are executable."); 709 } 710 711 ProcessBuilder startBuilder = new ProcessBuilder(startScriptStr); 712 713 // make sure HADOOP_COMMON_HOME is set 714 java.util.Map<String, String> env = startBuilder.environment(); 715 if (env.get("JAVA_HOME") == null) { 716 env.put("JAVA_HOME", System.getProperty("java.home")); 717 } 718 if (env.get("HADOOP_INSTALL") == null) { 719 env.put("HADOOP_INSTALL", hadoopDirStr); 720 } 721 if (env.get("HADOOP_COMMON_HOME") == null) { 722 env.put("HADOOP_COMMON_HOME", hadoopDirStr); 723 } 724 if (env.get("HADOOP_MAPRED_HOME") == null) { 725 env.put("HADOOP_MAPRED_HOME", hadoopDirStr); 726 } 727 if (env.get("HADOOP_HDFS_HOME") == null) { 728 env.put("HADOOP_HDFS_HOME", hadoopDirStr); 729 } 730 if (env.get("YARN_HOME") == null) { 731 env.put("YARN_HOME", hadoopDirStr); 732 } 733 int exitCode; 734 StringBuilder errString = new StringBuilder(); 735 try { 736 Process process = startBuilder.start(); 737 exitCode = process.waitFor(); 738 _log.debug("exitCode : " + exitCode); 739 740 InputStream stderr = process.getErrorStream(); 741 InputStream stdout = process.getInputStream(); 742 743 BufferedReader stdReader = new BufferedReader (new InputStreamReader(stdout)); 744 BufferedReader errReader = new BufferedReader (new InputStreamReader(stderr)); 745 746 String line; 747 748 while ((line = stdReader.readLine ()) != null) { 749 _log.debug ("Stdout: " + line); 750 System.out.println("Stdout: " + line); 751 } 752 753 while ((line = errReader.readLine ()) != null) { 754 _log.debug ("Stderr: " + line); 755 errString.append(line).append("\n"); 756 } 757 758 stdReader.close(); 759 errReader.close(); 
760 } catch (Exception e1) { 761 throw new IllegalActionException(_director, e1, 762 "Unable to start Hadoop."); 763 } 764 765 if (exitCode != 0) { 766 throw new IllegalActionException(_director, 767 "Error when start Hadoop:" + errString); 768 } 769 770 _startedConfigDirs.add(_configDirStr); 771 772 int tries = 0; 773 while (tries < 5) { 774 // wait for the server to start 775 try { 776 Thread.sleep(5000); 777 tries++; 778 System.out.println("Connecting to Hadoop server port try #" 779 + tries); 780 try { 781 if (!jobManagerConnected) { 782 socket1 = new Socket(); 783 socket1.connect(jobManagerAddress, _CONNECT_TIMEOUT); 784 jobManagerConnected = true; 785 } 786 if (!hdfsConnected) { 787 socket2 = new Socket(); 788 socket2.connect(hdfsAddress, _CONNECT_TIMEOUT); 789 hdfsConnected = true; 790 } 791 break; 792 } catch (IOException e1) { 793 //e1.printStackTrace(); 794 } 795 } catch (InterruptedException e2) { 796 throw new IllegalActionException(_director, e2, 797 "Error while sleeping."); 798 } 799 } 800 801 // if we get here, we were able to connect to the hadoop job 802 // manager port. however, hadoop may not be completely 803 // initialized, so wait a few more seconds 804 System.out 805 .println("Waiting 5 seconds for Hadoop server to initialize."); 806 try { 807 Thread.sleep(5000); 808 } catch (InterruptedException e2) { 809 throw new IllegalActionException(_director, e2, 810 "Error while waiting " 811 + " for Hadoop server to initialize."); 812 } 813 814 } finally { 815 try { 816 if (jobManagerConnected) 817 socket1.close(); 818 if (hdfsConnected) 819 socket2.close(); 820 } catch (IOException e) { 821 throw new IllegalActionException(_director, e, 822 "Error closing socket."); 823 } 824 } 825 826 if (!jobManagerConnected) 827 throw new IllegalActionException(_director, 828 "Could not connect to Hadoop server: " + jobManagerAddress); 829 else if (!hdfsConnected) 830 throw new IllegalActionException(_director, 831 "Could not connect to HDFS server: " + "hdfs://" + hdfsAddress); 832 } 833 834 /** 835 * Clone and export actor. 836 * 837 * @throws IllegalActionException 838 * @throws IOException 839 * @throws ClassNotFoundException 840 */ 841 private DDPPatternActorUtil _cloneAndExport(SingleInputPatternActor actor) 842 throws IllegalActionException, IOException, ClassNotFoundException { 843 // export actor for non-IO pactors. 
844 SingleInputPatternActor clonePactor; 845 final Workspace workspace; 846 847 if(_sameJVM) { 848 workspace = new Workspace(actor.getName()); 849 } else { 850 workspace = _director.workspace(); 851 } 852 853 try { 854 clonePactor = (SingleInputPatternActor) actor.clone(workspace); 855 java.util.Map<String, Type> portTypeMap = _setPortTypes(clonePactor); 856 _copyParameters(actor, clonePactor); 857 if (!actor.getExecutionClassName().equalsIgnoreCase("")) { 858 DDPPatternActorUtil actorUtil = new DDPPatternActorUtil(clonePactor, clonePactor.getName(), actor.getExecutionClassName(), clonePactor.inKeyValueTypes.stringValue(), clonePactor.outKeyValueTypes.stringValue()); 859 return actorUtil; 860 } else { 861 862 String actorString = null; 863 if(_sameJVM) { 864 _subWorkflows.put(actor.getName(), clonePactor); 865 } else { 866 Utilities.removeModelPorts(clonePactor); 867 if (_writeSubWorkflowsToFiles) 868 _exportSubWorkflow(clonePactor); 869 // write model to a string 870 final StringWriter writer = new StringWriter(); 871 try { 872 clonePactor.exportMoML(writer); 873 } catch (IOException e) { 874 throw new IllegalActionException(_director, e, 875 "Error serializing model."); 876 } 877 actorString = writer.toString(); 878 } 879 880 conf.setBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, 881 clonePactor.getRunWorkflowLifecyclePerInput()); 882 883 DDPPatternActorUtil actorUtil = new DDPPatternActorUtil(clonePactor, clonePactor.getName(), actorString, portTypeMap); 884 return actorUtil; 885 } 886 } catch (CloneNotSupportedException e) { 887 throw new IllegalActionException(_director, e, "Unable to clone " 888 + actor.getName()); 889 } 890 } 891 892 /** 893 * Get the class specified in a Pactor's parameter. 894 * 895 * @actor the Pactor 896 * @parameterName the name of the parameter containing the class name. 897 */ 898 private Class<?> _getClassFromParameter(DDPPatternActor actor, 899 String parameterName) throws IllegalActionException { 900 901 Parameter parameter = (Parameter) ((DDPPatternActor) actor) 902 .getAttribute(parameterName); 903 if (parameter == null) { 904 throw new IllegalActionException(actor, "Missing parameter " 905 + parameterName + "."); 906 } 907 908 // make sure it's not empty. 909 String className = ((StringToken) parameter.getToken()).stringValue(); 910 if (className.trim().isEmpty()) { 911 throw new IllegalActionException(actor, 912 "Missing value for parameter " + parameterName + "."); 913 } 914 915 // try to get the class 916 Class<?> clazz; 917 try { 918 clazz = Class.forName(className); 919 } catch (ClassNotFoundException e) { 920 throw new IllegalActionException(actor, "Class " + className 921 + " is not on the classpath."); 922 } 923 924 return clazz; 925 } 926 927 /** Get the hadoop Job for a PACTor. 928 * @param container The container. 929 * @param hasSpecialMapper Whether a mapper is added for Match/CoGrooup/Cross. 
930 * @throws ClassNotFoundException 931 * @throws IllegalActionException 932 * @throws IOException 933 */ 934 private Collection<ControlledJob> _getHadoopJobs( 935 Collection<ControlledJob> jobList, ControlledJob job, 936 DDPPatternActor actor, List<DDPPatternActorUtil> actorList, 937 boolean hasReducer, boolean hasSpecialMapper, String tmpDir, 938 boolean tag) throws IOException, IllegalActionException, ClassNotFoundException { 939 940 if (actor instanceof DDPDataSink) { 941 String formatClassName = null; 942 ControlledJob controlJob = null; 943 Collection<ControlledJob> ctrJobList = null; 944 try { 945 //Job currentJob = new Job(this.conf); 946 Job currentJob = Job.getInstance(this.conf); 947// _classLoader = _loadActorJars(actor); 948 formatClassName = ((DDPDataSink) actor).getFormatClassName(_engineName); 949 //formatClassName = _getFormatClassName((DDPDataSink) actor, 950 //"formatType", false); 951 Class<?> clazz = _classLoader.loadClass(formatClassName) 952 .asSubclass(OutputFormat.class); 953 currentJob.setOutputFormatClass((Class<? extends OutputFormat>) clazz); 954// currentJob.setOutputFormatClass(Class.forName(formatClassName) 955// .asSubclass(OutputFormat.class)); 956 957 if(TokenOutputFormat.class.isAssignableFrom(clazz)) { 958 currentJob.getConfiguration().set(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME, 959 actor.getFullName()); 960 961 // see if we're in distributed mode 962 if(!_sameJVM) { 963 964 // TokenOutputFormat cannot access DDPDataSink since it's in a 965 // different JVM. instead, we use TextOutputFormat to write the 966 // results to HDFS, and once the job is finished, copy the results 967 // to DDPDataSink (see _transferTokenOutputsFromHDFS()). 968 969 // create a temporary directory in HDFS for the token outputs 970 String tokenOutputDir = _getARadomHDFSDir(_tmpDir, false); 971 Path tokenOutputPath = new Path(tokenOutputDir + "/token-output.txt"); 972 _tokenOutputFileMap.put(tokenOutputPath, (DDPDataSink)actor); 973 FileOutputFormat.setOutputPath(currentJob, tokenOutputPath); 974 //System.out.println("Setting token output to be " + _tokenOutputFilePath); 975 976 // use TextOutputFormat instead of TokenOutputFormat 977 currentJob.setOutputFormatClass(TextOutputFormat.class); 978 } 979 980 } else if(clazz != NullOutputFormat.class) { 981 String outputPath = _getParaValue((DDPDataSink) actor, 982 "path"); 983 _log.info("output path:" + outputPath); 984 _verifyParaSetting(outputPath, actor); 985 if (outputPath.substring(0, 4).equalsIgnoreCase("hdfs")) { 986 //if HDFS is used for sameJVM, throw an exception. 987 if (_sameJVM) { 988 throw new IllegalActionException( 989 _director, 990 "the path " + outputPath + " in actor " 991 + actor.getName() 992 + " cannot be HDFS url when sameJVM is chosen for parameter 'startServerType' in director " + _director.getName() + "."); 993 } 994 if (_overwriteOutput){ 995// _removeOutput(outputPath, false); 996 _removeOutputDir(outputPath, actor); 997 } 998 999 1000 // } else if (new File(outputPath).exists() || new File(outputPath).mkdirs()){ 1001 } else {//data is in local file system. 
1002 System.out.println("outputPath:" + outputPath); 1003// System.out.println("File.separator:" + File.separator); 1004 if (outputPath.substring(0, 8).equalsIgnoreCase("file:///")) 1005 outputPath = outputPath.substring(7); 1006 if (new File(outputPath).exists() && _overwriteOutput){ 1007 if (new File(outputPath).isDirectory()) { 1008 _removeOutputDir(outputPath, actor); 1009 } 1010 else if (!new File(outputPath).delete()) 1011 throw new IllegalActionException( 1012 _director, 1013 "the path " + outputPath + " in actor " 1014 + actor.getName() + " cannot be deleted. Please check file system." ); 1015 } 1016 if (_autoHDFSStage || _sameJVM) {//the outputs should be staged out from HDFS to local file system. If _sameJVM is true, always stage results to local file system to be consistent with other DDP engines. 1017 if (_stageOutDirMap == null) 1018 _stageOutDirMap = new HashMap<String, String>(); 1019 String randomOutPath = _getARadomHDFSDir(_tmpDir, false); 1020 _stageOutDirMap.put(randomOutPath, outputPath); 1021 outputPath = randomOutPath; 1022 } else { 1023 if (File.separator.equalsIgnoreCase("/")) //linux and mac 1024 outputPath = "file://" + outputPath; //add file prefix back 1025 else { 1026 System.out.println("In windows, outputPath:" + outputPath); 1027 outputPath = "file:///" + outputPath; 1028 } 1029 } 1030 } 1031 // //do nothing because we will use local file as output directly. 1032 // //the output path is local file, data will staged out and outputpath is set to be a random hdfs path. 1033 // if (_stageOutDirMap == null) 1034 // _stageOutDirMap = new HashMap<String, String>(); 1035 // String randomOutPath = _getARadomHDFSDir(_tmpDir, false); 1036 //// _stageOutDirMap.put(randomOutPath, outputPath); 1037 //// if (_overwriteOutput){ 1038 //// new File(outputPath).delete(); 1039 //// } 1040 //// outputPath = randomOutPath; 1041 //// } else if (!new File(outputPath).getParentFile().exists() && !(new File(outputPath).mkdirs())) { 1042 // } else { 1043 // throw new IllegalActionException( 1044 // _director, 1045 // "the path " + outputPath + " in actor " 1046 // + actor.getName() 1047 // + " does not exist and cannot be created. 
Maybe its file system is not supported yet."); 1048 // } 1049 1050 FileOutputFormat.setOutputPath(currentJob, new Path(outputPath)); 1051 1052 } 1053 _addActorParamsToJob(actor, currentJob.getConfiguration()); 1054 controlJob = new ControlledJob(currentJob, null); 1055 ctrJobList = new LinkedList<ControlledJob>(); 1056 ctrJobList.add(controlJob); 1057 } catch (ClassCastException e) { 1058 e.printStackTrace(); 1059 throw (new IllegalActionException("The class name: " 1060 + formatClassName 1061 + " is not sub class of OutputFormat, please check it.")); 1062 } catch (ClassNotFoundException e) { 1063 e.printStackTrace(); 1064 throw (new IllegalActionException( 1065 "No proper class can be found based on the name: " 1066 + formatClassName + " , please check it.")); 1067 } 1068 List<DDPPatternActor> upStreamActorList = this 1069 ._getUpStreamActorList(actor); 1070 1071 return _getHadoopJobs(ctrJobList, controlJob, 1072 upStreamActorList.get(0), null, false, false, null, false); 1073 1074 } else if (actor instanceof DDPDataSource) { 1075 if (actorList != null) { 1076 //add map actor based on its reverse order 1077 1078 for (int i = actorList.size()-1; i >= 0; i--) { 1079 DDPPatternActorUtil actorUtil = actorList.get(i); 1080 java.util.Map<String, Type> typeMap = actorUtil.getTypeMap(); 1081 // _classLoader = _loadActorJars(actor); 1082 Class impClass = null; 1083 String impClassName = actorUtil.getImpClassName(); 1084 _setPartitonerClass(job.getJob(), actorUtil.getActor()); 1085 if (impClassName != null) { //using implement class rather than sub-workflow 1086 impClass = _classLoader.loadClass(impClassName); 1087 if (!Mapper.class.isAssignableFrom(impClass)) { 1088 throw new IllegalActionException(actor, 1089 "Execution class " + 1090 impClassName + " must be a subclass of " + 1091 Mapper.class.getName()); 1092 } 1093 _addDirectorParamsToJob(conf); 1094 ChainMapper.addMapper(job.getJob(), 1095 impClass, 1096 StubUtilities.convertToHadoopType(typeMap.get("inKey")), 1097 StubUtilities.convertToHadoopType(typeMap.get("inValue")), 1098 StubUtilities.convertToHadoopType(typeMap.get("outKey")), 1099 StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf); 1100 1101 if (i==0 && !hasReducer) { //if there is no reducer, we should set it to be default reducer based on the output data type of the last mapper. 1102 final Class keyClass = StubUtilities.convertToHadoopType(typeMap.get("outKey")); 1103 final Class valueClass = StubUtilities.convertToHadoopType(typeMap.get("outValue")); 1104 ChainReducer.setReducer(job.getJob(), 1105 Reducer.class, 1106 keyClass, valueClass, 1107 keyClass, valueClass, conf); 1108 setReduceNumber(actor, job.getJob()); 1109 } 1110 } else { 1111 try { 1112 _log.info("Working on actor: " + actorUtil.getName()); 1113 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName()); 1114 if(!_sameJVM) { 1115 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml()); 1116 } 1117 _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration()); 1118 ChainMapper.addMapper(job.getJob(), 1119 MapReduce4Kepler.Mapper4Kepler.class, 1120 StubUtilities.convertToHadoopType(typeMap.get("key")), 1121 StubUtilities.convertToHadoopType(typeMap.get("value")), 1122 StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"), 1123 StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf); 1124// if (i == actorList.size()-1 && tmpDir != null) { //set the output of the last map. 
1125// job.getJob().setOutputFormatClass(SequenceFileOutputFormat.class); 1126// FileOutputFormat.setOutputPath(job.getJob(), new Path(tmpDir)); 1127// } 1128 if (i==0 && !hasReducer) { //if there is no reducer, we should set it to be default reducer based on the output data type of the last mapper. 1129 final Class<?> keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"); 1130 final Class<?> valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"); 1131 ChainReducer.setReducer(job.getJob(), 1132 Reducer.class, 1133 keyClass, valueClass, 1134 keyClass, valueClass, conf); 1135 setReduceNumber(actor, job.getJob()); 1136 } 1137 } catch (IOException ioe) { 1138 ioe.printStackTrace(); 1139 throw new IllegalActionException(_director, ioe, "Error when transforming actor:" + actorUtil.getName() + " into Hadoop job."); 1140 } catch (IllegalArgumentException iae) { 1141 iae.printStackTrace(); 1142 throw new IllegalActionException(_director, iae, "Error when transforming actor:" + actorUtil.getName() + " into Hadoop job."); 1143 } 1144 } 1145 } 1146 1147 } else if (!hasSpecialMapper){ //actor List is empty and there is no mapper in this job. Then add an identical mapper based on type of 'out' port of DDPSource. 1148 _log.debug("processing with no mapper cases for the job connected with DDPSource"); 1149 ArrayType outType = (ArrayType)((TypedIOPort)actor.getPort("out")).getType(); 1150 Class keyClass = StubUtilities.convertRecordArrayToHadoopType(outType, "key"); 1151 Class valueClass = StubUtilities.convertRecordArrayToHadoopType(outType, "value"); 1152// job.getJob().setMapOutputKeyClass(keyClass); 1153// job.getJob().setMapOutputValueClass(valueClass); 1154// job.getJob().setMapperClass(Mapper.class); 1155 ChainMapper.addMapper(job.getJob(), 1156 Mapper.class, 1157 keyClass, valueClass, 1158 keyClass, valueClass, conf); 1159// job.getJob().setOutputKeyClass(keyClass); 1160// job.getJob().setOutputValueClass(valueClass); 1161 if(!hasReducer) { //if there is no reducer, we should set it to be default reducer based on the output data type of the last mapper. 1162 ChainReducer.setReducer(job.getJob(), 1163 Reducer.class, 1164 keyClass, valueClass, 1165 keyClass, valueClass, conf); 1166 } 1167 setReduceNumber(actor, job.getJob()); 1168 } 1169 1170 String formatClassName = null; 1171 try { 1172 1173 formatClassName = ((DDPDataSource) actor).getFormatClassName(_engineName); 1174 1175 // try to get the class 1176 Class<?> clazz; 1177 try { 1178 clazz = _classLoader.loadClass(formatClassName); 1179 } catch (ClassNotFoundException e) { 1180 throw new IllegalActionException(actor, "Format type " + formatClassName + 1181 " was not found in the format types configurations or is a class not on the classpath."); 1182 } 1183 1184 //formatClassName = _getFormatClassName((DDPDataSource) actor, 1185 //"formatType", true); 1186// job.getJob().setInputFormatClass( 1187// Class.forName(formatClassName).asSubclass( 1188// InputFormat.class)); 1189 if(TokenInputFormat.class.isAssignableFrom(clazz)) { 1190 job.getJob().setInputFormatClass((Class<? 
extends InputFormat>) clazz); 1191 1192 if(_sameJVM) { 1193 job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_SOURCE_ACTOR_NAME, 1194 actor.getFullName()); 1195 } else { 1196 1197 Token token = DDPDataSource.getToken(actor.getFullName()); 1198 1199 if(token == null) { 1200 throw new IllegalActionException(actor, 1201 "No input token found for source actor " + actor.getName()); 1202 } 1203 1204 job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_INPUT_TOKEN, token.toString()); 1205 } 1206 1207 } else { 1208 String inputPath = _getParaValue((DDPDataSource) actor, 1209 "path"); 1210 _verifyParaSetting(inputPath, actor); 1211 _log.debug("input path:" + inputPath); 1212 if (!_sameJVM) { 1213 if (!inputPath.startsWith("hdfs://")) { 1214 if (inputPath.substring(0, 8).equalsIgnoreCase( 1215 "file:///")) 1216 inputPath = inputPath.substring(7); 1217 if (!new File(inputPath).exists()) 1218 new IllegalActionException(_director, 1219 "the path " + inputPath + " in actor " 1220 + actor.getName() 1221 + " does not exist."); 1222 if (_autoHDFSStage) { 1223 // the input path is local file. then stage data 1224 // in and set the hdfs path as inputPath. 1225 inputPath = this._stageIn(_tmpDir, inputPath); 1226 // } else { 1227 // new IllegalActionException( 1228 // _director, 1229 // "the path " + inputPath + " in actor " 1230 // + actor.getName() 1231 // + 1232 // " is not correct or its file system is not supported yet"); 1233 } else { 1234 // add file prefix back 1235 if (File.separator.equalsIgnoreCase("/")) // linux 1236 // and 1237 // mac 1238 inputPath = "file://" + inputPath; 1239 else { 1240 System.out.println("In windows, inputPath:" 1241 + inputPath); 1242 inputPath = "file:///" + inputPath; 1243 ; 1244 } 1245 } 1246 } 1247 } else { 1248 // if HDFS is used for sameJVM, throw an exception. 
1249 if (inputPath.substring(0, 4).equalsIgnoreCase("hdfs")) { 1250 new IllegalActionException( 1251 _director, 1252 "the path " 1253 + inputPath 1254 + " in actor " 1255 + actor.getName() 1256 + " cannot be HDFS url when sameJVM is chosen for parameter 'startServerType' in director " 1257 + _director.getName() + "."); 1258 } 1259 } 1260 // FileInputFormat 1261 // .setInputPaths(job.getJob(), new Path(inputPath)); 1262 // _classLoader = _loadActorJars(actor); 1263 MultipleInputs.addInputPath(job.getJob(), new Path(inputPath), _classLoader.loadClass(formatClassName).asSubclass( 1264 InputFormat.class)); 1265 if (tag) 1266 job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_TAG, inputPath); 1267 // MultipleInputs.addInputPath(job.getJob(), new Path(inputPath), Class.forName(formatClassName).asSubclass( 1268 // InputFormat.class)); 1269 } 1270 1271 _addActorParamsToJob(actor, job.getJob().getConfiguration()); 1272 1273 job.getJob().setJobName(actor.getName()); 1274 } catch (ClassCastException e) { 1275 e.printStackTrace(); 1276 throw (new IllegalActionException("The class name: " 1277 + formatClassName 1278 + " is not sub class of OutputFormat, please check it.")); 1279 } catch (ClassNotFoundException e) { 1280 e.printStackTrace(); 1281 throw (new IllegalActionException( 1282 "No proper class can be found based on the name: " 1283 + formatClassName + " , please check it.")); 1284 } 1285 1286 return jobList; 1287 } else { 1288 1289 if (actor instanceof Map) { 1290 if (actorList == null) 1291 actorList = new LinkedList<DDPPatternActorUtil>(); 1292 try { 1293 actorList.add(_cloneAndExport((Map)actor)); 1294 } catch (ClassNotFoundException e) { 1295 throw new IllegalActionException(actor, e, 1296 "Execution class " + 1297 actor.getExecutionClassName() + " is not found."); 1298 } 1299 return _getHadoopJobs(jobList, job, 1300 _getUpStreamActorList(actor).get(0), actorList, 1301 hasReducer, hasSpecialMapper, null, false); 1302 1303 } else if (actor instanceof Reduce) { 1304 DDPPatternActorUtil actorUtil; 1305 try { 1306 actorUtil = _cloneAndExport((Reduce)actor); 1307 } catch (ClassNotFoundException e) { 1308 throw new IllegalActionException(actor, e, 1309 "Execution class " + 1310 actor.getExecutionClassName() + " is not found."); 1311 } 1312 // already have reducer in this job 1313 if (hasReducer) { 1314 if (actorList != null) { 1315 for (int i = actorList.size()-1; i >= 0; i--) { 1316 actorUtil = actorList.get(i); 1317 // _classLoader = _loadActorJars(actor); 1318 Class impClass = null; 1319 String impClassName = actorUtil.getImpClassName(); 1320 final java.util.Map<String, Type> typeMap = actorUtil.getTypeMap(); 1321 _setPartitonerClass(job.getJob(), actorUtil.getActor()); 1322 if (impClassName != null) { //using implement class rather than sub-workflow 1323 impClass = _classLoader.loadClass(impClassName); 1324 if (!Mapper.class.isAssignableFrom(impClass)) { 1325 throw new IllegalActionException(actor, 1326 "Execution class " + 1327 impClassName + " must be a subclass of " + 1328 Mapper.class.getName()); 1329 } 1330 _addDirectorParamsToJob(conf); 1331 ChainMapper.addMapper(job.getJob(), 1332 impClass, 1333 StubUtilities.convertToHadoopType(typeMap.get("inKey")), 1334 StubUtilities.convertToHadoopType(typeMap.get("inValue")), 1335 StubUtilities.convertToHadoopType(typeMap.get("outKey")), 1336 StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf); 1337 1338 } else { 1339 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName()); 1340 if(!_sameJVM) { 1341 
conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml()); 1342 } 1343 _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration()); 1344 ChainMapper.addMapper(job.getJob(), 1345 MapReduce4Kepler.Mapper4Kepler.class, 1346 StubUtilities.convertToHadoopType(typeMap.get("key")), 1347 StubUtilities.convertToHadoopType(typeMap.get("value")), 1348 StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"), 1349 StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf); 1350 } 1351 //special input set for the first map. 1352 if (i == actorList.size()-1) { 1353 job.getJob().setInputFormatClass(SequenceFileInputFormat.class); 1354 tmpDir = _getARadomHDFSDir(_tmpDir, false); 1355 FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir)); 1356 } 1357 } 1358 } else { // actor List is empty. add an identical mapper based on upstream reducer output key value. 1359 // _classLoader = _loadActorJars(actor); 1360 Class<?> impClass = null; 1361 String impClassName = actorUtil.getImpClassName(); 1362 final java.util.Map<String, Type> typeMap = actorUtil.getTypeMap(); 1363 final Class<?> keyClass, valueClass; 1364 if (impClassName != null) { //using implement class rather than sub-workflow, set the key value type based on the reducer's output port type. 1365 keyClass = StubUtilities.convertToHadoopType(typeMap.get("outKey")); 1366 valueClass = StubUtilities.convertToHadoopType(typeMap.get("outValue")); 1367 } 1368 else { 1369 keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"); 1370 valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"); 1371 } 1372 ChainMapper.addMapper(job.getJob(), 1373 Mapper.class, 1374 keyClass, valueClass, 1375 keyClass, valueClass, conf); 1376 //set input for the job. 
1377 job.getJob().setInputFormatClass(SequenceFileInputFormat.class); 1378 tmpDir = _getARadomHDFSDir(_tmpDir, false); 1379 FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir)); 1380 } 1381 job.getJob().setJobName(actor.getName()); 1382 String formatClassName = null; 1383 ControlledJob controlJob = null; 1384 Job newJob = new Job(this.conf); 1385 _addActorParamsToJob((Reduce)actor, newJob.getConfiguration()); 1386 newJob.setOutputFormatClass(SequenceFileOutputFormat.class); 1387 controlJob = new ControlledJob(newJob, null); 1388 job.addDependingJob(controlJob); 1389 jobList.add(controlJob); 1390 return _getHadoopJobs(jobList, controlJob, actor, null, 1391 false, hasSpecialMapper, tmpDir, false); 1392 1393 } else { // no reducer in this job 1394 job.getJob().setJobName(actor.getName()); 1395 // _classLoader = _loadActorJars(actor); 1396 java.util.Map<String, Type> typeMap = actorUtil.getTypeMap(); 1397 Class impClass = null; 1398 String impClassName = actorUtil.getImpClassName(); 1399 final String groupingClassName = ((Reduce)actor).groupingComparatorClass.stringValue(); 1400 if (!groupingClassName.isEmpty()) { 1401 final Class groupingClass = _classLoader.loadClass(groupingClassName); 1402 if (!RawComparator.class.isAssignableFrom(groupingClass)) { 1403 throw new IllegalActionException(actor, 1404 "GroupingComparator class " + 1405 groupingClass + " must be a subclass of " + 1406 RawComparator.class.getName()); 1407 } 1408 job.getJob().setGroupingComparatorClass(groupingClass); 1409 } 1410 if (impClassName != null) { //using implement class rather than sub-workflow 1411 impClass = _classLoader.loadClass(impClassName); 1412 if (!Reducer.class.isAssignableFrom(impClass)) { 1413 throw new IllegalActionException(actor, 1414 "Execution class " + 1415 actor.getExecutionClassName() + " must be a subclass of " + 1416 Reducer.class.getName()); 1417 } 1418 _addDirectorParamsToJob(conf); 1419 ChainReducer.setReducer(job.getJob(), 1420 impClass, 1421 StubUtilities.convertToHadoopType(typeMap.get("inKey")), 1422 StubUtilities.convertToHadoopType(typeMap.get("inValue")), 1423 StubUtilities.convertToHadoopType(typeMap.get("outKey")), 1424 StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf); 1425 setReduceNumber(actor, job.getJob()); 1426 // see if reducer is also a combiner 1427 if(((BooleanToken)((Reduce)actor).useAsCombiner.getToken()).booleanValue()) { 1428 final String combinerClassName = ((Reduce)actor).combineExecutionClass.stringValue(); 1429 if (combinerClassName.isEmpty()) 1430 job.getJob().setCombinerClass(impClass); 1431 else { 1432 final Class combineClass = _classLoader.loadClass(combinerClassName); 1433 if (!Reducer.class.isAssignableFrom(combineClass)) { 1434 throw new IllegalActionException(actor, 1435 "Combiner class " + 1436 combinerClassName + " must be a subclass of " + 1437 Reducer.class.getName()); 1438 } 1439 job.getJob().setCombinerClass(combineClass); 1440 } 1441 } 1442 } else { 1443 // DDPPatternActorUtil actorUtil = _cloneAndExport((Reduce)actor); 1444 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName()); 1445 if(!_sameJVM) { 1446 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml()); 1447 } 1448 _addActorParamsToJob((Reduce)actor, job.getJob().getConfiguration()); 1449 ChainReducer.setReducer(job.getJob(), 1450 MapReduce4Kepler.Reducer4Kepler.class, 1451 StubUtilities.convertToHadoopType(typeMap.get("key")), 1452 StubUtilities.convertToHadoopType(typeMap.get("values")), 1453 
StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"), 1454 StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf); 1455 setReduceNumber(actor, job.getJob()); 1456 // see if reducer is also a combiner 1457 if(((BooleanToken)((Reduce)actor).useAsCombiner.getToken()).booleanValue()) { 1458 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName()); 1459 if(!_sameJVM) { 1460 job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml()); 1461 } 1462 job.getJob().setCombinerClass(MapReduce4Kepler.Reducer4Kepler.class); 1463 } 1464 } 1465 if (actorList != null) { 1466 for (int i = actorList.size()-1; i >= 0; i--) { 1467 actorUtil = actorList.get(i); 1468 typeMap = actorUtil.getTypeMap(); 1469 // _classLoader = _loadActorJars(actor); 1470 impClassName = actorUtil.getImpClassName(); 1471 _setPartitonerClass(job.getJob(), actorUtil.getActor()); 1472 if (impClassName != null) { //using implement class rather than sub-workflow 1473 impClass = _classLoader.loadClass(impClassName); 1474 if (!Mapper.class.isAssignableFrom(impClass)) { 1475 throw new IllegalActionException(actor, 1476 "Execution class " + 1477 impClassName + " must be a subclass of " + 1478 Mapper.class.getName()); 1479 } 1480 _addDirectorParamsToJob(conf); 1481 ChainReducer.addMapper(job.getJob(), 1482 impClass, 1483 StubUtilities.convertToHadoopType(typeMap.get("inKey")), 1484 StubUtilities.convertToHadoopType(typeMap.get("inValue")), 1485 StubUtilities.convertToHadoopType(typeMap.get("outKey")), 1486 StubUtilities.convertToHadoopType(typeMap.get("outValue")), conf); 1487 1488 } else { 1489 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName()); 1490 if(!_sameJVM) { 1491 conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml()); 1492 } 1493 _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration()); 1494 ChainReducer.addMapper(job.getJob(), 1495 MapReduce4Kepler.Mapper4Kepler.class, 1496 StubUtilities.convertToHadoopType(typeMap.get("key")), 1497 StubUtilities.convertToHadoopType(typeMap.get("value")), 1498 StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"), 1499 StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf); 1500 } 1501 } 1502 } 1503 1504// else { // actor List is empty. add an identical mapper based on reducer input key value. 1505// ChainMapper.addMapper(job.getJob(), 1506// Mapper.class, 1507// Utilities.convertToHadoopType(typeMap.get("key")), 1508// Utilities.convertToHadoopType(typeMap.get("values")), 1509// Utilities.convertToHadoopType(typeMap.get("key")), 1510// Utilities.convertToHadoopType(typeMap.get("values")), conf); 1511// } 1512 //if tmpDir != null, it means this reducer is followed by a map. 1513 //so we need to set its output info. 1514 //TODO: double check whether this condition works with Match/Cross/CoGroup. 
                    if (tmpDir != null) {
                        job.getJob().setOutputFormatClass(SequenceFileOutputFormat.class);
                        FileOutputFormat.setOutputPath(job.getJob(), new Path(tmpDir));
                    }
                    return _getHadoopJobs(jobList, job,
                            _getUpStreamActorList(actor).get(0), null, true, hasSpecialMapper, null, false);
                }

            } else if (actor instanceof DualInputPatternActor) {
                job.getJob().setJobName(actor.getName());
                DDPPatternActorUtil actorUtil = null;
                try {
                    actorUtil = _cloneAndExport((DualInputPatternActor)actor);
                } catch (ClassNotFoundException e) {
                    // do nothing, because actorUtil.getImpClassName() will be null.
                }
                if (actorUtil != null && actorUtil.getImpClassName() != null) { // Hadoop doesn't support execution classes for Match/Cross/CoGroup.
                    throw new IllegalActionException(actor,
                            "You cannot set the executionClass parameter for HadoopDirector because Hadoop doesn't support it for Match/Cross/CoGroup actors. " +
                            "You can only use sub-workflows for these actors.");
                }
                // there is already a reducer in this job
                if (hasReducer) {
//                  DDPPatternActorUtil actorUtil = null;
//                  try {
//                      actorUtil = _cloneAndExport((DualInputPatternActor)actor);
//                  } catch (ClassNotFoundException e) {
//                      throw new IllegalActionException(actor, e,
//                              "Execution class " +
//                              actor.getExecutionClassName() + " must be a subclass of " +
//                              Map.class.getName());
//                  }

                    if (actorList != null) {
                        for (int i = actorList.size()-1; i >= 0; i--) {
                            actorUtil = actorList.get(i);
                            conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
                            if (!_sameJVM) {
                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
                            }
                            _addActorParamsToJob(actorUtil.getActor(), job.getJob().getConfiguration());
                            java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
                            ChainMapper.addMapper(job.getJob(),
                                    MapReduce4Kepler.Mapper4Kepler.class,
                                    StubUtilities.convertToHadoopType(typeMap.get("key")),
                                    StubUtilities.convertToHadoopType(typeMap.get("value")),
                                    StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
                                    StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);
                            // special input setting for the first map.
                            if (i == actorList.size()-1) {
                                job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
                                tmpDir = _getARadomHDFSDir(_tmpDir, false);
                                FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
                            }
                        }
                    } else { // the actor list is empty; add an identity mapper based on the upstream reducer's output key/value types.
                        java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
                        Class keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key");
                        Class valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value");
                        ChainMapper.addMapper(job.getJob(),
                                Mapper.class,
                                keyClass, valueClass,
                                keyClass, valueClass, conf);
                        // set input for the job.
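                        // the identity Mapper added above simply forwards the key/value pairs produced by
                        // the upstream reducer, which are staged through the temporary HDFS SequenceFile
                        // directory registered here as this job's input.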
                        job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
                        tmpDir = _getARadomHDFSDir(_tmpDir, false);
                        FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
                    }
                    Job newJob = new Job(this.conf);
                    _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
                    newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
                    ControlledJob controlJob = new ControlledJob(newJob, null);
                    job.addDependingJob(controlJob);
                    jobList.add(controlJob);
                    return _getHadoopJobs(jobList, controlJob, actor, null,
                            false, false, tmpDir, false);

                } else { // no reducer in this job
                    String tmpCacheDir = null;
                    conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
                    if (!_sameJVM) {
                        conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
                    }
                    // FIXME: update classes
                    java.util.Map<String, Type> typeMap = actorUtil.getTypeMap();
                    if (actor instanceof Match) {
                        // FIXME: this works only if value1 and value2 have the same data type.
                        ChainReducer.setReducer(job.getJob(), Reducer4Match.class,
                                StubUtilities.convertToHadoopType(typeMap.get("key")), StubUtilities.convertToTaggedType(typeMap.get("value1")),
                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"),
                                conf);
                        // FIXME: this works only if value1 and value2 have the same data type.
                        ChainMapper.addMapper(job.getJob(), Mapper4TagValue.class,
                                StubUtilities.convertToHadoopType(typeMap.get("key")),
                                //Utilities.convertToTaggedType(typeMap.get("value1")),
                                StubUtilities.convertToHadoopType(typeMap.get("value1")),
                                StubUtilities.convertToHadoopType(typeMap.get("key")),
                                StubUtilities.convertToTaggedType(typeMap.get("value1")), conf);
                        job.getJob().setCombinerClass(Combiner4TagValue.class);
                    } else if (actor instanceof CoGroup) { // actor instanceof CoGroup
                        // FIXME: this works only if values1 and values2 have the same data type.
                        ChainReducer.setReducer(job.getJob(), Reducer4CoGroup.class,
                                StubUtilities.convertToHadoopType(typeMap.get("key")), StubUtilities.convertToTaggedType(typeMap.get("values1")),
                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"),
                                conf);
                        // FIXME: this works only if values1 and values2 have the same data type.
                        ChainMapper.addMapper(job.getJob(), Mapper4TagValue.class,
                                StubUtilities.convertToHadoopType(typeMap.get("key")),
                                //Utilities.convertToTaggedType(typeMap.get("values1")),
                                StubUtilities.convertToHadoopType(typeMap.get("values1")),
                                StubUtilities.convertToHadoopType(typeMap.get("key")),
                                StubUtilities.convertToTaggedType(typeMap.get("values1")), conf);
                        job.getJob().setCombinerClass(Combiner4TagValue.class);
                    } else if (actor instanceof Cross) { // actor instanceof Cross
                        // the two mappers for Cross have to be in two Hadoop jobs, otherwise the outputs
                        // of the first map would be sent to the input of the second map.
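                        // the second input of Cross is therefore routed through a temporary
                        // distributed-cache directory (tmpCacheDir, set below), presumably read by
                        // Mapper4Cross instead of arriving as regular map input.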
                        tmpCacheDir = _getARadomHDFSDir(_tmpDir, false);
                        _log.debug(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH + ":" + tmpCacheDir);
                        conf.set(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH, tmpCacheDir);
//                      final Class keyClass = Utilities.convertToHadoopType(typeMap.get("key"));
//                      final Class valueClass = Utilities.convertToHadoopType(typeMap.get("value"));
                        ChainMapper.addMapper(job.getJob(), Mapper4Cross.class,
                                StubUtilities.convertToHadoopType(typeMap.get("key")),
                                StubUtilities.convertToHadoopType(typeMap.get("value")),
                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key"),
                                StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value"), conf);

                    }
                    if (actorList != null) {
                        for (int i = actorList.size()-1; i >= 0; i--) {
                            actorUtil = actorList.get(i);
                            conf.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, actorUtil.getName());
                            if (!_sameJVM) {
                                conf.set(Utilities.CONFIGURATION_KEPLER_MODEL, actorUtil.getXml());
                            }
                            java.util.Map<String, Type> typeMap2 = actorUtil.getTypeMap();
                            ChainMapper.addMapper(job.getJob(),
                                    MapReduce4Kepler.Mapper4Kepler.class,
                                    StubUtilities.convertToHadoopType(typeMap2.get("key")),
                                    StubUtilities.convertToHadoopType(typeMap2.get("value")),
                                    StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap2.get("keysvalues"), "key"),
                                    StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap2.get("keysvalues"), "value"), conf);
                        }
                    }
                    if (actor instanceof Cross) {
//                      System.out.println("Working on actor: " + actor.getFullName());
                        // set the reducer of the job based on the output data type of the actor.
                        // TODO: is this identity reducer really necessary? Without it, the output could be
                        // multiple part-m-* files; the reducer merges them into one.
                        final Class keyClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "key");
                        final Class valueClass = StubUtilities.convertRecordArrayToHadoopType((ArrayType)typeMap.get("keysvalues"), "value");
                        ChainReducer.setReducer(job.getJob(),
                                Reducer.class,
                                keyClass, valueClass,
                                keyClass, valueClass, conf);
//                      hasReducer = true;
                    }
                    setReduceNumber(actor, job.getJob());
                    if (tmpDir != null) {
                        job.getJob().setOutputFormatClass(SequenceFileOutputFormat.class);
                        FileOutputFormat.setOutputPath(job.getJob(), new Path(tmpDir));
                    }
                    if (actor instanceof Match || actor instanceof CoGroup) {
                        if (!(_getUpStreamActorList(actor).get(0) instanceof DDPDataSource)) {
                            // the upstream actor is not a DDPDataSource, so a new job should be created.
                            Job newJob = new Job(this.conf);
                            newJob.setJobName("taggingJob");
                            _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
                            newJob.setOutputFormatClass(SequenceFileOutputFormat.class);

                            job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
                            tmpDir = _getARadomHDFSDir(_tmpDir, false);
                            FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));

                            // set the tag for the actor connected to the 'in' port.
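                            // the temporary input directory is reused as the tag value, presumably so that
                            // the tagging classes (Mapper4TagValue / Reducer4Match / Reducer4CoGroup) can
                            // tell which of the two input branches a record came from.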
                            job.getJob().getConfiguration().set(Utilities.CONFIGURATION_KEPLER_TAG, tmpDir);

                            newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
                            FileOutputFormat.setOutputPath(newJob, new Path(tmpDir));

                            ControlledJob controlJob = new ControlledJob(newJob, null);
                            job.addDependingJob(controlJob);
                            jobList.add(controlJob);
                            jobList = _getHadoopJobs(jobList, controlJob,
                                    _getUpStreamActorList(actor).get(0), null, false, true, tmpDir, false);
                        } else {
                            jobList = _getHadoopJobs(jobList, job,
                                    _getUpStreamActorList(actor).get(0), null, false, true, null, true);
                        }

                        if (!(_getUpStreamActorList(actor).get(1) instanceof DDPDataSource)) {
                            // the upstream actor is not a DDPDataSource, so a new job should be created.
                            Job newJob = new Job(this.conf);
                            _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
                            newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
                            job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
                            tmpDir = _getARadomHDFSDir(_tmpDir, false);
                            FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
                            newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
                            FileOutputFormat.setOutputPath(newJob, new Path(tmpDir));
                            ControlledJob controlJob = new ControlledJob(newJob, null);
                            job.addDependingJob(controlJob);
                            jobList.add(controlJob);
                            return _getHadoopJobs(jobList, controlJob,
                                    _getUpStreamActorList(actor).get(1), null, false, true, tmpDir, false);
                        } else {
                            return _getHadoopJobs(jobList, job,
                                    _getUpStreamActorList(actor).get(1), null, false, true, null, false);
                        }

                    } else { // actor is Cross
                        // check
                        job.getJob().setInputFormatClass(SequenceFileInputFormat.class);
                        tmpDir = _getARadomHDFSDir(_tmpDir, false);
                        FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
                        // create a new job for the actor whose output is connected to the 'in' port of
                        // this Cross actor, and connect the new job's output to the input of the job for
                        // the Cross actor.
                        Job newJob = new Job(this.conf);
                        newJob.setJobName(actor.getName() + "_in");
                        _addActorParamsToJob((DualInputPatternActor)actor, newJob.getConfiguration());
                        newJob.setOutputFormatClass(SequenceFileOutputFormat.class);
//                      final String tmpOutputDir = _getARadomHDFSDir(_tmpDir, false);
                        FileOutputFormat.setOutputPath(newJob, new Path(tmpDir));

                        ControlledJob controlJob = new ControlledJob(newJob, null);
                        job.addDependingJob(controlJob);
                        jobList.add(controlJob);

//                      if (!(_getUpStreamActorList(actor).get(0) instanceof DDPDataSource)) {
//                          newJob.setInputFormatClass(SequenceFileInputFormat.class);
//                          tmpDir = _getARadomHDFSDir(_tmpDir, false);
//                          FileInputFormat.addInputPath(newJob, new Path(tmpDir));
//                          jobList = _getHadoopJobs(jobList, controlJob,
//                                  _getUpStreamActorList(actor).get(0), null, hasReducer, true, tmpDir);
//                      } else
                        jobList = _getHadoopJobs(jobList, controlJob,
                                _getUpStreamActorList(actor).get(0), null, false, hasSpecialMapper, null, false);

                        // _getUpStreamActorList(actor).get(1) is connected to the 'in2' port, so this
                        // mapper's data types should be based on key2 and value2.
                        // the output of _getUpStreamActorList(actor).get(1) will be put into HDFS as one
                        // input of the Cross sub-workflow.
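                        // the ControlledJob dependencies created below order the work as:
                        // prepareDataTransform, then dataTransform (Mapper4DataTransform), then the
                        // "<actor>_in" job, and finally the Cross job itself, all connected through
                        // temporary HDFS directories.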
                        final Class keyClass = StubUtilities.convertToHadoopType(typeMap.get("key2"));
                        final Class valueClass = StubUtilities.convertToHadoopType(typeMap.get("value2"));
                        conf.set(Utilities.CONFIGURATION_KEPLER_DISTRIBUTED_CACHE_PATH, tmpCacheDir);
                        conf.set(Utilities.CONFIGURATION_KEPLER_JOB_KEY_CLASS, keyClass.getName());
                        conf.set(Utilities.CONFIGURATION_KEPLER_JOB_VALUE_CLASS, valueClass.getName());

                        Job dataTransJob = new Job(this.conf);
                        dataTransJob.setJobName("dataTransform");
                        _addActorParamsToJob((DualInputPatternActor)actor, dataTransJob.getConfiguration());
                        // the output of this job is not really useful.
                        dataTransJob.setOutputFormatClass(SequenceFileOutputFormat.class);
                        final String tmpOutDir4DataTrans = _getARadomHDFSDir(_tmpDir, false);
                        FileOutputFormat.setOutputPath(dataTransJob, new Path(tmpOutDir4DataTrans));
                        final String tmpInDir4DataTrans = _getARadomHDFSDir(_tmpDir, false);
                        FileInputFormat.addInputPath(dataTransJob, new Path(tmpInDir4DataTrans));
                        dataTransJob.setInputFormatClass(SequenceFileInputFormat.class);

                        ControlledJob controlJob4DataTrans = new ControlledJob(dataTransJob, null);
                        controlJob.addDependingJob(controlJob4DataTrans);
                        jobList.add(controlJob4DataTrans);

                        ChainMapper.addMapper(dataTransJob, Mapper4DataTransform.class,
                                keyClass, valueClass,
                                keyClass, valueClass, conf);

                        Job newJob2 = new Job(this.conf);
                        newJob2.setJobName("prepareDataTransform");
                        _addActorParamsToJob((DualInputPatternActor)actor, newJob2.getConfiguration());
                        // the output of this job will be consumed by job dataTransJob.
                        newJob2.setOutputFormatClass(SequenceFileOutputFormat.class);
                        FileOutputFormat.setOutputPath(newJob2, new Path(tmpInDir4DataTrans));

                        ControlledJob controlJob2 = new ControlledJob(newJob2, null);
                        controlJob4DataTrans.addDependingJob(controlJob2);
                        jobList.add(controlJob2);

//                      if (!(_getUpStreamActorList(actor).get(1) instanceof DDPDataSource)) {
//                          controlJob2.getJob().setInputFormatClass(SequenceFileInputFormat.class);
//                          tmpDir = _getARadomHDFSDir(_tmpDir, false);
//                          FileInputFormat.addInputPath(job.getJob(), new Path(tmpDir));
//                          return _getHadoopJobs(jobList, controlJob2,
//                                  _getUpStreamActorList(actor).get(1), null, hasReducer, true, tmpDir);
//                      }
                        return _getHadoopJobs(jobList, controlJob2,
                                _getUpStreamActorList(actor).get(1), null, false, hasSpecialMapper, null, false);
                    }
                }

            }
        }

        return null;

    }

    /** Get the list of upstream actors from actor.
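     *  For a DualInputPatternActor the returned list has two entries: index 0 is the actor
     *  connected to the "in" port and index 1 is the actor connected to the "in2" port.
     *  A minimal, illustrative use (names are hypothetical):
     *  <pre>{@code
     *  List<DDPPatternActor> sources = _getUpStreamActorList(matchActor);
     *  DDPPatternActor first  = sources.get(0); // feeds "in"
     *  DDPPatternActor second = sources.get(1); // feeds "in2"
     *  }</pre>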
     */
    private List<DDPPatternActor> _getUpStreamActorList(DDPPatternActor actor)
            throws IllegalActionException {

        List<DDPPatternActor> upStreamActorList = null;

        // see how many inputs are required
        int numRequiredInputs = -1;
        if (actor instanceof DualInputPatternActor) {
            numRequiredInputs = 2;
        } else if (actor instanceof SingleInputPatternActor
                || actor instanceof DDPDataSink) {
            numRequiredInputs = 1;
        } else if (actor instanceof DDPDataSource) {
            numRequiredInputs = 0;
        } else {
            throw new IllegalActionException(_director,
                    "Unsupported actor for Hadoop Director: "
                            + actor.getClass());
        }

        // see if there's at least one input
        if (numRequiredInputs > 0) {
            upStreamActorList = new LinkedList<DDPPatternActor>();

            IOPort port = (IOPort) actor.getPort("in");
            // get the connected actor
            List<?> outputPorts = port.sourcePortList();
            if (outputPorts.isEmpty()) {
                throw new IllegalActionException(_director,
                        "DDPPatternActor input port " + port.getName()
                                + " must be connected.");
            } else if (outputPorts.size() > 1) {
                throw new IllegalActionException(_director,
                        "DDPPatternActor input port " + port.getName()
                                + " may only be connected to one port.");
            }

            IOPort outputPort1 = (IOPort) outputPorts.get(0);
            NamedObj outputNamedObj = outputPort1.getContainer();

            // FIXME
            if (!(outputNamedObj instanceof DDPPatternActor)) {
                throw new IllegalActionException(_director, "Actor "
                        + actor.getFullName()
                        + " is connected to a non-DDPPatternActor: "
                        + outputNamedObj.getFullName());
            }

            upStreamActorList.add((DDPPatternActor) outputNamedObj);

            if (numRequiredInputs > 1) {
                port = (IOPort) actor.getPort("in2");
                // get the connected actor
                outputPorts = port.sourcePortList();
                if (outputPorts.isEmpty()) {
                    throw new IllegalActionException(_director,
                            "DDPPatternActor input port " + port.getName()
                                    + " must be connected.");
                } else if (outputPorts.size() > 1) {
                    throw new IllegalActionException(_director,
                            "DDPPatternActor input port " + port.getName()
                                    + " may only be connected to one port.");
                }

                outputPort1 = (IOPort) outputPorts.get(0);
                outputNamedObj = outputPort1.getContainer();

                // FIXME
                if (!(outputNamedObj instanceof DDPPatternActor)) {
                    throw new IllegalActionException(_director, "Actor "
                            + actor.getFullName()
                            + " is connected to a non-DDPPatternActor: "
                            + outputNamedObj.getFullName());
                }

                upStreamActorList.add((DDPPatternActor) outputNamedObj);
            }
        }

        return upStreamActorList;
    }

    /**
     * Get the Input/Output Format class name for a source or sink.
     *
     * @param actor
     *            the Pactor
     * @param parameterName
     *            the name of the parameter containing the class name.
     * @param input
     *            if true, get a source contract. if false, get a sink contract.
     */
    /*
    private String _getFormatClassName(AtomicPatternActor actor,
            String parameterName, boolean input) throws IllegalActionException {
        final Parameter parameter = (Parameter) actor
                .getAttribute(parameterName);
        if (parameter == null) {
            throw new IllegalActionException(actor, "Missing parameter "
                    + parameterName + ".");
        }

        // make sure it's not empty.
        final String formatName = ((StringToken) parameter.getToken())
                .stringValue();
        if (formatName.trim().isEmpty()) {
            throw new IllegalActionException(actor,
                    "Missing value for parameter " + parameterName + ".");
        }

        // try to find the format name in the config properties

        String formatType;
        if (input) {
            formatType = "InputFormats.Format";
        } else {
            formatType = "OutputFormats.Format";
        }

        final List<ConfigurationProperty> formatList = _configProp
                .getProperties(formatType);
        if (formatList == null || formatList.isEmpty()) {
            throw new IllegalActionException(_director,
                    "No formats found in configuration.xml for type "
                            + formatType);
        }

        java.util.Map<String, String> parametersMap = new HashMap<String, String>();
        String className = null;

        for (ConfigurationProperty format : formatList) {
            if (format.getProperty("Name").getValue().equalsIgnoreCase(formatName) &&
                    format.getProperty("ImplementationClass").getProperty("Hadoop") != null) {
                className = format.getProperty("ImplementationClass").getProperty("Hadoop").getValue();

                // make sure a class is specified for Hadoop
                if (className == null) {
                    throw new IllegalActionException(actor, "Format "
                            + formatName + " does not have a class "
                            + " for Hadoop.");
                }

                ConfigurationProperty parameterProps = format
                        .getProperty("Parameters");
                if (parameterProps != null) {
                    parametersMap = ConfigurationUtilities
                            .getPairsMap(parameterProps);
                }

                break;
            }
        }

        if (className != null)
            return className;
        // if the class name is null, it could be because the format name is the class name; return formatName.
        else
            return formatName;

    }
    */

    /**
     * Get the value of a parameter for an actor.
     *
     * @param actor
     *            the Pactor
     * @param parameterName
     *            the name of the parameter whose value is returned.
     */
    private String _getParaValue(AtomicPatternActor actor, String parameterName)
            throws IllegalActionException {
        final Parameter parameter = (Parameter) actor
                .getAttribute(parameterName);
        if (parameter == null) {
            throw new IllegalActionException(actor, "Missing parameter "
                    + parameterName + ".");
        }

        // make sure it's not empty.
        final String value = ((StringToken) parameter.getToken()).stringValue();
        if (value.trim().isEmpty()) {
            throw new IllegalActionException(actor,
                    "Missing value for parameter " + parameterName + ".");
        }

        return value;

    }

    private void _exportSubWorkflow(SingleInputPatternActor actor)
            throws IllegalActionException, IOException {
        if (_jobDir == null) {
            _createJobDirectory();
        }
        final String modelPath = _jobDir + actor.getName() + ".xml";
        FileWriter writer = null;
        try {
            writer = new FileWriter(modelPath);
            actor.exportMoML(writer);
        } catch (IOException e) {
            throw new IllegalActionException(_director, e, "Error writing model to "
                    + modelPath);
        } finally {
            if (writer != null)
                writer.close();
        }
    }

    // get a random local directory to store output during the MapReduce computation.
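    // The method retries with a fresh random name (up to 10 times) whenever mkdir fails,
    // e.g. because a directory with that name already exists.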
    private String _getARadomLocalDir() throws IllegalActionException {
        String tmpDir = System.getProperty("java.io.tmpdir");
        Random ran = new Random();
        Long ranLong = null;
        String ranStr = null;
        boolean res = false;
        int index = 0;
        do {
            ranLong = ran.nextLong();
            if (ranLong < 0)
                ranLong = 0 - ranLong;
            ranStr = tmpDir + System.getProperty("file.separator") + ranLong.toString();
            _log.info("the random directory to be created in _getARadomLocalDir(): " + ranStr);
            res = new File(ranStr).mkdir();
            index++;
        } while (!res && index < 10);
        if (!res) {
            throw new IllegalActionException(
                    _director,
                    "Creating a random directory in folder " + tmpDir + " failed 10 times; please check the local file system.");
        }
        return ranStr;
    }

    // get a random HDFS directory to store output during the MapReduce computation. The
    // directory is not actually created here since that is done by the Hadoop job.
    private String _getARadomHDFSDir(String parentPath, boolean mkDir) throws IllegalActionException, IOException {
        _log.info("enter into _getARadomHDFSDir()");
        FsShell shell = new FsShell(conf);
        Random ran = new Random();
        Long ranLong = null;
        String ranStr = null;
        String[] arg4Mkdir = { "-ls", "" };
        int stopSignal = 1;
        if (mkDir) {
            arg4Mkdir[0] = "-mkdir";
            stopSignal = 0;
        }
        // String[] arg4Mkdir = null;
        int res = -1;
        int index = 0;
        try {
            do {
                ranLong = ran.nextLong();
                if (ranLong < 0)
                    ranLong = 0 - ranLong;
                ranStr = ranLong.toString();
                if (parentPath != null)
                    ranStr = parentPath + "/" + ranStr;
                else // added for hadoop 2.0
                    ranStr = "/" + ranStr;
                arg4Mkdir[1] = ranStr;
                _log.info("the args in _getARadomHDFSDir(): " + arg4Mkdir[0] + ", "
                        + arg4Mkdir[1]);
                res = shell.run(arg4Mkdir);
                index++;
                // res = ToolRunner.run(conf, shell, arg4Mkdir);
                final String message = Arrays.toString(arg4Mkdir).replace(",", "");
                _log.info("the exit code of 'hadoop fs " + message.substring(1, message.length()-1) + " ' is: " + res);
            } while (res != stopSignal && index < 10);
            if (res != stopSignal) {
                final String message = Arrays.toString(arg4Mkdir).replace(",", "");
                throw new IllegalActionException(
                        _director,
                        "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed 10 times; please check whether Hadoop is correctly set up.");
            }

            if (mkDir) { // test staging in files
                _log.info("staging a tmp file to check whether the DataNode is working");
                _stageIn(ranStr, _configDirStr + File.separator + "slaves");
            }

            return ranStr;
        } catch (Exception e) {

            e.printStackTrace();
            final String message = Arrays.toString(arg4Mkdir).replace(",", "");
            throw new IllegalActionException(
                    _director, e,
                    "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed; please check whether Hadoop is correctly set up.");
        } finally {
            shell.close();
            _log.debug("enter into finally of _getARadomHDFSDir() with ranStr:" + ranStr);
        }

    }

    private void _verifyParaSetting(String path, DDPPatternActor actor) throws IllegalActionException, IOException {
        if (path.substring(0, 4).equalsIgnoreCase("hdfs") && _autoHDFSStage)
            throw new IllegalActionException(
                    _director,
                    "Since the parameter autoHDFSStage of director " + _director.getName() +
                    " is true, actor " + actor.getFullName() + "'s path cannot be an HDFS URL.");

    }


    private void _removeOutputDir(String outputPathStr, DDPPatternActor actor) throws IllegalActionException, IOException {
        boolean deleteDecision = true;
        if (!outputPathStr.startsWith("hdfs")) {
            String[] filesInOutputPath = new File(outputPathStr).list();
            for (int i = 0; i < filesInOutputPath.length; i++) {
                // make the behavior the same as Spark's
                if (!filesInOutputPath[i].startsWith("part-") && !filesInOutputPath[i].startsWith(".part-")
                        && !filesInOutputPath[i].equalsIgnoreCase("_SUCCESS")
                        && !filesInOutputPath[i].equalsIgnoreCase("._SUCCESS.crc")) {
                    deleteDecision = false;
                    break;
                }

            }
            if (deleteDecision)
                FileUtils.deleteDirectory(new File(outputPathStr)); // TODO: make this delete more cautious and consistent with HDFS.
            else
                throw new IllegalActionException(
                        _director,
                        "the path " + outputPathStr + " in actor "
                                + actor.getName() + " cannot be deleted because it contains folders or other files. Please check the file system.");
        } else {
            Path outPath = new Path(outputPathStr);
            if (fs == null)
                fs = FileSystem.get(conf);
            if (fs.exists(outPath))
                fs.delete(outPath, true);
        }

    }

    private void _removeOutput(String strPath, boolean tmpDir) throws IllegalActionException, IOException {

        if (fs == null)
            fs = FileSystem.get(conf);
        Path hadoopPath = new Path(strPath);

        // if the output file doesn't exist, return directly.
        if (!fs.exists(hadoopPath))
            return;

        boolean exitCode = false;

        //if (fs.isFile(hadoopPath))
        System.out.println("tmpDir:" + tmpDir);
        exitCode = fs.delete(hadoopPath, tmpDir);

        if (!exitCode)
            throw new IllegalActionException(
                    _director,
                    "Failed to remove output " + strPath + ".\n Please check the console for detailed information and whether Hadoop is correctly set up.");

        /*
        fs.listFiles(new Path(hdfsPath), false);

        // remove intermediate data in HDFS
        FsShell shell = new FsShell(conf);
        int res = -1;
        String[] argv1 = { "-rm", "-r", hdfsPathStr};
        try {
            res = shell.run(argv1);
            if (res == -1) {
                final String message = Arrays.toString(argv1).replace(",", "");
                throw new IllegalActionException(
                        _director,
                        "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed; please check whether Hadoop is correctly set up.");
            }

        } catch (Exception e) {

            e.printStackTrace();
            final String message = Arrays.toString(argv1).replace(",", "");
            throw new IllegalActionException(
                    _director, e,
                    "command 'hadoop fs " + message.substring(1, message.length()-1) + "' failed; please check whether Hadoop is correctly set up.");

        } finally {
            shell.close();
        } */
    }

    // set the number of reducers based on the actor's degree of parallelism. This mainly
    // affects the parallelization of downstream actors.
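    // If the actor itself does not specify a degree of parallelism (value <= 0), the
    // director-wide default _degreeOfParallelism is used instead.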
    private void setReduceNumber(DDPPatternActor actor, Job job) throws IllegalActionException {
        int numInstances = actor.getDegreeOfParallelism();
        if (numInstances <= 0) {
            numInstances = _degreeOfParallelism;
        }
        else if (_sameJVM && numInstances > 1) {
            System.err.println("WARNING: degree of parallelism for " +
                    actor.getName() + " is " + numInstances +
                    ", but Hadoop only uses 1 thread in sameJVM mode.");
        }

        job.setNumReduceTasks(numInstances);
    }

    private void _addActorParamsToJob(DDPPatternActor actor, Configuration conf)
            throws IllegalActionException {
        // add any actor parameters to the job configuration
        final java.util.Map<String,String> actorParameters = actor.getParameters();
        final java.util.Map<String,String> paraNames = actor.getParaImplNames(_engineName);
        for (java.util.Map.Entry<String, String> entry : actorParameters.entrySet()) {
            String keplerParaName = entry.getKey();
            if (paraNames.get(keplerParaName) != null)
                conf.set(paraNames.get(keplerParaName), entry.getValue());
            else
                conf.set(Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::" + keplerParaName, entry.getValue());
        }

        // set the degree of data parallelism
        int numInstances = actor.getDegreeOfParallelism();
        if (numInstances <= 0) {
            numInstances = _degreeOfParallelism;
        }
        else if (_sameJVM && numInstances > 1) {
            System.err.println("WARNING: degree of parallelism for " +
                    actor.getName() + " is " + numInstances +
                    ", but Hadoop only uses 1 thread in sameJVM mode.");
        }

        conf.set(Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL, String.valueOf(numInstances));

        // the following parameters are only useful for SingleInputPatternActor instances.
//      if (actor instanceof SingleInputPatternActor) {
        // set the print-execution-info parameter
        conf.setBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, actor.getPrintExeInfo());
        // set the display redirect path
        String directDir = actor.getDisplayRedirectDir();
        boolean dirFromActor = true;
        if (directDir.isEmpty()) {
            dirFromActor = false;
            directDir = _displayRedirectDir;
        }
        if (!directDir.isEmpty()) {
            final File file = new File(directDir);
            if (!file.exists()) {
                boolean result = file.mkdirs();
                if (!result)
                    throw new IllegalActionException(dirFromActor ? actor : _director, "Cannot create directory based on parameter displayRedirectDir's value '" + directDir + "'. Please check your file system.");
            }
            if (!file.isDirectory() || !file.canWrite())
                throw new IllegalActionException(dirFromActor ? actor : _director, "Parameter displayRedirectDir's value '" + directDir + "' must be a writable directory.");
            conf.set(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, directDir);
        }
//      }

        // set the location of the kepler installation directory.
        // NOTE: this is done so that the stub can initialize kepler and set
        // the java properties for each module's workflow directory, e.g.:
        // property("hadoop.workflowdir")
        // if the modules directory does not exist on the stub, e.g., the file
        // system is not shared, then initialization is not done and the stub
        // workflow cannot use the module workflow directory properties.
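        // the value set below is the absolute path of the local Kepler modules directory; it
        // is only useful on the stub side if that path is visible there (e.g. a shared file system).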
        conf.set(Utilities.CONFIGURATION_KEPLER_MODULES_DIR,
                ProjectLocator.getKeplerModulesDir().getAbsolutePath());
    }

    /*
    private URLClassLoader _loadActorJars(DDPPatternActor actor)
            throws IllegalActionException, MalformedURLException {
        final String jarsStr = actor.getJars();
        if (!jarsStr.isEmpty()) {
            ArrayList<URL> jarList = new ArrayList<URL>();
            final String[] jars = jarsStr.split(",");
            for (String jar : jars) {
                final File jarFile = new File(jar);
                if (!jarFile.exists() || !jarFile.canRead()) {
                    throw new IllegalActionException(actor,
                            "Jar does not exist or cannot be read: " + jarFile.getAbsolutePath());
                }
                System.out.println("Adding jar for " + actor.getFullName() + ": " +
                        jarFile.getAbsolutePath());
                jarList.add(new File(jar).toURI().toURL());
            }
            URL[] jarArray = jarList.toArray(new URL[jarList.size()]);
            if (jarArray != null && jarArray.length > 0) {
                return new URLClassLoader(jarArray, Thread.currentThread()
                        .getContextClassLoader());
            }
        }
        // by default, return the class loader of this director.
        return (URLClassLoader)_classLoader;
    } */


    private void _addDirectorParamsToJob(Configuration conf)
            throws IllegalActionException {
        for (java.util.Map.Entry<String, String> entry : _jobArgumentsMap.entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
    }

    // set the partitioner class of a Map actor on the Hadoop job.
    private void _setPartitonerClass(Job job, Actor actor) throws ClassNotFoundException, IllegalActionException {
        final String partitionerClassName = ((Map)actor).partitionerClass.stringValue();
        if (!partitionerClassName.isEmpty()) {
            final Class partitionerClass = _classLoader.loadClass(partitionerClassName);
            if (!Partitioner.class.isAssignableFrom(partitionerClass)) {
                throw new IllegalActionException(actor,
                        "Partitioner class " +
                        partitionerClass + " must be a subclass of " +
                        Partitioner.class.getName());
            }
            job.setPartitionerClass(partitionerClass);
        }
    }

    // add the Kepler jars to the Hadoop job.
    private void _setJars(Configuration conf, URL[] jarArray) throws IllegalActionException {
        if (jarArray != null && jarArray.length > 0) {
            StringBuffer jarStringBuffer = new StringBuffer();
            for (URL jarURL : jarArray) {
                jarStringBuffer.append(jarURL);
                jarStringBuffer.append(",");
            }
            String jarString = jarStringBuffer.toString().substring(0,
                    jarStringBuffer.length() - 1);

            _log.info("kepler jars to be added to hadoop: " + jarString);
            // tmpjars is the Hadoop configuration property for third-party jars.
            conf.set("tmpjars", jarString);

            conf.setClassLoader(new URLClassLoader(jarArray, conf
                    .getClassLoader()));
        }
    }

    // stage data in from the local file system to HDFS
    private String _stageIn(String parentPath, String localInput) throws IllegalActionException, IOException {
        _log.debug("enter into stageIn. " +
localInput is:" + localInput); 2348 File localInputFile = new File(localInput); 2349 String localInputStr = localInputFile.getName(); 2350 FsShell shell = new FsShell(conf); 2351 Random ran = new Random(); 2352 Long ranLong = null; 2353 String ranStr = null; 2354 //int ranInt; 2355 // String[] arg4Mkdir = null; 2356 int res = -1; 2357 int index = 0; 2358 String[] arg4Put = { "-put", localInput, ""}; 2359 String[] arg4Mkdir = { "-mkdir", "" }; 2360 try { 2361 do { 2362 ranLong = ran.nextLong(); 2363 if (ranLong < 0) 2364 ranLong = 0 - ranLong; 2365 ranStr = String.valueOf(ranLong); 2366 if (parentPath != null) 2367 ranStr = parentPath + "/" + ranStr; 2368 else // added for hadoop 2.0, 2369 ranStr = "/" + ranStr; 2370 arg4Mkdir[1] = ranStr; 2371 _log.debug("the arg4Mkdir: " + arg4Mkdir[0] + ", " 2372 + arg4Mkdir[1]); 2373 res = shell.run(arg4Mkdir); 2374 // res = ToolRunner.run(conf, shell, arg4Mkdir); 2375 _log.debug("the exit code of 'hadoop fs -mkdir " + ranStr + " ' is: " 2376 + res); 2377 index++; 2378 } while (res != 0 && index < 10); 2379 if (index == 10) 2380 throw new IllegalActionException( 2381 _director, 2382 "command 'hadoop fs -mkdir' failed for 10 times, please check whether hadoop is correctly setup."); 2383 arg4Put[2] = ranStr; 2384 _log.debug("the arg4Put: " + arg4Put[0] + ", " + arg4Put[1] + ", " 2385 + arg4Put[2]); 2386 // res = ToolRunner.run(shell, arg4Put); 2387 res = shell.run(arg4Put); 2388 final String message = Arrays.toString(arg4Put).replace(",", ""); 2389 _log.debug("the exit code of 'hadoop fs " + message.substring(1, message.length()-1) + " ' is: " + res); 2390 if (res == -1) { 2391 throw new IllegalActionException( 2392 _director, 2393 "command 'hadoop fs " 2394 + message.substring(1, message.length()-1) 2395 + "' was not successully executed, please check whether hadoop is correctly setup."); 2396 } 2397 return ranStr + "/" + localInputStr; 2398 } catch (Exception e) { 2399 final String message = Arrays.toString(arg4Put).replace(",", ""); 2400 throw new IllegalActionException( 2401 _director, 2402 "command 'hadoop fs " 2403 + message.substring(1, message.length()-1) 2404 + "' was not successully executed, please check whether hadoop is correctly setup."); 2405 } finally { 2406 shell.close(); 2407 _log.debug("enter into finally with ranStr:" + ranStr); 2408 } 2409 } 2410 2411 // stage data out from HDFS to local file system 2412 private int _stageOut(String hdfsOutput, String localOuput) 2413 throws IllegalActionException, IOException { 2414 _log.debug("enter into stageOut"); 2415 if (fs == null) 2416 fs = FileSystem.get(conf); 2417 Path hdfsOutputPath = new Path(hdfsOutput); 2418 if (fs.exists(hdfsOutputPath)) { 2419 fs.copyToLocalFile(true, hdfsOutputPath, new Path(localOuput)); 2420 } 2421 return 0; //output file doesn't exsit, return 0. 2422 2423 } 2424 2425 /** Transfer token outputs from HDFS to DDPDataSink actors. 
     */
    private void _transferTokenOutputsFromHDFS() throws IllegalActionException {

        // transfer the output for each sink actor using TokenOutputFormat
        for (java.util.Map.Entry<Path, DDPDataSink> entry : _tokenOutputFileMap.entrySet()) {

            // determine the key and value types

            final DDPDataSink sinkActor = entry.getValue();
            Type inType = sinkActor.in.getType();
            if (!(inType instanceof ArrayType)) {
                throw new IllegalActionException(_director,
                        "Expected array input type for " + sinkActor.getFullName() +
                        "; found: " + inType);
            }
            Type elementType = ((ArrayType)inType).getElementType();
            if (!(elementType instanceof RecordType)) {
                throw new IllegalActionException(_director,
                        "Expected array of records input type for " + sinkActor.getFullName() +
                        "; found: array of " + elementType);
            }

            // TODO check for null
            final Type keyType = ((RecordType)elementType).get("key");
            // TODO check for null
            final Type valueType = ((RecordType)elementType).get("value");

            // transfer the file from HDFS to the local file system.
            final Path tokenPath = entry.getKey();

            try {

                File tmpFile = File.createTempFile("tokenOutput", ".txt");
                if (!tmpFile.delete()) {
                    System.err.println("WARNING: could not delete " + tmpFile);
                }
                FileUtil.copyMerge(tokenPath.getFileSystem(conf),
                        tokenPath,
                        FileSystem.getLocal(conf),
                        new Path(tmpFile.getAbsolutePath()),
                        true,
                        conf,
                        ""
                        );

                // parse the file and create a list of record tokens
                FileReader fileReader = null;
                BufferedReader reader = null;
                try {
                    fileReader = new FileReader(tmpFile);
                    reader = new BufferedReader(fileReader);
                    List<Token> tokenList = new ArrayList<Token>();
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        //System.out.println("token output: " + line);
                        String[] parts = line.split("\t");
                        if (parts.length != 2) {
                            throw new IllegalActionException(_director,
                                    "Malformed output in token output file: " + line);
                        }

                        // create the key and value tokens based on the types used by the
                        // sink actor's input port.
                        RecordToken token = new RecordToken(new String[] {"key", "value"},
                                new Token[] {
                                    Utilities.createTokenFromString(parts[0], keyType),
                                    Utilities.createTokenFromString(parts[1], valueType)});
                        tokenList.add(token);
                    }
                    DDPDataSink.addTokens(sinkActor.getFullName(), tokenList);
                } finally {
                    if (fileReader != null) {
                        fileReader.close();
                    }
                    if (reader != null) {
                        reader.close();
                    }
                    if (tmpFile != null) {
                        tmpFile.delete();
                    }
                    // TODO clean up HDFS
                }
            } catch (IOException e) {
                throw new IllegalActionException(_director, e,
                        "Error reading token output file in HDFS: " + tokenPath);
            }
        }
    }

    /**
     * If true, the output directory will be deleted first.
     */
    public Parameter overwriteOutput;

    /**
     * If true, the temporary directories created in HDFS during execution will be removed
     * after the workflow finishes.
     */
    public Parameter removeTmpHDFSDir;

    /**
     * If true, input and output in the local file system are automatically staged into or
     * out of HDFS.
     */
    public Parameter autoHDFSStage;

    /** Hadoop configuration. */
    private Configuration conf;

    /** Configuration properties for the ddp-common module.
     */
    //private ConfigurationProperty _configProp;

    /**
     * If true, the output directory will be deleted first.
     */
    public boolean _overwriteOutput = false;

    /**
     * If true, input and output in the local file system are automatically staged into or
     * out of HDFS.
     */
    public boolean _autoHDFSStage = false;

    /**
     * The temporary directory for each workflow execution.
     */
    private String _tmpDir;

    /**
     * If _stageOutDirMap is not empty, the data in HDFS will be staged out to the local file
     * system at the end of the Hadoop director's execution. Each entry of the map is a
     * (tmpDirOnHDFS, localDir) pair.
     */
    private java.util.Map<String, String> _stageOutDirMap = null;

    /**
     * A list of configuration directories for each hadoop server that is
     * started.
     */
    private final static List<String> _startedConfigDirs = Collections
            .synchronizedList(new LinkedList<String>());

    /** Logging. */
    private final static Log _log = LogFactory.getLog(HadoopEngine.class);

    /** The name of the hadoop engine. */
    private final static String HADOOP_ENGINE_NAME = "Hadoop";

    /** The Hadoop job manager. */
    private JobControl _jobControl;

    /** The Hadoop file system. */
    private FileSystem fs = null;

    /** Mapping of token output results in HDFS to their destination DDPDataSink actor. */
    private java.util.Map<Path,DDPDataSink> _tokenOutputFileMap = new HashMap<Path,DDPDataSink>();
}