/*
 * Copyright (c) 2015 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2018-02-07 18:53:35 +0000 (Wed, 07 Feb 2018) $'
 * '$Revision: 34661 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.spark.actor;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.kepler.build.modules.Module;
import org.kepler.build.modules.ModuleTree;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.ddp.Utilities;
import org.kepler.spark.director.SparkEngine;

import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.util.MessageHandler;
import ptolemy.vergil.icon.BoxedValueIcon;

/** An attribute that contains connection information for a Spark server.
 * The value specified in connectionName is used by Spark actors to
 * refer to an instance of this attribute and use its Spark server.
 *
 * @author Daniel Crawl
 * @version $Id: SparkConnection.java 34661 2018-02-07 18:53:35Z crawl $
 */
public class SparkConnection extends Attribute {
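
    /* A minimal usage sketch (illustrative only; the container variable
     * and attribute name below are assumptions, not part of this class):
     *
     *   NamedObj workflow = ...;   // e.g., the top-level workflow container
     *   SparkConnection connection = new SparkConnection(workflow, "SparkConnection");
     *   connection.connectionName.setToken("default");
     *   connection.serverName.setToken("local");
     *   JavaSparkContext context = connection.getContext();
     */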

    /** Create a new SparkConnection with the specified name. */
    public SparkConnection(NamedObj container, String name)
        throws IllegalActionException, NameDuplicationException {
        super(container, name);

        connectionName = new StringParameter(this, "connectionName");
        connectionName.setToken("default");

        serverName = new StringParameter(this, "serverName");
        serverName.setToken("");

        numLocalWorkers = new Parameter(this, "numLocalWorkers");
        numLocalWorkers.setTypeEquals(BaseType.INT);
        numLocalWorkers.setToken(new IntToken(DEFAULT_NUM_LOCAL_WORKERS));

        driverMemory = new StringParameter(this, "driverMemory");
        driverMemory.setToken(DEFAULT_DRIVER_MEMORY);

        // set the icon to a BoxedValueIcon to display the connection name
        BoxedValueIcon icon = new BoxedValueIcon(this, "_icon");
        icon.displayWidth.setExpression("25");
        icon.attributeName.setExpression("connectionName");
        icon.boxColor.setExpression("{0.0, 1.0, 1.0, 1.0}");
    }

    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if(attribute == numLocalWorkers) {
            int val = ((IntToken)numLocalWorkers.getToken()).intValue();
            synchronized(_contextLock) {
                if(val != _numLocalWorkers) {
                    _numLocalWorkers = val;
                }
            }
        } else if(attribute == driverMemory) {
            String driverMemoryStr = ((StringToken)driverMemory.getToken()).stringValue();
            synchronized(_contextLock) {
                if(!driverMemoryStr.equals(_driverMemory)) {
                    _driverMemory = driverMemoryStr;
                }
            }
        } else if(attribute == serverName) {
            String serverNameStr = ((StringToken)serverName.getToken()).stringValue();
            synchronized(_contextLock) {
                if(!serverNameStr.equals(_serverName)) {
                    _serverName = serverNameStr;
                }
            }
        } else {
            super.attributeChanged(attribute);
        }
    }

    /** Clone this object into the specified workspace. */
    /*
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        SparkConnection newObject = (SparkConnection) super.clone(workspace);
        return newObject;
    }
    */

    /** Get the connection name used for this attribute. */
    public String getConnectionName() throws IllegalActionException {
        return ((StringToken)connectionName.getToken()).stringValue();
    }

    /** Get the context using the parameters of this SparkConnection
     * (serverName, driverMemory, numLocalWorkers, etc.).
     */
    public JavaSparkContext getContext() throws IllegalActionException {
        synchronized(_contextLock) {
            return getContext(_serverName, _numLocalWorkers, _driverMemory,
                getConnectionName());
        }
    }

    /** Get the context for a specific master.
     * @param masterNameAndPortStr The master host and port. If empty,
     * the environment variables MASTER, SPARK_MASTER_IP, and SPARK_LOCAL_IP
     * are checked for the master. If those are also empty, a connection
     * to a master running on localhost is attempted. If the connection
     * fails, a local master is used (runs in the same JVM).
     * @param numLocalWorkersVal The number of workers to use for a
     * local master.
     * @param driverMemoryStr The amount of memory for the driver.
     * @param connectionNameStr The connection name string.
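     *
     * <p>A minimal calling sketch (the master URL, worker count, and
     * memory size below are illustrative):</p>
     * <pre>{@code
     * JavaSparkContext context =
     *     SparkConnection.getContext("spark://node0:7077", 0, "1g", "default");
     * }</pre>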
     */
    public static JavaSparkContext getContext(String masterNameAndPortStr,
        int numLocalWorkersVal, String driverMemoryStr,
        String connectionNameStr) throws IllegalActionException {

        synchronized(_contextLock) {

            // see if we already created the context and any of the
            // parameters are different.
            if(_context != null && (
                _serverName == null ||
                !_serverName.equals(masterNameAndPortStr) ||
                _numLocalWorkers != numLocalWorkersVal ||
                _driverMemory == null ||
                !_driverMemory.equals(driverMemoryStr))) {
                System.out.println("Will recreate Spark context since parameter(s) have changed.");
                //System.out.println("CONTEXT COUNTER = " + _contextCounter);
                _context.stop();
                _context = null;
                if(_startedMySparkServer) {
                    _stopServerWithMySpark();
                }
            }

            if(_context == null) {

                String masterStr;
                // if the server was not specified, use the default.
                if(masterNameAndPortStr == null ||
                    masterNameAndPortStr.trim().isEmpty()) {

                    // get the default address for the master
                    InetSocketAddress socketAddress = getDefaultMasterSocketAddress();

                    // see if we can connect
                    boolean connected = false;
                    try(Socket socket = new Socket()) {
                        socket.connect(socketAddress, _CONNECT_TIMEOUT);
                        connected = true;
                    } catch(IOException e) {
                        //System.err.println("IOException connecting to " + socketAddress +
                        //": " + e.getMessage());
                        connected = false;
                    }

                    // if we could connect, use the running master
                    if(connected) {
                        masterStr = "spark://" +
                            socketAddress.getHostName() + ":" +
                            socketAddress.getPort();
                    }
                    // if we are running on a cluster, start the standalone
                    // server using the myspark scripts.
                    else if(Utilities.isExecutingUnderResourceManager()) {
                        masterStr = _startServerWithMySpark();
                    }
                    // otherwise, use a local master.
                    else {
                        masterStr = "local[" +
                            (numLocalWorkersVal < 1 ? "*" : numLocalWorkersVal) +
                            "]";
                    }
                } else if(masterNameAndPortStr.startsWith("local")) {
                    masterStr = "local[" +
                        (numLocalWorkersVal < 1 ? "*" : numLocalWorkersVal) +
                        "]";
                } else if(!masterNameAndPortStr.startsWith("spark://")) {
                    masterStr = "spark://" + masterNameAndPortStr;
                } else {
                    masterStr = masterNameAndPortStr;
                }
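
                // NOTE: illustrative values of masterStr at this point:
                // "spark://node0:7077" for a standalone master,
                // "local[4]" for four local worker threads, or
                // "local[*]" to use all available cores.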

                if(driverMemoryStr == null || driverMemoryStr.trim().isEmpty()) {
                    driverMemoryStr = DEFAULT_DRIVER_MEMORY;
                    System.err.println("WARNING: Spark driver memory not specified. " +
                        "Using default value of " +
                        DEFAULT_DRIVER_MEMORY);
                }

                _context = _createContext(masterStr, driverMemoryStr,
                    connectionNameStr);

                _serverName = masterStr;
                _numLocalWorkers = numLocalWorkersVal;
                _driverMemory = driverMemoryStr;
            }

            if(_context != null) {
                _contextCounter++;
                //System.out.println("CONTEXT COUNTER incremented = " + _contextCounter);

                if(!_context.getConf().get("spark.master").startsWith("local")) {
                    // add the module jars, e.g., actors.jar, ptolemy.jar, etc.
                    final ModuleTree moduleTree = ModuleTree.instance();
                    for(Module module : moduleTree) {

                        // Spark 2 throws an exception when more than one jar with
                        // the same name is added (even when in different directories).
                        // So we need to exclude the (empty) kepler-tasks.jar that is
                        // built for the kepler-tasks module.
                        if(!module.getName().equals("kepler-tasks")) {
                            final File moduleJar = module.getTargetJar();

                            // add the module jar if it exists.
                            // some modules, e.g., outreach, do not have jars.
                            if(moduleJar.exists()) {
                                _context.addJar(moduleJar.getAbsolutePath());
                            }
                        }
                    }
                }
            }
        }
        return _context;
    }

    /** Get the default context. */
    public static JavaSparkContext getDefaultContext() throws IllegalActionException {
        return getContext(_serverName, _numLocalWorkers,
            _driverMemory, "default");
    }

    /** Get the default Spark master socket address. */
    public static InetSocketAddress getDefaultMasterSocketAddress() throws IllegalActionException {

        String masterHostAndPortStr = null;
        InetSocketAddress socketAddress;

        // first try environment variables: MASTER, SPARK_MASTER_IP, SPARK_LOCAL_IP
        String envMaster = System.getenv("MASTER");
        if(envMaster != null && envMaster.startsWith("spark://")) {
            masterHostAndPortStr = envMaster;
            System.out.println("Using $MASTER: " + masterHostAndPortStr);
            socketAddress = getMasterAndPortAddress(masterHostAndPortStr);
        } else {
            String hostname = null;
            String envSparkMasterIP = System.getenv("SPARK_MASTER_IP");
            if(envSparkMasterIP != null) {
                hostname = envSparkMasterIP;
                System.out.println("Using $SPARK_MASTER_IP: " + envSparkMasterIP);
            } else {
                String envSparkLocalIP = System.getenv("SPARK_LOCAL_IP");
                if(envSparkLocalIP != null) {
                    hostname = envSparkLocalIP;
                    System.out.println("Using $SPARK_LOCAL_IP: " + envSparkLocalIP);
                } else {
                    // finally, use localhost
                    try {
                        hostname = InetAddress.getLocalHost().getHostName();
                    } catch (UnknownHostException e) {
                        throw new IllegalActionException("Error determining host name: " +
                            e.getMessage());
                    }
                }
            }

            if(hostname == null || hostname.isEmpty()) {
                throw new IllegalActionException("Unable to determine host name.");
            }

            //System.out.println("Using default port for master: " + DEFAULT_MASTER_PORT);
            socketAddress = new InetSocketAddress(hostname, DEFAULT_MASTER_PORT);
        }

        return socketAddress;
    }

    /** Parse a string containing the master host and port and return the
     * socket address.
     */
    public static InetSocketAddress getMasterAndPortAddress(String masterHostAndPortStr) throws IllegalActionException {
        boolean isSparkURL = masterHostAndPortStr.startsWith("spark://");
        String hostname = null;
        int port;
        String[] parts = masterHostAndPortStr.split(":");
        if(parts.length != 2 &&
            (!isSparkURL || parts.length != 3)) {
            throw new IllegalActionException("masterHostAndPort must have both " +
                "master host name and port.");
        }
        if(isSparkURL) {
            hostname = parts[1].substring("//".length());
            try {
                port = Integer.parseInt(parts[2]);
            } catch(NumberFormatException e) {
                throw new IllegalActionException("Unable to parse port: " + parts[2] +
                    ": " + e.getMessage());
            }
        } else {
            hostname = parts[0];
            try {
                port = Integer.parseInt(parts[1]);
            } catch(NumberFormatException e) {
                throw new IllegalActionException("Unable to parse port: " + parts[1] +
                    ": " + e.getMessage());
            }
        }
        //System.out.println("h = " + hostname + " p = " + port);
        return new InetSocketAddress(hostname, port);
    }
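
    /* Illustrative inputs accepted by getMasterAndPortAddress (host and
     * port values are examples only):
     *
     *   "node0:7077"          -> host "node0", port 7077
     *   "spark://node0:7077"  -> host "node0", port 7077
     */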

    /** Parse the output from starting the server using start-myspark.sh.
     * @return The master URL if successful, otherwise null.
     */
    public static String parseOutputFromStartingServer(InputStream input) throws IOException, IllegalActionException {
        String masterHostAndPortStr = null;
        try(BufferedReader reader = new BufferedReader(new InputStreamReader(input))) {
            String line = null;
            while ((line = reader.readLine()) != null) {
                if(line.startsWith("MASTER=spark://")) {
                    String[] parts = line.split("=");
                    masterHostAndPortStr = parts[1];
                    System.out.println("Spark master host and port is " + masterHostAndPortStr);
                }
            }
        }
        return masterHostAndPortStr;
    }

    /** Release a use of the context. The context is stopped when the
     * reference count reaches zero.
     */
    public static void releaseContext() {
        synchronized(_contextLock) {
            if(_context != null) {
                _contextCounter--;
                //System.out.println("CONTEXT COUNTER decremented = " + _contextCounter);
                if(_contextCounter == 0) {
                    // if we are using a standalone server, close the connection
                    // since otherwise we can't submit new jobs.
                    //if(!_context.getConf().get("spark.master").startsWith("local")) {
                    _context.stop();
                    //}
                    _context = null;
                }
            }
        }
    }

    /** Stop any contexts that were started. */
    public static void shutdown() {
        synchronized(_contextLock) {
            if(_context != null) {
                _context.stop();
                _context = null;
            }

            if(_startedMySparkServer) {
                try {
                    _stopServerWithMySpark();
                } catch (IllegalActionException e) {
                    MessageHandler.error("Error stopping Spark server.", e);
                }
            }
        }
    }
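
    /* Sketch of the intended acquire/release pairing around Spark jobs
     * (illustrative; error handling elided):
     *
     *   JavaSparkContext context = SparkConnection.getDefaultContext();
     *   try {
     *       // ... submit Spark jobs with the context ...
     *   } finally {
     *       SparkConnection.releaseContext();
     *   }
     */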

    /** The name of the connection. */
    public StringParameter connectionName;

    /** The host name of the Spark server. Use "local" to run on the local machine. */
    public StringParameter serverName;

    /** The number of workers when running Spark locally. */
    public Parameter numLocalWorkers;

    /** The amount of memory to use in the Spark driver. */
    public StringParameter driverMemory;

    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Create a context. */
    private static JavaSparkContext _createContext(final String masterAndPortStr,
        String driverMemoryStr, final String connectionNameStr)
        throws IllegalActionException {

        final JavaSparkContext[] contexts = new JavaSparkContext[1];

        // try to get the spark context.
        // use an ExecutorService with a timeout to handle
        // connecting to a master run by someone else.
        /*
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            service.submit(new Runnable() {
                @Override
                public void run() {
        */
        //System.out.println("Getting JavaSparkContext for " +
        //connectionNameStr + " at " + masterAndPortStr);

        SparkConf conf = new SparkConf()
            .setMaster(masterAndPortStr)
            .setAppName("Kepler Spark " + connectionNameStr)
            .set("spark.driver.memory", driverMemoryStr);

        contexts[0] = new JavaSparkContext(conf);

        //contexts[0].sc().addSparkListener(new StdoutSparkListener());

        // if we're not using the local server, we need to wait
        // for the scheduler to set the default parallelism to
        // be the total number of cores.
        // See: https://issues.apache.org/jira/browse/SPARK-809

        if(!masterAndPortStr.startsWith("local")) {
            // wait 5 seconds for the executor added events
            System.out.println("Waiting 5 seconds to receive executor(s) information.");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new IllegalActionException("Interrupted while waiting for " +
                    "Spark scheduler to initialize: " + e.getMessage());
            }
        }
        /*
                System.out.println("finished calling jsp ctor");
            }
        }).get(_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("WARNING: Timeout connecting to master at " +
                masterAndPortStr + ".");
        } finally {
            service.shutdownNow();
        }
        */

        if(contexts[0] != null) {
            System.out.println("Created Spark context for: " +
                contexts[0].getConf().get("spark.master"));
        }

        return contexts[0];
    }

    /** Start the standalone server using start-myspark.sh.
     * @return The master and port string.
     */
    private static String _startServerWithMySpark() throws IllegalActionException {

        // set the default location of the config directory
        String workflowDirStr = System.getProperty("spark.workflowdir");
        if(workflowDirStr == null) {
            throw new IllegalActionException("System property " +
                "spark.workflowdir not set.");
        }

        String configDirStr = workflowDirStr + File.separator +
            "tools" + File.separator + "conf";

        File configDirFile = new File(configDirStr);
        String parentDirStr = configDirFile.getParent();

        final String pathStr = parentDirStr + File.separator + "sbin" +
            File.separator;

        ConfigurationProperty engineProperty =
            Utilities.getEngineProperty(SparkEngine.SPARK_ENGINE_NAME, null);

        ConfigurationProperty startProperty = engineProperty.getProperty("Server.Scripts.ClusterStart");
        if(startProperty == null) {
            throw new IllegalActionException("Could not find ClusterStart script in configuration file.");
        }

        String startScriptStr = pathStr + startProperty.getValue();

        System.out.println("Starting Spark server with " + startScriptStr);

        // see if the script is executable. kepler modules are zipped,
        // which does not preserve the permissions.
        File startScriptFile = new File(startScriptStr);
        if(!startScriptFile.canExecute()) {
            throw new IllegalActionException(
                "The script " + startScriptFile + " is not executable.\n" +
                "You must change the permissions so that " +
                startScriptFile.getName() +
                " and all the other scripts in \n" +
                startScriptFile.getParent() + " are executable.");
        }

        ProcessBuilder builder = new ProcessBuilder(startScriptStr);
        builder.redirectErrorStream(true);

        String masterAndPortStr;

        try {
            Process process = builder.start();
            masterAndPortStr = parseOutputFromStartingServer(process.getInputStream());
            process.waitFor();
            _startedMySparkServer = true;
        } catch(Exception e) {
            throw new IllegalActionException("Error starting Spark server " +
                "using myspark: " + e.getMessage());
        }

        return masterAndPortStr;
    }
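
    /* Illustrative fragment of start-myspark.sh output consumed by
     * parseOutputFromStartingServer above; only a line beginning with
     * "MASTER=spark://" is parsed (host and port are examples):
     *
     *   MASTER=spark://node0:7077
     */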

    /** Stop the standalone server using stop-myspark.sh. */
    private static void _stopServerWithMySpark() throws IllegalActionException {

        // set the default location of the config directory
        String workflowDirStr = System.getProperty("spark.workflowdir");
        if(workflowDirStr == null) {
            throw new IllegalActionException("System property " +
                "spark.workflowdir not set.");
        }

        String configDirStr = workflowDirStr + File.separator +
            "tools" + File.separator + "conf";

        File configDirFile = new File(configDirStr);
        String parentDirStr = configDirFile.getParent();

        final String pathStr = parentDirStr + File.separator + "sbin" +
            File.separator;

        ConfigurationProperty engineProperty =
            Utilities.getEngineProperty(SparkEngine.SPARK_ENGINE_NAME, null);

        ConfigurationProperty stopProperty = engineProperty.getProperty("Server.Scripts.ClusterStop");
        if(stopProperty == null) {
            throw new IllegalActionException("Could not find ClusterStop script in configuration file.");
        }

        String stopScriptStr = pathStr + stopProperty.getValue();

        System.out.println("Stopping Spark server with " + stopScriptStr);

        // see if the script is executable. kepler modules are zipped,
        // which does not preserve the permissions.
        File stopScriptFile = new File(stopScriptStr);
        if(!stopScriptFile.canExecute()) {
            throw new IllegalActionException(
                "The script " + stopScriptFile + " is not executable.\n" +
                "You must change the permissions so that " +
                stopScriptFile.getName() +
                " and all the other scripts in \n" +
                stopScriptFile.getParent() + " are executable.");
        }

        ProcessBuilder builder = new ProcessBuilder(stopScriptStr);
        builder.redirectErrorStream(true);

        try {
            Process process = builder.start();
            process.waitFor();
            _startedMySparkServer = false;
        } catch(Exception e) {
            throw new IllegalActionException("Error stopping Spark server " +
                "using myspark: " + e.getMessage());
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private fields                    ////

    /** The Spark context.
     * NOTE: Spark only allows a single SparkContext per JVM,
     * so this is static.
     */
    private static JavaSparkContext _context;

    /** Lock object for _context. */
    private static final Object _contextLock = new Object();

    /** The default number of local workers. */
    public static final int DEFAULT_NUM_LOCAL_WORKERS = 0;

    /** The number of workers for the local context. */
    private static int _numLocalWorkers = DEFAULT_NUM_LOCAL_WORKERS;

    /** The default amount of memory for the Spark driver. */
    public static final String DEFAULT_DRIVER_MEMORY = "1g";

    /** The amount of memory for the Spark driver. */
    private static String _driverMemory = DEFAULT_DRIVER_MEMORY;

    /** Default port for the master. */
    public final static int DEFAULT_MASTER_PORT = 7077;

    /** Timeout (in ms) when checking if the server is running. */
    private final static int _CONNECT_TIMEOUT = 10*1000;

    /** Default server name. */
    private static String _serverName = "";

    /** Reference counter for the context. */
    private static int _contextCounter = 0;

    /** If true, we've started the standalone server using myspark. */
    private static boolean _startedMySparkServer = false;

}