/* An engine that runs models in Spark.
 *
 * Copyright (c) 2014 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $'
 * '$Revision: 34532 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.spark.director;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.kepler.build.project.ProjectLocator;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.ddp.Utilities;
import org.kepler.ddp.actor.pattern.AtomicPatternActor;
import org.kepler.ddp.actor.pattern.CoGroup;
import org.kepler.ddp.actor.pattern.Cross;
import org.kepler.ddp.actor.pattern.DDPDataSink;
import org.kepler.ddp.actor.pattern.DDPDataSource;
import org.kepler.ddp.actor.pattern.DDPPatternActor;
import org.kepler.ddp.actor.pattern.Map;
import org.kepler.ddp.actor.pattern.Match;
import org.kepler.ddp.actor.pattern.Reduce;
import org.kepler.ddp.actor.pattern.SingleInputPatternActor;
import org.kepler.ddp.director.DDPDirector;
import org.kepler.ddp.director.DDPEngine;
import org.kepler.spark.actor.SparkConnection;
import org.kepler.spark.operator.CoGroupOperator;
import org.kepler.spark.operator.CrossOperator;
import org.kepler.spark.operator.DataSink;
import org.kepler.spark.operator.DataSource;
import org.kepler.spark.operator.FileDataSink;
import org.kepler.spark.operator.FileDataSource;
import org.kepler.spark.operator.JoinOperator;
import org.kepler.spark.operator.MapOperator;
import org.kepler.spark.operator.NullSink;
import org.kepler.spark.operator.Operator;
import org.kepler.spark.operator.ReduceOperator;
import org.kepler.spark.operator.TokenSink;
import org.kepler.spark.operator.TokenSource;
import org.kepler.spark.stub.KeplerCoGroupStub;
import org.kepler.spark.stub.KeplerCrossStub;
import org.kepler.spark.stub.KeplerMapStub;
import org.kepler.spark.stub.KeplerMatchStub;
import org.kepler.spark.stub.KeplerReduceStub;
import org.kepler.spark.stub.ScriptEngineMapStub;
import org.kepler.spark.stub.ScriptEngineReduceStub;

import ptolemy.actor.IOPort;
import ptolemy.data.ArrayToken;
import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.kernel.util.Workspace;
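
// Overview (illustrative): a DDP sub-workflow such as
//
//     DDPDataSource -> Map -> DDPDataSink
//
// is translated by this engine into a chain of Spark operators, e.g. a
// FileDataSource feeding a MapOperator that writes to a FileDataSink
// (see _getContract() and _getModelPlan() below). The exact operator and
// stub classes chosen depend on each actor's format class, execution
// class, and execution code settings.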

/** An engine that runs workflows in Spark. This engine
 * converts DDP pattern actors (Map, Reduce, Cross, CoGroup, and
 * Match) and I/O actors (DDPDataSink and DDPDataSource) into a
 * Spark job and runs it on the server.
 * <p>
 * <b>NOTE:</b> Only DDP pattern and I/O actors may be present in
 * the workflow. Other actors must be placed inside the composite
 * pattern actors or in a different sub-workflow.
 * </p>
 *
 * @author Daniel Crawl
 * @version $Id: SparkEngine.java 34532 2016-10-17 19:14:15Z crawl $
 *
 */
public class SparkEngine extends DDPEngine {

    public SparkEngine(DDPDirector director) throws IllegalActionException,
            NameDuplicationException {
        super(director);

        _engineName = SPARK_ENGINE_NAME;

        // initialize this here since _addParameters() accesses it
        // before the field can be initialized.
        _numSameJVMWorkers = 8;
    }
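
    // For example (illustrative only):
    //     masterHostAndPort = "localhost:7077"   // 7077 is Spark's usual standalone master port
    // is rewritten below to "<local host name>:7077", since Spark has trouble
    // binding to 'localhost'.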

    /** React to a parameter change. */
    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if(attribute == masterHostAndPort) {
            InetSocketAddress address = null;
            String masterHostAndPortStr = ((StringToken)masterHostAndPort.getToken()).stringValue();
            if(!masterHostAndPortStr.trim().isEmpty()) {
                // parse the string to check for errors.
                address = SparkConnection.getMasterAndPortAddress(masterHostAndPortStr);
            }

            if(address != null && address.getHostString().equals("localhost")) {
                try {
                    _masterHostAndPortStr = InetAddress.getLocalHost().getHostName() +
                        ":" + address.getPort();
                } catch (UnknownHostException e) {
                    throw new IllegalActionException(_director, e, "Unknown host.");
                }
                System.out.println("Using " + _masterHostAndPortStr +
                    " instead of 'localhost' since Spark has problems binding to 'localhost'.");
            } else {
                _masterHostAndPortStr = masterHostAndPortStr;
            }

        } else if(attribute == numSameJVMWorkers) {
            Token token = numSameJVMWorkers.getToken();
            if(token == null) {
                throw new IllegalActionException(_director,
                    "numSameJVMWorkers must be specified.");
            }
            _numSameJVMWorkers = ((IntToken)token).intValue();

        } else if(attribute == driverMemory) {
            String driverMemoryStr = ((StringToken)driverMemory.getToken()).stringValue();
            if(driverMemoryStr.trim().isEmpty()) {
                driverMemoryStr = SparkConnection.DEFAULT_DRIVER_MEMORY;
            }
            // remember the setting so it is used when the context is created.
            _driverMemoryStr = driverMemoryStr;

        } else {
            super.attributeChanged(attribute);
        }

    }

    /** Clone the SparkEngine into the specified workspace.
     * @param workspace The workspace for the cloned object.
     * @exception CloneNotSupportedException Not thrown in this base class
     * @return The new SparkEngine.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        SparkEngine newObject = (SparkEngine) super.clone(workspace);
        newObject._context = null;
        newObject._inputsAlreadyDone = new HashSet<Operator>();
        newObject._job = null;
        return newObject;
    }

    @Override
    public void preinitialize() throws IllegalActionException {

        super.preinitialize();

        // get the degree of parallelism even though it may be done
        // in the parent class. the parent class may set it to 1
        // as the default.
        _degreeOfParallelism = ((IntToken)_director.degreeOfParallelism.getToken()).intValue();

        // load the default config dir if necessary
        _checkConfigDir();

    }

    public static void shutdownServer() throws IllegalActionException {

        // shut down any servers we started.
        synchronized(_serverStartStopLock) {
            for(String stopScriptStr : _stopScriptsForShutdown) {

                System.out.println("Stopping Spark server by running " +
                    stopScriptStr);

                ProcessBuilder builder = new ProcessBuilder(stopScriptStr);

                // make sure JAVA_HOME is set
                java.util.Map<String,String> env = builder.environment();
                if(env.get("JAVA_HOME") == null) {
                    env.put("JAVA_HOME", System.getProperty("java.home"));
                }

                try {
                    Process process = builder.start();
                    process.waitFor();
                } catch (Exception e) {
                    throw new IllegalActionException("Unable to stop Spark: " + e.getMessage());
                }
            }
        }
    }

    /** Stop any running Spark jobs. */
    @Override
    public void stop() throws IllegalActionException {
        super.stop();

        synchronized(_jobLock) {
            if(_job != null) {
                _job.stop();
            }
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                        public fields                     //////

    public StringParameter masterHostAndPort;

    public Parameter numSameJVMWorkers;

    public StringParameter driverMemory;

    ///////////////////////////////////////////////////////////////////
    ////                      protected methods                   //////

    /** Check the existence of the Spark assembly jar. */
    @Override
    protected boolean _checkFilesBeforeStartingServer() throws IllegalActionException {

        if(!Utilities.isExecutingUnderResourceManager()) {
            // look for the assembly jar. the assembly jar is not shipped with
            // the spark module, so give the URL for building it if it is not found.
            if(!super._checkFilesBeforeStartingServer()) {
                throw new IllegalActionException(
                    "Cannot start the Spark Master since the assembly jar does not exist. " +
                    "This jar is not shipped\n" +
                    "with Kepler and must be built; " +
                    "the instructions for building it can be found at:\n" +
                    "https://kepler-project.org/developers/interest-groups/distributed/configuring-hadoop-for-biokepler-or-ddp-suite");
            }
        }
        return true;
    }

    @Override
    protected void _executeDDPJob() throws IllegalActionException {

        // see what jars are necessary. call this before creating the job
        // since a new classloader may be created for external classes.
        final List<URI> jarURIs = _getJarList();

        _getContext();

        try {
            _sparkParallelism = _context.defaultParallelism();

            if(_degreeOfParallelism == DDPDirector.DEFAULT_INTTOKEN.intValue()) {
                _degreeOfParallelism = _sparkParallelism;
            }

            if(_degreeOfParallelism > _sparkParallelism) {
                System.err.println("WARNING: the default degree of parallelism for " +
                    _director.getName() + " is " + _degreeOfParallelism +
                    ", which is greater than Spark's parallelism " +
                    "(" + _sparkParallelism + ").");
                if(_sameJVM) {
                    System.err.println("Increase the number of local workers to achieve " +
                        "this degree of parallelism.");
                }
            }

            System.out.println("Degree of parallelism is " +
                _degreeOfParallelism + ".");

            for (URI jarURI : jarURIs) {
                String jarStr = jarURI.toString();
                // Spark 2 throws an exception when more than one jar with
                // the same name is added (even when in different directories).
                // So we need to exclude the (empty) kepler-tasks.jar that is
                // built for the kepler-tasks module.
                if(!jarStr.endsWith("target/kepler-tasks.jar")) {
                    _context.addJar(jarStr);
                }
            }

            synchronized(_jobLock) {
                _job = _getModelPlan();

                //System.out.println("Starting Spark job.");

                _job.start();
            }

            try {
                _job.waitForFinish();
            } catch (Exception e) {
                throw new IllegalActionException(_director, e, "Error in Spark job.");
            }

            //System.out.println("Finished Spark job.");

            // set the _job to null here instead of wrapup() to prevent
            // stop() from calling _job.stop() when the context has been
            // stopped.
            synchronized(_jobLock) {
                _job = null;
            }

        } finally {
            if(_context != null) {
                SparkConnection.releaseContext();
                _context = null;
            }
        }
    }
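
    // Parameters added to the director by _addParameters() below (sketch):
    //   masterHostAndPort  - host:port of the standalone master (empty = default)
    //   numSameJVMWorkers  - number of workers when running in the same JVM
    //   driverMemory       - Spark driver memory, defaulting to
    //                        SparkConnection.DEFAULT_DRIVER_MEMORY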

    /** Add parameters to the containing director. */
    @Override
    protected void _addParameters() throws IllegalActionException, NameDuplicationException {

        masterHostAndPort = (StringParameter) _director.getAttribute("masterHostAndPort");
        if(masterHostAndPort == null) {
            masterHostAndPort = new StringParameter(_director, "masterHostAndPort");
        }

        numSameJVMWorkers = (Parameter) _director.getAttribute("numSameJVMWorkers");
        if(numSameJVMWorkers == null) {
            numSameJVMWorkers = new Parameter(_director, "numSameJVMWorkers");
            numSameJVMWorkers.setTypeEquals(BaseType.INT);
            numSameJVMWorkers.setToken(String.valueOf(_numSameJVMWorkers));
        }

        driverMemory = (StringParameter) _director.getAttribute("driverMemory");
        if(driverMemory == null) {
            driverMemory = new StringParameter(_director, "driverMemory");
            driverMemory.setToken(SparkConnection.DEFAULT_DRIVER_MEMORY);
        }
    }

    /** Remove engine-specific parameters from the director. */
    @Override
    protected void _removeParameters() throws IllegalActionException, NameDuplicationException {
        masterHostAndPort.setContainer(null);
        numSameJVMWorkers.setContainer(null);
        driverMemory.setContainer(null);
    }

    /** Constructs a SparkJob from the model. */
    protected SparkJob _getModelPlan() throws IllegalActionException {

        _inputsAlreadyDone.clear();

        SparkJob plan = null;

        // find all the sinks in the model
        final List<DDPDataSink> sinks = _container.entityList(DDPDataSink.class);

        if(sinks.isEmpty()) {
            throw new IllegalActionException(_director, "No data sinks found.");
        }

        // for each sink, traverse the graph to the source, adding to the plan
        for(DDPDataSink sink : sinks) {

            // get the operator for this sink
            DataSink operator = (DataSink) _getContract(sink);

            // add sink to the plan
            if(plan == null) {
                plan = new SparkJob(_context, operator, "Kepler Spark Job " + _container.getName());
            } else {
                plan.addDataSink(operator);
            }

            // traverse graph for this sink
            _addInputsForContract(sink, operator);
        }

        return plan;
    }
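
    // Illustrative traversal: for a workflow DDPDataSource -> Map -> DDPDataSink,
    // _getModelPlan() starts at the sink's DataSink operator; the method below then
    // sets the Map's MapOperator as the sink's input and, recursing, the source's
    // DataSource operator as the MapOperator's input.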

    /** Recursively add inputs for a contract by traversing the model graph.
     *
     * @param pactor the current pactor in the model.
     * @param operator the operator for the current pactor.
     */
    private void _addInputsForContract(DDPPatternActor pactor, Operator operator) throws IllegalActionException {

        // see if we've already done this contract
        if(!_inputsAlreadyDone.contains(operator)) {

            // see if the execution class name or execution code is set
            final String executionClassName = pactor.getExecutionClassName();
            final String executionCodeType = pactor.getExecutionCodeType();
            if(executionClassName.isEmpty() &&
                    executionCodeType == null &&
                    (pactor instanceof SingleInputPatternActor)) {
                // add sub-wf to plan configuration
                _addSubModelToContract((SingleInputPatternActor)pactor, operator);
            }

            // see how many inputs are required
            int numRequiredInputs = operator.numInputs();
            if(numRequiredInputs < 0 || numRequiredInputs > 2) {
                throw new IllegalActionException(_director, "Unknown type of contract: " + operator.getClass());
            }

            // see if there's at least one input
            if(numRequiredInputs > 0) {

                // get the first input port
                final IOPort inputPort1 = (IOPort) pactor.getPort("in");
                if(inputPort1 == null) {
                    throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
                        " is missing input port \"in\".");
                }

                _addInputForPactorPort(inputPort1, pactor, operator);

                // see if there's a second input
                if(numRequiredInputs > 1) {

                    final IOPort inputPort2 = (IOPort) pactor.getPort("in2");
                    if(inputPort2 == null) {
                        throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
                            " is missing input port \"in2\".");
                    }

                    _addInputForPactorPort(inputPort2, pactor, operator);
                }
            }
        }
    }

    /** Set the input contract for a single input port and traverse the model
     * graph to recursively add input contracts.
     *
     * @param port the input port of the current pactor.
     * @param pactor the current pactor in the model.
     * @param operator the operator for the current pactor.
     */
    private void _addInputForPactorPort(IOPort port, DDPPatternActor pactor, Operator operator)
            throws IllegalActionException {

        // get the connected actor
        final List<?> outputPorts = port.sourcePortList();
        if(outputPorts.isEmpty()) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                port.getName() + " must be connected.");
        } else if(outputPorts.size() > 1) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                port.getName() + " may only be connected to one port.");
        }

        final IOPort outputPort1 = (IOPort) outputPorts.get(0);
        final NamedObj outputNamedObj = outputPort1.getContainer();

        // FIXME
        if(!(outputNamedObj instanceof DDPPatternActor)) {
            throw new IllegalActionException(_director, "Actor " + pactor.getFullName() +
                " is connected to a non-DDPPatternActor: " + outputNamedObj.getFullName());
        }

        final Operator outputContract = _getContract((DDPPatternActor)outputNamedObj);

        int numInputs = operator.numInputs();
        if(numInputs == 1) {
            operator.setInput(outputContract);
        } else if(numInputs == 2) {
            if(port.getName().equals("in")) {
                operator.setInput(0, outputContract);
            } else if(port.getName().equals("in2")) {
                operator.setInput(1, outputContract);
            } else {
                throw new IllegalActionException(port, "Input port must be named either \"in\" or \"in2\".");
            }
        }

        _inputsAlreadyDone.add(operator);

        // recursively add the inputs for output pactor
        _addInputsForContract((DDPPatternActor)outputNamedObj, outputContract);
    }
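
    // How the sub-workflow reaches the stubs (sketch): when not running in the
    // same JVM, the cloned sub-workflow is exported as MoML and attached to the
    // operator's Configuration, either inline under
    // Utilities.CONFIGURATION_KEPLER_MODEL or, if _writeSubWorkflowsToFiles is
    // set, as a file path under Utilities.CONFIGURATION_KEPLER_MODEL_PATH.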

    /** Write the sub-workflow of a SingleInputPatternActor either to a parameter
     * in the Spark job configuration or to a file.
     */
    protected void _addSubModelToContract(SingleInputPatternActor pactor, Operator contract)
            throws IllegalActionException {

        final String name = pactor.getName();

        final Configuration contractConfiguration = contract.getParameters();
        contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, name);

        contractConfiguration.setBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT,
            pactor.getRunWorkflowLifecyclePerInput());

        // make a clone of the pactor so we can add TypeAttributes to ports
        // and add parameters from the hierarchy

        Workspace workspaceForClone;
        if(_sameJVM) {
            workspaceForClone = new Workspace(name);
        } else {
            workspaceForClone = _director.workspace();
        }

        SingleInputPatternActor clonePactor;
        try {
            clonePactor = (SingleInputPatternActor)pactor.clone(workspaceForClone);
        } catch (CloneNotSupportedException e) {
            throw new IllegalActionException(_director, e, "Unable to clone " + name);
        }

        // copy the port types to the clone
        _setPortTypes(clonePactor);

        // copy the parameters to the clone
        _copyParameters(pactor, clonePactor);

        // set display redirect path
        String directDir = pactor.getDisplayRedirectDir();
        NamedObj redirectSpecifier = pactor;
        if(directDir.isEmpty()) {
            redirectSpecifier = _director;
            directDir = _displayRedirectDir;
        }
        if(!directDir.isEmpty()) {

            // display redirection when running in same jvm is not supported
            if(_sameJVM) {
                throw new IllegalActionException(redirectSpecifier,
                    "Redirecting display actors is not supported for" +
                    " Spark server running in the same JVM as Kepler.");
            }

            final File file = new File(directDir);
            if (!file.exists() && !file.mkdirs()) {
                throw new IllegalActionException(_director, "Could not create directories " + file);
            }
            if (!file.isDirectory() || !file.canWrite()) {
                throw new IllegalActionException(redirectSpecifier,
                    "Parameter '" + _displayRedirectDir + "' must be a directory and writable.");
            }
            contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, directDir);
        }

        if(_sameJVM) {
            _subWorkflows.put(name, clonePactor);
        } else {

            // remove top-level ports and relations
            Utilities.removeModelPorts(clonePactor);

            // serialize the clone pactor

            if(_writeSubWorkflowsToFiles) {

                if(_jobDir == null) {
                    _createJobDirectory();
                }

                final String modelPath = _jobDir + name + ".xml";
                FileWriter writer = null;
                try {
                    writer = new FileWriter(modelPath);
                    clonePactor.exportMoML(writer);
                } catch(IOException e) {
                    throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                } finally {
                    if(writer != null) {
                        try {
                            writer.close();
                        } catch (IOException e) {
                            throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                        }
                    }
                }

                // add model file to plan configuration
                contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, modelPath);

            } else {

                // write model to a string
                final StringWriter writer = new StringWriter();
                try {
                    clonePactor.exportMoML(writer);
                } catch (IOException e) {
                    throw new IllegalActionException(_director, e, "Error serializing model.");
                }

                // add string to configuration
                contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODEL, writer.toString());
            }

            // set the location of the kepler installation directory.
            // NOTE: this is done so that the stub can initialize kepler and set
            // the java properties for each module's workflow directory, e.g.:
            // property("spark.workflowdir")
            // if the modules directory does not exist on the stub, e.g., the file
            // system is not shared, then initialization is not done and the stub
            // workflow cannot use the module workflow directory properties.
            contractConfiguration.set(Utilities.CONFIGURATION_KEPLER_MODULES_DIR,
                ProjectLocator.getKeplerModulesDir().getAbsolutePath());

        }
    }
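
    // startServerType handling in _getContext() (sketch of the three cases):
    //   "default" / empty : ask SparkConnection for an existing context; run in
    //                       the same JVM only if the resulting master is "local".
    //   same JVM          : create a "local" context with _numSameJVMWorkers workers.
    //   distributed       : connect to the standalone master given by
    //                       masterHostAndPort (or the default address); if nothing
    //                       is listening, start a master via the configured
    //                       start/stop scripts.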

    /** Get the Spark Context. */
    private void _getContext() throws IllegalActionException {

        String typeStr = ((StringToken)_director.startServerType.getToken()).stringValue().trim();

        if(typeStr.isEmpty() || typeStr.equals("default")) {

            if(!_masterHostAndPortStr.trim().isEmpty()) {
                throw new IllegalActionException(_director,
                    "masterHostAndPort must not be specified if startServerType is default (or empty).");
            }

            _context = SparkConnection.getContext(
                "",
                _numSameJVMWorkers,
                _driverMemoryStr,
                _container.getFullName());

            // see if we were able to get a context
            if(_context == null) {
                _sameJVM = false;
            } else {
                // see if the context is to a local master.
                String sparkMasterStr = _context.getConf().get("spark.master");
                _sameJVM = sparkMasterStr.startsWith("local");
            }

        } else if(typeStr.equals(DDPDirector.SAME_JVM_STRING)) {

            _context = SparkConnection.getContext("local",
                _numSameJVMWorkers,
                _driverMemoryStr,
                _container.getFullName());

            _sameJVM = true;

            if(_context == null) {
                throw new IllegalActionException(_director,
                    "Unable to create Spark context in same JVM.");
            }

        } else if(typeStr.equals(DDPDirector.DISTRIBUTED_STRING)) {

            _sameJVM = false;

            InetSocketAddress socketAddress;
            if(_masterHostAndPortStr.trim().isEmpty()) {
                socketAddress = SparkConnection.getDefaultMasterSocketAddress();
            } else {
                socketAddress = SparkConnection.getMasterAndPortAddress(_masterHostAndPortStr);
            }

            // see if we can connect to standalone server
            boolean connected = false;
            try(Socket socket = new Socket()) {
                socket.connect(socketAddress, _CONNECT_TIMEOUT);
                connected = true;
            } catch(IOException e) {
                //System.err.println("IOException connecting to " + socketAddress +
                //    ": " + e.getMessage());
                connected = false;
            }

            // start standalone server since could not connect
            if(!connected) {

                File configDirFile = new File(_configDirStr);
                String parentDirStr = configDirFile.getParent();
                String startScriptStr;
                String stopScriptStr;

                ConfigurationProperty engineProperty =
                    Utilities.getEngineProperty(SPARK_ENGINE_NAME, _director);

                ConfigurationProperty scriptsProperty = engineProperty.getProperty("Server.Scripts");
                if(scriptsProperty == null) {
                    throw new IllegalActionException(_director, "Server Scripts not found in configuration file.");
                }

                final String pathStr = parentDirStr + File.separator + "sbin" +
                    File.separator;

                String startName;
                String stopName;

                // see if we are running under a resource manager
                if(Utilities.isExecutingUnderResourceManager()) {
                    System.out.println("Running under cluster resource manager.");
                    startName = "ClusterStart";
                    stopName = "ClusterStop";
                } else {
                    startName = "Start";
                    stopName = "Stop";
                }

                ConfigurationProperty property = scriptsProperty.getProperty(startName);
                if(property == null) {
                    throw new IllegalActionException(_director, "No start script property in configuration file: " + startName);
                }
                startScriptStr = pathStr + property.getValue();

                property = scriptsProperty.getProperty(stopName);
                if(property == null) {
                    throw new IllegalActionException(_director, "No stop script property in configuration file: " + stopName);
                }
                stopScriptStr = pathStr + property.getValue();

                System.out.println("Spark server start script is " + startScriptStr);
                System.out.println("Spark server stop script is " + stopScriptStr);

                if(_checkServer(socketAddress, startScriptStr)) {
                    _stopScriptsForShutdown.add(stopScriptStr);
                }
            }

            _context = SparkConnection.getContext(
                _masterHostAndPortStr,
                _numSameJVMWorkers,
                _driverMemoryStr,
                _container.getFullName());

        } else {
            throw new IllegalActionException(_director,
                "Unsupported value for startServerType: " + typeStr);
        }

        if(_context == null) {
            throw new IllegalActionException(_director,
                "Unable to create Spark context.");
        }

    }

    /** Parse the output from the script that starts the server.
     * @return If the start script specifies a server URL, returns the
     * socket address for that URL. Otherwise, returns null.
     */
    @Override
    protected InetSocketAddress _parseOutputFromStartingServer(InputStream input) throws IOException, IllegalActionException {
        InetSocketAddress retval = null;
        if(Utilities.isExecutingUnderResourceManager()) {
            String hostAndPortStr = SparkConnection.parseOutputFromStartingServer(input);
            if(hostAndPortStr != null) {
                _masterHostAndPortStr = hostAndPortStr;
                retval = SparkConnection.getMasterAndPortAddress(_masterHostAndPortStr);
            }
        }
        return retval;
    }
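
    // Stub selection in _getContract() (sketch): for each pattern actor, the
    // stub run inside the Spark operator is either
    //   - a Kepler*Stub that executes the actor's sub-workflow (no execution
    //     class or code set),
    //   - a ScriptEngine*Stub when execution code is set (Map and Reduce only), or
    //   - an instance of the actor's execution class, which must extend the
    //     Spark function type named in the error messages below.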

    /** Get the contract for a DDPPatternActor. */
    private Operator _getContract(DDPPatternActor actor) throws IllegalActionException {

        Operator operator = null;

        final String actorName = actor.getName();

        final String stubClassName = actor.getExecutionClassName();
        Class<?> stubClass = null;

        // see if the stub was set
        if(!stubClassName.isEmpty()) {
            try {
                stubClass = _classLoader.loadClass(stubClassName);
            } catch (ClassNotFoundException e) {
                throw new IllegalActionException(actor, e,
                    "Could not find execution class " + stubClassName);
            }
        }

        final String stubCodeTypeName = actor.getExecutionCodeType();
        if(stubCodeTypeName != null) {

            // TODO
            if(!(actor instanceof Map) && !(actor instanceof Reduce)) {
                throw new IllegalActionException(actor, "Code execution not yet supported for this pattern.");
            }
        }

        final boolean stubIsSubWorkflow = (stubClass == null) && (stubCodeTypeName == null);

        if(actor instanceof DDPDataSource) {
            operator = _getSourceSinkContract((DDPDataSource)actor, true);
        } else if(actor instanceof Map) {

            PairFlatMapFunction stub;

            if(stubIsSubWorkflow) {
                // execute sub-workflow as stub
                stub = new KeplerMapStub();
            } else if(stubCodeTypeName != null) {
                stub = new ScriptEngineMapStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using Map execution class " + stubClassName +
                    " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                        "Error instantiating map class " + stubClass.getName());
                }

            }

            operator = new MapOperator(stub, actorName);

        } else if (actor instanceof Reduce) {

            Function2<?,?,?> stub = null;
            PairFlatMapFunction stubNewKeys = null;

            if(stubIsSubWorkflow) {
                // execute sub-workflow as stub

                // see if reducer is also a combiner
                // TODO
                //if(((BooleanToken)((Reduce)actor).useAsCombiner.getToken()).booleanValue()) {
                //    stubClass = KeplerCombineAndReduceStub.class;
                //} else {
                    stubNewKeys = new KeplerReduceStub();
                //}

            } else if(stubCodeTypeName != null) {
                stubNewKeys = new ScriptEngineReduceStub();
            } else if(Function2.class.isAssignableFrom(stubClass)) {

                System.out.println("Using Reduce execution class " + stubClassName +
                    " for " + actor.getFullName());

                try {
                    stub = (Function2<?, ?, ?>) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                        "Error instantiating reduce class " + stubClass.getName());
                }

            } else if(PairFlatMapFunction.class.isAssignableFrom(stubClass)) {

                System.out.println("Using Reduce execution class " + stubClassName +
                    " for " + actor.getFullName());

                try {
                    stubNewKeys = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                        "Error instantiating reduce class " + stubClass.getName());
                }

            } else {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    Function2.class.getName() + " or " +
                    PairFlatMapFunction.class.getName());
            }

            if(stub != null) {
                operator = new ReduceOperator(stub, actorName);
            } else {
                operator = new ReduceOperator(stubNewKeys, actorName);
            }

        } else if(actor instanceof Cross) {

            PairFlatMapFunction stub;

            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stub = new KeplerCrossStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using Cross execution class " + stubClassName +
                    " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                        "Error instantiating cross class " + stubClass.getName());
                }
            }

            operator = new CrossOperator(stub, actorName);

        } else if(actor instanceof CoGroup) {

            PairFlatMapFunction stub;

            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stub = new KeplerCoGroupStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using CoGroup execution class " + stubClassName +
                    " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                        "Error instantiating cogroup class " + stubClass.getName());
                }
            }

            operator = new CoGroupOperator(stub, actorName);

        } else if(actor instanceof Match) {

            PairFlatMapFunction stub;

            // see if execution class was set
            if(stubClass == null) {
                // execute sub-workflow as stub
                stub = new KeplerMatchStub();
            } else if(!PairFlatMapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    PairFlatMapFunction.class.getName());
            } else {
                System.out.println("Using Match execution class " + stubClassName +
                    " for " + actor.getFullName());

                try {
                    stub = (PairFlatMapFunction) stubClass.newInstance();
                } catch(Throwable t) {
                    throw new IllegalActionException(actor, t,
                        "Error instantiating match class " + stubClass.getName());
                }
            }

            operator = new JoinOperator(stub, actorName);

        } else if(actor instanceof DDPDataSink) {
            operator = _getSourceSinkContract((DDPDataSink)actor, false);
        } else {
            throw new IllegalActionException(_director, "Cannot determine Operator type for " +
                actor.getFullName());
        }

        // set the number of parallel instances.
        int numInstances = actor.getDegreeOfParallelism();

        // if less than 1, use the director's value
        if(numInstances <= 0) {
            numInstances = _degreeOfParallelism;
        }
        // warn if greater than Spark's parallelism
        else if(numInstances > _sparkParallelism) {
            System.err.println("WARNING: degree of parallelism for " +
                actor.getName() + " is " + numInstances + ", which is " +
                "greater than Spark's parallelism " +
                "(" + _sparkParallelism + ").");
            if(_sameJVM) {
                System.err.println("Increase the number of local workers to achieve " +
                    "this degree of parallelism.");
            }
        }

        operator.setDegreeOfParallelism(numInstances);

        // set any job arguments if an execution class is used (not a sub-workflow)
        final Configuration configuration = operator.getParameters();
        if(!stubIsSubWorkflow) {
            for(java.util.Map.Entry<String,String> entry : _jobArgumentsMap.entrySet()) {
                // XXX this assumes they are all strings.
                configuration.set(entry.getKey(), entry.getValue());
            }
        }

        // add any actor parameters to the contract configuration
        final java.util.Map<String,String> contractParameters = actor.getParameters();
        final java.util.Map<String,String> paraNames = actor.getParaImplNames(_engineName);
        for(java.util.Map.Entry<String, String> entry : contractParameters.entrySet()) {
            String keplerParaName = entry.getKey();
            if (paraNames.get(keplerParaName) != null) {
                configuration.set(paraNames.get(keplerParaName), entry.getValue());
            } else {
                configuration.set(Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::" + keplerParaName, entry.getValue());
            }
        }
        // set print execution info parameter
        configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, actor.getPrintExeInfo());

        // set the same JVM flag.
        if(_sameJVM) {
            configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, true);
        }

        if(stubCodeTypeName != null) {

            String scriptEngineFactoryName = Utilities.getScriptEngineFactoryName(stubCodeTypeName);
            if(scriptEngineFactoryName == null) {
                throw new IllegalActionException(actor,
                    "No script engine has been configured for " + stubCodeTypeName);
            }

            if(!_sameJVM) {
                List<String> jars = Utilities.getJarsForLanguage(stubCodeTypeName);
                _additionalJars.addAll(jars);
            }

            configuration.set(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME, scriptEngineFactoryName);

            configuration.set(Utilities.CONFIGURATION_KEPLER_STUB_CODE, actor.getExecutionCode());

        }

        return operator;
    }
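
    // Format class to operator mapping used below (sketch):
    //   FileInputFormat subclass  -> FileDataSource reading the actor's path
    //   TokenSource               -> TokenSource fed by the actor's input token
    //   FileOutputFormat subclass -> FileDataSink writing to the actor's path
    //   TokenSink                 -> TokenSink keyed by the actor's full name
    //   NullOutputFormat          -> NullSink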

    /** Get a contract for a source or sink.
     * @param actor the Pactor
     * @param input If true, get a source contract; if false, get a sink contract.
     */
    private Operator _getSourceSinkContract(AtomicPatternActor actor, boolean input) throws IllegalActionException {

        final String actorName = actor.getName();

        String className = actor.getFormatClassName(_engineName);
        if(className == null) {
            throw new IllegalActionException(_director, "Could not find format class name for " +
                "actor " + actorName);
        }

        // try to get the class
        Class<?> clazz;
        try {
            clazz = _classLoader.loadClass(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalActionException(actor, "Format type " + className +
                " was not found in the format types configurations, or the class is not on the classpath.");
        }

        Operator operator = null;

        if(input) {

            if(FileInputFormat.class.isAssignableFrom(clazz)) {
                operator = new FileDataSource((Class<? extends FileInputFormat<?,?>>) clazz,
                    ((DDPDataSource) actor).getPathAsURI(), actorName);
            } else if(TokenSource.class.isAssignableFrom(clazz)) {

                ArrayToken token = DDPDataSource.getToken(actor.getFullName());

                if(token == null) {
                    throw new IllegalActionException(actor,
                        "No input token found for source actor " + actorName);
                }

                operator = new TokenSource(token, actor.getName());

            } else {
                throw new IllegalActionException(actor, "Unsupported type of format class: " +
                    clazz.getName());
            }

            ((DataSource) operator).setContext(_context);

        } else {

            if(FileOutputFormat.class.isAssignableFrom(clazz)) {

                operator = new FileDataSink((Class<? extends FileOutputFormat<?,?>>) clazz,
                    ((DDPDataSink)actor).getPathAsURI(), actorName);

            } else if(TokenSink.class.isAssignableFrom(clazz)) {

                operator = new TokenSink(actor.getFullName(), actorName);

            } else if(clazz == NullOutputFormat.class) {
                operator = new NullSink(actorName);
            } else {
                throw new IllegalActionException(actor, "Unsupported type of format class: " +
                    clazz.getName());
            }
        }

        return operator;

    }

    ///////////////////////////////////////////////////////////////////
    ////                        private fields                    //////

    /** The name of the spark engine. */
    public final static String SPARK_ENGINE_NAME = "Spark";

    /** The Spark context. */
    private JavaSparkContext _context;

    private Set<Operator> _inputsAlreadyDone = new HashSet<Operator>();

    /** A Spark job that can be cancelled. */
    private SparkJob _job;

    /**
     * A list of scripts to execute to stop each spark server that is
     * started.
     */
    private final static Set<String> _stopScriptsForShutdown = Collections
        .synchronizedSet(new HashSet<String>());

    /** The number of workers for the local context. */
    private int _numSameJVMWorkers = SparkConnection.DEFAULT_NUM_LOCAL_WORKERS;

    private String _driverMemoryStr = SparkConnection.DEFAULT_DRIVER_MEMORY;

    private String _masterHostAndPortStr = "";

    private int _sparkParallelism;

}