/* An engine that runs models in Stratosphere.
 *
 * Copyright (c) 2011-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $'
 * '$Revision: 33628 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.stratosphere.director;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.kepler.build.project.ProjectLocator;
import org.kepler.ddp.Utilities;
import org.kepler.ddp.actor.pattern.AtomicPatternActor;
import org.kepler.ddp.actor.pattern.CoGroup;
import org.kepler.ddp.actor.pattern.Cross;
import org.kepler.ddp.actor.pattern.DDPDataSink;
import org.kepler.ddp.actor.pattern.DDPDataSource;
import org.kepler.ddp.actor.pattern.DDPPatternActor;
import org.kepler.ddp.actor.pattern.Map;
import org.kepler.ddp.actor.pattern.Match;
import org.kepler.ddp.actor.pattern.Reduce;
import org.kepler.ddp.actor.pattern.SingleInputPatternActor;
import org.kepler.ddp.actor.pattern.Types;
import org.kepler.ddp.director.DDPDirector;
import org.kepler.ddp.director.DDPEngine;
import org.kepler.stratosphere.io.input.TokenDataSource;
import org.kepler.stratosphere.io.input.TokenInputFormat;
import org.kepler.stratosphere.io.output.NullOutputFormat;
import org.kepler.stratosphere.io.output.TokenOutputFormat;
import org.kepler.stratosphere.stub.KeplerCoGroupStub;
import org.kepler.stratosphere.stub.KeplerCombineAndReduceStub;
import org.kepler.stratosphere.stub.KeplerCrossStub;
import org.kepler.stratosphere.stub.KeplerMapStub;
import org.kepler.stratosphere.stub.KeplerMatchStub;
import org.kepler.stratosphere.stub.KeplerReduceStub;
import org.kepler.stratosphere.stub.ScriptEngineMapStub;
import org.kepler.stratosphere.stub.ScriptEngineReduceStub;
import org.kepler.stratosphere.type.TypeUtilities;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.common.operators.DualInputOperator;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.GenericDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.SingleInputOperator;
import eu.stratosphere.api.java.record.functions.CoGroupFunction;
import eu.stratosphere.api.java.record.functions.CrossFunction;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.api.java.record.operators.CoGroupOperator;
import eu.stratosphere.api.java.record.operators.CrossOperator;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.client.JobExecutionResult;
import ptolemy.actor.CompositeActor;
import ptolemy.actor.IOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.Token;
import ptolemy.data.type.Type;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.kernel.util.Workspace;

/** An engine that runs workflows in Stratosphere. This engine
 *  converts DDP pattern actors (Map, Reduce, Cross, CoGroup, and
 *  Match) and I/O actors (DDPDataSink and DDPDataSource) into a
 *  Stratosphere job and runs it on the server.
 *  <p>
 *  <b>NOTE:</b> Only DDP pattern and I/O actors may be present in
 *  the workflow. Other actors must be placed inside the composite
 *  pattern actors or in a different sub-workflow.
 *  </p>
 *
 *  @author Daniel Crawl
 *  @version $Id: StratosphereEngine.java 33628 2015-08-24 22:42:20Z crawl $
 */
public class StratosphereEngine extends DDPEngine {

    /** Create a new StratosphereEngine.
     *  @param director The director containing this engine.
     */
    public StratosphereEngine(DDPDirector director)
            throws IllegalActionException, NameDuplicationException {
        super(director);

        _engineName = STRATOSPHERE_ENGINE_NAME;
    }

    /** Clone the StratosphereEngine into the specified workspace.
     *  @param workspace The workspace for the cloned object.
     *  @exception CloneNotSupportedException Not thrown in this base class.
     *  @return The new StratosphereEngine.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        StratosphereEngine newObject = (StratosphereEngine) super.clone(workspace);
        newObject._globalConfig = null;
        newObject._inputsAlreadyDone = new HashSet<Operator>();
        newObject._job = null;
        return newObject;
    }
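    /* Overview of the translation performed by this engine (a summary of the
     * _getContract() logic below; the stub classes listed are the defaults
     * used when a pattern actor executes its sub-workflow):
     *
     *   DDPDataSource -> FileDataSource or TokenDataSource
     *   Map           -> MapOperator     (KeplerMapStub)
     *   Reduce        -> ReduceOperator  (KeplerReduceStub / KeplerCombineAndReduceStub)
     *   Cross         -> CrossOperator   (KeplerCrossStub)
     *   CoGroup       -> CoGroupOperator (KeplerCoGroupStub)
     *   Match         -> JoinOperator    (KeplerMatchStub)
     *   DDPDataSink   -> FileDataSink or GenericDataSink
     *
     * _getModelPlan() wires the resulting operators into a Stratosphere Plan
     * by walking the model graph backwards from each sink.
     */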
    /** Execute the engine. */
    @Override
    protected void _executeDDPJob() throws IllegalActionException {

        if(!_sameJVM && _globalConfig == null) {

            GlobalConfiguration.loadConfiguration(_configDirStr);
            _globalConfig = GlobalConfiguration.getConfiguration();

            // get the address of the job manager
            final String address = _globalConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
            if (address == null) {
                throw new IllegalActionException(_director,
                    "Cannot find the address of the job manager's RPC service in the global configuration.");
            }

            final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
                ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
            if (port < 0) {
                throw new IllegalActionException(_director,
                    "Cannot find the port of the job manager's RPC service in the global configuration.");
            }

            final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port);

            // See if we can connect to Stratosphere, and start a server if
            // necessary. The check is synchronized so that if there are
            // multiple StratosphereDirectors in the same workflow, only one
            // will start the Stratosphere server.
            File configDirFile = new File(_configDirStr);
            String parentDirStr = configDirFile.getParent();
            String startServerTypeStr = _director.startServerType.stringValue();
            if(startServerTypeStr.isEmpty() || startServerTypeStr.equals("default")) {
                startServerTypeStr = DDPDirector.SAME_JVM_STRING;
            } else if(startServerTypeStr.equals(DDPDirector.DISTRIBUTED_STRING)) {
                startServerTypeStr = "cluster";
            }

            String startScriptStr = parentDirStr + File.separator + "bin" +
                File.separator + "start-" + startServerTypeStr + ".sh";

            if(_checkServer(jobManagerAddress, startScriptStr)) {
                _startedConfigDirs.add(_configDirStr);
            }
        }
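        // The two configuration keys consulted above normally come from the
        // Stratosphere configuration file in _configDirStr. A minimal example
        // with illustrative values (the actual key names are defined by
        // ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY and
        // ConfigConstants.JOB_MANAGER_IPC_PORT_KEY):
        //
        //   jobmanager.rpc.address: localhost
        //   jobmanager.rpc.port: 6123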
        // See what jars are necessary. Call this before creating the job,
        // since a new classloader may be created for external classes.
        final List<URI> jarURIs = _getJarList();

        final List<String> jarPaths = new LinkedList<String>();
        for (URI jarURI : jarURIs) {
            jarPaths.add(jarURI.getPath());
        }

        // now that types have been set, we can convert the model into a plan
        final Plan plan = _getModelPlan();

        if(_sameJVM) {

            synchronized(_localClusterLock) {
                if(_localCluster == null) {
                    _localCluster = new NepheleMiniCluster();
                    try {
                        System.out.println("Starting Stratosphere server in Kepler JVM.");
                        _localCluster.start();
                    } catch (Exception e) {
                        throw new IllegalActionException(_director, e,
                            "Error starting Stratosphere server in Kepler JVM.");
                    }
                }
            }

            System.out.println("Starting Stratosphere job.");

            synchronized(_jobLock) {
                try {
                    _job = new StratosphereJob(new JobWithJars(plan, jarPaths), _localCluster);
                } catch (Exception e) {
                    throw new IllegalActionException(_director, e, "Error starting Stratosphere job.");
                }
            }

        } else {

            System.out.println("Starting Stratosphere job.");
            synchronized(_jobLock) {
                try {
                    _job = new StratosphereJob(new JobWithJars(plan, jarPaths), _globalConfig);
                } catch (Exception e) {
                    throw new IllegalActionException(_director, e,
                        "Error submitting Stratosphere job.");
                }
            }
        }

        JobExecutionResult result;
        try {
            result = _job.waitForFinish();
        } catch (Exception e) {
            throw new IllegalActionException(_director, e, "Error in Stratosphere job.");
        }

        if(result != null) {
            System.out.println("Finished Stratosphere job, took " +
                result.getNetRuntime() + " ms.");
        } else {
            System.out.println("Stratosphere job cancelled.");
        }
    }

    /** Preinitialize the engine. */
    @Override
    public void preinitialize() throws IllegalActionException {

        // call super class preinitialize to validate settables and
        // preinitialize actors
        super.preinitialize();

        _checkModel();
        _checkModelForReduceWithCombineClass();

        // load the default config dir if necessary
        _checkConfigDir();
    }

    /** Stop any running Stratosphere jobs. */
    @Override
    public void stop() throws IllegalActionException {
        super.stop();

        synchronized(_jobLock) {
            if(_job != null) {
                try {
                    _job.stop();
                } catch (IOException e) {
                    if(_director != null) {
                        throw new IllegalActionException(_director, e, "Error stopping DDP job.");
                    } else {
                        throw new IllegalActionException("Error stopping DDP job: " + e.getMessage());
                    }
                }
            }
        }
    }
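    // Hypothetical caller-side sketch (not part of this class): since
    // shutdownServer() below is static, a host application could register it
    // as a JVM shutdown hook so that any server this engine started is
    // stopped on exit:
    //
    //   Runtime.getRuntime().addShutdownHook(new Thread() {
    //       @Override
    //       public void run() {
    //           try {
    //               StratosphereEngine.shutdownServer();
    //           } catch (IllegalActionException e) {
    //               e.printStackTrace();
    //           }
    //       }
    //   });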
    /** Shut down the Stratosphere server if one was started. */
    public static void shutdownServer() throws IllegalActionException {

        // shut down the server in this JVM if one is running.
        synchronized(_localClusterLock) {
            if(_localCluster != null) {
                System.out.println("Stopping local Stratosphere server.");

                try {
                    _localCluster.stop();
                    _localCluster = null;
                } catch (Exception e) {
                    throw new IllegalActionException("Error shutting down local Stratosphere server: " +
                        e.getMessage());
                }
            }
        }

        // shut down any servers we started.
        synchronized(_serverStartStopLock) {
            for(String configDirStr : _startedConfigDirs) {

                System.out.println("Stopping Stratosphere server for configuration directory " +
                    configDirStr);

                String parentDirStr = new File(configDirStr).getParent();
                String stopScriptStr = parentDirStr + File.separator + "bin" +
                    File.separator + "stop-cluster.sh";
                ProcessBuilder builder = new ProcessBuilder(stopScriptStr);

                // make sure JAVA_HOME is set
                java.util.Map<String,String> env = builder.environment();
                if(env.get("JAVA_HOME") == null) {
                    env.put("JAVA_HOME", System.getProperty("java.home"));
                }

                try {
                    Process process = builder.start();
                    process.waitFor();
                } catch (Exception e) {
                    throw new IllegalActionException("Unable to stop Stratosphere: " + e.getMessage());
                }
            }
        }
    }

    /** Free resources. */
    @Override
    public void wrapup() throws IllegalActionException {

        super.wrapup();

        _globalConfig = null;

        synchronized(_jobLock) {
            _job = null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                        public fields                     //////

    ///////////////////////////////////////////////////////////////////
    ////                      protected methods                   //////
    /** Write the sub-workflow of a SingleInputPatternActor either to a parameter
     *  in the PACT plan configuration or to a file.
     */
    protected void _addSubModelToContract(SingleInputPatternActor pactor, Operator contract)
            throws IllegalActionException {

        final String name = pactor.getName();

        final Configuration contractConfiguration = contract.getParameters();
        contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, name);

        contractConfiguration.setBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT,
            pactor.getRunWorkflowLifecyclePerInput());

        // make a clone of the pactor so we can add TypeAttributes to ports
        // and add parameters from the hierarchy

        Workspace workspaceForClone;
        if(_sameJVM) {
            workspaceForClone = new Workspace(name);
        } else {
            workspaceForClone = _director.workspace();
        }

        SingleInputPatternActor clonePactor;
        try {
            clonePactor = (SingleInputPatternActor) pactor.clone(workspaceForClone);
        } catch (CloneNotSupportedException e) {
            throw new IllegalActionException(_director, e, "Unable to clone " + name);
        }

        // copy the port types to the clone
        _setPortTypes(clonePactor);

        // copy the parameters to the clone
        _copyParameters(pactor, clonePactor);

        // set display redirect path
        String directDir = pactor.getDisplayRedirectDir();
        NamedObj redirectSpecifier = pactor;
        if(directDir.isEmpty()) {
            redirectSpecifier = _director;
            directDir = _displayRedirectDir;
        }
        if(!directDir.isEmpty()) {

            // display redirection when running in the same JVM is not supported
            if(_sameJVM) {
                throw new IllegalActionException(redirectSpecifier,
                    "Redirecting display actors is not supported for"
                    + " Stratosphere server running in the same JVM as Kepler.");
            }

            final File file = new File(directDir);
            if (!file.exists() && !file.mkdirs()) {
                throw new IllegalActionException(_director, "Could not create directories " + file);
            }
            if (!file.isDirectory() || !file.canWrite()) {
                throw new IllegalActionException(redirectSpecifier,
                    "Display redirect directory '" + directDir + "' must be a writable directory.");
            }
            contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, directDir);
        }

        if(_sameJVM) {
            _subWorkflows.put(name, clonePactor);
        } else {

            // remove top-level ports and relations
            Utilities.removeModelPorts(clonePactor);

            // serialize the cloned pactor

            if(_writeSubWorkflowsToFiles) {

                if(_jobDir == null) {
                    _createJobDirectory();
                }

                final String modelPath = _jobDir + name + ".xml";
                FileWriter writer = null;
                try {
                    writer = new FileWriter(modelPath);
                    clonePactor.exportMoML(writer);
                } catch(IOException e) {
                    throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                } finally {
                    if(writer != null) {
                        try {
                            writer.close();
                        } catch (IOException e) {
                            throw new IllegalActionException(_director, e, "Error writing model to " + modelPath);
                        }
                    }
                }

                // add the model file path to the plan configuration
                contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, modelPath);

            } else {

                // write the model to a string
                final StringWriter writer = new StringWriter();
                try {
                    clonePactor.exportMoML(writer);
                } catch (IOException e) {
                    throw new IllegalActionException(_director, e, "Error serializing model.");
                }

                // add the string to the configuration
                contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODEL, writer.toString());
            }

            // Set the location of the Kepler installation directory.
            // NOTE: this is done so that the stub can initialize Kepler and set
            // the Java properties for each module's workflow directory, e.g.:
            // property("stratosphere.workflowdir")
            // If the modules directory does not exist on the stub, e.g., the
            // file system is not shared, then initialization is not done and
            // the stub workflow cannot use the module workflow directory
            // properties.
            contractConfiguration.setString(Utilities.CONFIGURATION_KEPLER_MODULES_DIR,
                ProjectLocator.getKeplerModulesDir().getAbsolutePath());
        }
    }
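    // Stub-side sketch (hypothetical; for illustration only): a Kepler stub
    // running inside Stratosphere can recover the sub-workflow serialized
    // above from its operator configuration, e.g.:
    //
    //   String moml = parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null);
    //   if(moml == null) {
    //       // the model was written to a file instead
    //       String path = parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null);
    //       // ... read the MoML text from path ...
    //   }
    //   // parse the MoML back into an actor, e.g. with ptolemy.moml.MoMLParser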
    ///////////////////////////////////////////////////////////////////
    ////                       private methods                    //////

    /** Recursively add inputs for a contract by traversing the model graph.
     *
     *  @param pactor the current pactor in the model.
     *  @param contract the contract for the current pactor.
     */
    private void _addInputsForContract(DDPPatternActor pactor, Operator contract)
            throws IllegalActionException {

        // see if we've already done this contract
        if(!_inputsAlreadyDone.contains(contract)) {

            // see if the execution class name or execution code is set
            final String executionClassName = pactor.getExecutionClassName();
            final String executionCodeType = pactor.getExecutionCodeType();
            if(executionClassName.isEmpty() &&
                executionCodeType == null &&
                (pactor instanceof SingleInputPatternActor)) {
                // add the sub-workflow to the plan configuration
                _addSubModelToContract((SingleInputPatternActor) pactor, contract);
            }

            // see how many inputs are required
            int numRequiredInputs;
            if(contract instanceof DualInputOperator) {
                numRequiredInputs = 2;
            } else if((contract instanceof SingleInputOperator) ||
                (contract instanceof GenericDataSink)) {
                numRequiredInputs = 1;
            } else if(contract instanceof GenericDataSource) {
                numRequiredInputs = 0;
            } else {
                throw new IllegalActionException(_director, "Unknown type of contract: " + contract.getClass());
            }

            // see if there's at least one input
            if(numRequiredInputs > 0) {

                // get the first input port
                final IOPort inputPort1 = (IOPort) pactor.getPort("in");
                if(inputPort1 == null) {
                    throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
                        " is missing input port \"in\".");
                }

                _addInputForPactorPort(inputPort1, pactor, contract);

                // see if there's a second input
                if(numRequiredInputs > 1) {

                    final IOPort inputPort2 = (IOPort) pactor.getPort("in2");
                    if(inputPort2 == null) {
                        throw new IllegalActionException(_director, "DDPPatternActor " + pactor.getFullName() +
                            " is missing input port \"in2\".");
                    }

                    _addInputForPactorPort(inputPort2, pactor, contract);
                }
            }
        }
    }
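    // Example (sketch): for a linear model
    //
    //     DDPDataSource source --> Map map --> DDPDataSink sink
    //
    // _getModelPlan() builds the sink's contract first, and
    // _addInputsForContract(sink, sinkContract) then walks backwards through
    // the graph, producing the equivalent of:
    //
    //     sinkContract.setInput(mapContract);    // via _addInputForPactorPort
    //     mapContract.setInput(sourceContract);  // recursive call on the Map actor
    //
    // _inputsAlreadyDone ensures an operator feeding several sinks is only
    // wired once.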
    /** Set the input contract for a single input port and traverse the model
     *  graph to recursively add input contracts.
     *
     *  @param port the input port whose connected source provides the input contract.
     *  @param pactor the current pactor in the model.
     *  @param contract the contract for the current pactor.
     */
    private void _addInputForPactorPort(IOPort port, DDPPatternActor pactor, Operator contract)
            throws IllegalActionException {

        // get the connected actor
        final List<?> outputPorts = port.sourcePortList();
        if(outputPorts.isEmpty()) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                port.getName() + " must be connected.");
        } else if(outputPorts.size() > 1) {
            throw new IllegalActionException(_director, "DDPPatternActor input port " +
                port.getName() + " may only be connected to one port.");
        }

        final IOPort outputPort1 = (IOPort) outputPorts.get(0);
        final NamedObj outputNamedObj = outputPort1.getContainer();

        // FIXME
        if(!(outputNamedObj instanceof DDPPatternActor)) {
            throw new IllegalActionException(_director, "Actor " + pactor.getFullName() +
                " is connected to a non-DDPPatternActor: " + outputNamedObj.getFullName());
        }

        final Operator outputContract = _getContract((DDPPatternActor) outputNamedObj);

        if(contract instanceof SingleInputOperator) {
            ((SingleInputOperator<?>) contract).setInput(outputContract);
        } else if(contract instanceof DualInputOperator) {
            if(port.getName().equals("in")) {
                ((DualInputOperator<?>) contract).setFirstInput(outputContract);
            } else if(port.getName().equals("in2")) {
                ((DualInputOperator<?>) contract).setSecondInput(outputContract);
            } else {
                throw new IllegalActionException(port, "Input port must be named either \"in\" or \"in2\".");
            }
        } else { // contract instanceof GenericDataSink
            ((GenericDataSink) contract).setInput(outputContract);
        }

        _inputsAlreadyDone.add(contract);

        // recursively add the inputs for the output pactor
        _addInputsForContract((DDPPatternActor) outputNamedObj, outputContract);
    }

    /** Make sure no Reduce pattern actors in this sub-workflow specify a
     *  separate combiner class.
     */
    private void _checkModelForReduceWithCombineClass() throws IllegalActionException {

        final CompositeActor container = _container;
        for(Reduce reduce : container.entityList(Reduce.class)) {
            if(!reduce.combineExecutionClass.stringValue().trim().isEmpty()) {
                throw new IllegalActionException(reduce,
                    "The combiner execution class cannot be specified with " +
                    "Stratosphere; instead implement combine() in the " +
                    "reduce class.");
            }
        }
    }
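    // Sketch of the alternative suggested by the error message above, using
    // the 0.4-era record API (names from memory; verify against the
    // Stratosphere version in use): implement the combiner inside the reduce
    // class rather than naming a separate combiner class.
    //
    //   @ReduceOperator.Combinable
    //   public class MyReduce extends ReduceFunction {
    //       @Override
    //       public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
    //           // ... aggregate records ...
    //       }
    //       @Override
    //       public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
    //           reduce(records, out);  // combiners often reuse the reduce logic
    //       }
    //   }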
    /** Get a contract for a source or sink.
     *  @param actor the pactor.
     *  @param input if true, get a source contract; if false, get a sink contract.
     */
    private Operator _getSourceSinkContract(AtomicPatternActor actor, boolean input)
            throws IllegalActionException {

        String className = actor.getFormatClassName(_engineName);
        if(className == null) {
            throw new IllegalActionException(_director, "Could not find format class name for " +
                "actor " + actor.getName());
        }

        java.util.Map<String,String> parametersMap = actor.getParameters();

        // first check for classes that were in 0.1 and 0.2
        if(className.equals("eu.stratosphere.pact.example.wordcount.WordCount$LineInFormat")) {
            System.out.println("class WordCount$LineInFormat no longer exists; using TextInputFormat");
            className = "eu.stratosphere.api.java.record.io.TextInputFormat";
        } else if(className.equals("eu.stratosphere.pact.common.io.TextInputFormat")) {
            System.out.println("TextInputFormat moved to package eu.stratosphere.api.java.record.io");
            className = "eu.stratosphere.api.java.record.io.TextInputFormat";
        } else if(className.equals("eu.stratosphere.pact.example.wordcount.WordCount$WordCountOutFormat")) {
            System.out.println("class WordCount$WordCountOutFormat no longer exists; using CsvOutputFormat");
            className = "eu.stratosphere.api.java.record.io.CsvOutputFormat";
            parametersMap.put("recordDelimiter", "\n");
            parametersMap.put("fieldDelimiter", " ");
        } else if(className.equals("eu.stratosphere.pact.common.io.RecordOutputFormat")) {
            System.out.println("class RecordOutputFormat no longer exists; using CsvOutputFormat.");
            className = "eu.stratosphere.api.java.record.io.CsvOutputFormat";
            parametersMap.put("recordDelimiter", "\n");
            parametersMap.put("fieldDelimiter", " ");
        }

        // try to load the class
        Class<?> clazz;
        try {
            clazz = _classLoader.loadClass(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalActionException(actor, "Format type " + className +
                " was not found in the format types configuration, and the class is not on the classpath.");
        }

        Operator contract;
        if(input) {
            if(!InputFormat.class.isAssignableFrom(clazz)) {
                throw new IllegalActionException(actor, "Class " + clazz.getName() +
                    " must implement Stratosphere's InputFormat interface.");
            }

            if(FileInputFormat.class.isAssignableFrom(clazz)) {
                contract = new FileDataSource((Class<? extends FileInputFormat<?>>) clazz,
                    ((DDPDataSource) actor).getPathAsURI().toString(), actor.getName());
            } else if(TokenInputFormat.class.isAssignableFrom(clazz)) {
                contract = new TokenDataSource((Class<? extends TokenInputFormat>) clazz,
                    actor.getName());

                if(_sameJVM) {
                    contract.setParameter(Utilities.CONFIGURATION_KEPLER_SOURCE_ACTOR_NAME,
                        actor.getFullName());
                } else {

                    Token token = DDPDataSource.getToken(actor.getFullName());

                    if(token == null) {
                        throw new IllegalActionException(actor,
                            "No input token found for source actor " + actor.getName());
                    }

                    contract.setParameter(Utilities.CONFIGURATION_KEPLER_INPUT_TOKEN, token.toString());
                }

            } else {
                throw new IllegalActionException(actor, "Unsupported type of format class: " +
                    clazz.getName());
            }

        } else {

            if(!OutputFormat.class.isAssignableFrom(clazz)) {
                throw new IllegalActionException(actor, "Class " + clazz.getName() +
                    " must implement Stratosphere's OutputFormat interface.");
            }

            if(FileOutputFormat.class.isAssignableFrom(clazz)) {

                contract = new FileDataSink((Class<? extends FileOutputFormat>) clazz,
                    ((DDPDataSink) actor).getPathAsURI().toString(), actor.getName());

                if(clazz == CsvOutputFormat.class) {

                    CsvOutputFormat.ConfigBuilder builder =
                        CsvOutputFormat.configureRecordFormat((FileDataSink) contract)
                            .lenient(true)
                            .field(TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(
                                ((DDPDataSink) actor).in, "key"), TypeUtilities.KEY_FIELD)
                            .field(TypeUtilities.getPactValueTypeForFieldInKeyValuePort(
                                ((DDPDataSink) actor).in, "value"), TypeUtilities.VALUE_FIELD);

                    String field = parametersMap.get("fieldDelimiter");
                    // if no field delimiter is set, use a space
                    if(field == null) {
                        field = " ";
                    }
                    builder = builder.fieldDelimiter(field.charAt(0));
                    String record = parametersMap.get("recordDelimiter");
                    if(record != null) {
                        builder = builder.recordDelimiter(record);
                    }
                }
            } else if(clazz == TokenOutputFormat.class) {

                contract = new GenericDataSink((Class<? extends TokenOutputFormat>) clazz, actor.getName());

                contract.setParameter(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME,
                    actor.getFullName());

                // the builder configures the sink's record format as a side effect
                TokenOutputFormat.configureRecordFormat((GenericDataSink) contract)
                    .field(TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(
                        ((DDPDataSink) actor).in, "key"), TypeUtilities.KEY_FIELD)
                    .field(TypeUtilities.getPactValueTypeForFieldInKeyValuePort(
                        ((DDPDataSink) actor).in, "value"), TypeUtilities.VALUE_FIELD);

            } else if(clazz == NullOutputFormat.class) {

                contract = new GenericDataSink(new NullOutputFormat());

            } else {
                throw new IllegalActionException(actor, "Unsupported type of format class: " +
                    clazz.getName());
            }
        }

        return contract;
    }
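    // Example (illustrative values): a DDPDataSource whose format class is
    // eu.stratosphere.api.java.record.io.TextInputFormat and whose path is
    // file:///tmp/in.txt produces
    //
    //   new FileDataSource(TextInputFormat.class, "file:///tmp/in.txt", actor.getName());
    //
    // while a DDPDataSink using CsvOutputFormat also gets its key/value record
    // fields and delimiters configured via CsvOutputFormat.configureRecordFormat().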
    /** Get the contract for a DDPPatternActor. */
    private Operator _getContract(DDPPatternActor actor) throws IllegalActionException {

        Operator contract = null;

        final String actorName = actor.getName();

        final String stubClassName = actor.getExecutionClassName();
        Class<?> stubClass = null;

        // see if the stub class was set
        if(!stubClassName.isEmpty()) {
            try {
                stubClass = _classLoader.loadClass(stubClassName);
            } catch (ClassNotFoundException e) {
                throw new IllegalActionException(actor, e,
                    "Could not find execution class " + stubClassName);
            }
        }

        final String stubCodeTypeName = actor.getExecutionCodeType();
        if(stubCodeTypeName != null) {

            // TODO
            if(!(actor instanceof Map) && !(actor instanceof Reduce)) {
                throw new IllegalActionException(actor, "Code execution is not yet supported for this pattern.");
            }
        }

        final boolean stubIsSubWorkflow = (stubClass == null) && (stubCodeTypeName == null);

        if(actor instanceof DDPDataSource) {
            contract = _getSourceSinkContract((DDPDataSource) actor, true);
        } else if(actor instanceof Map) {

            if(stubIsSubWorkflow) {
                // execute the sub-workflow as the stub
                stubClass = KeplerMapStub.class;
            } else if(stubCodeTypeName != null) {
                stubClass = ScriptEngineMapStub.class;
            } else if(!MapFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    MapFunction.class.getName());
            } else {
                System.out.println("Using Map execution class " + stubClassName +
                    " for " + actor.getFullName());
            }

            contract = MapOperator.builder((Class<? extends MapFunction>) stubClass)
                .name(actorName)
                .build();

        } else if (actor instanceof Reduce) {

            if(stubIsSubWorkflow) {
                // execute the sub-workflow as the stub

                // see if the reducer is also a combiner
                if(((BooleanToken)((Reduce) actor).useAsCombiner.getToken()).booleanValue()) {
                    stubClass = KeplerCombineAndReduceStub.class;
                } else {
                    stubClass = KeplerReduceStub.class;
                }

            } else if(stubCodeTypeName != null) {
                stubClass = ScriptEngineReduceStub.class;
            } else if(!ReduceFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    ReduceFunction.class.getName());
            } else {
                System.out.println("Using Reduce execution class " + stubClassName +
                    " for " + actor.getFullName());
            }

            contract = ReduceOperator
                .builder((Class<? extends ReduceFunction>) stubClass)
                .keyField(
                    TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(
                        ((Reduce) actor).in, "key"),
                    TypeUtilities.KEY_FIELD)
                .name(actorName)
                .build();
        } else if(actor instanceof Cross) {

            // see if the execution class was set
            if(stubClass == null) {
                // execute the sub-workflow as the stub
                stubClass = KeplerCrossStub.class;
            } else if(!CrossFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    CrossFunction.class.getName());
            } else {
                System.out.println("Using Cross execution class " + stubClassName +
                    " for " + actor.getFullName());
            }

            contract = CrossOperator.builder((Class<? extends CrossFunction>) stubClass)
                .name(actorName)
                .build();

        } else if(actor instanceof CoGroup) {

            // see if the execution class was set
            if(stubClass == null) {
                // execute the sub-workflow as the stub
                stubClass = KeplerCoGroupStub.class;
            } else if(!CoGroupFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    CoGroupFunction.class.getName());
            } else {
                System.out.println("Using CoGroup execution class " + stubClassName +
                    " for " + actor.getFullName());
            }

            contract = CoGroupOperator.builder((Class<? extends CoGroupFunction>) stubClass,
                    TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(((CoGroup) actor).in, "key"),
                    TypeUtilities.KEY_FIELD, TypeUtilities.KEY_FIELD)
                .name(actorName)
                .build();

        } else if(actor instanceof Match) {

            // see if the execution class was set
            if(stubClass == null) {
                // execute the sub-workflow as the stub
                stubClass = KeplerMatchStub.class;
            } else if(!JoinFunction.class.isAssignableFrom(stubClass)) {
                throw new IllegalActionException(actor, "Execution class " +
                    stubClassName + " must be a subclass of " +
                    JoinFunction.class.getName());
            } else {
                System.out.println("Using Match execution class " + stubClassName +
                    " for " + actor.getFullName());
            }

            contract = JoinOperator.builder((Class<? extends JoinFunction>) stubClass,
                    TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(((Match) actor).in, "key"),
                    TypeUtilities.KEY_FIELD, TypeUtilities.KEY_FIELD)
                .name(actorName)
                .build();

        } else if(actor instanceof DDPDataSink) {
            contract = _getSourceSinkContract((DDPDataSink) actor, false);
        } else {
            throw new IllegalActionException(_director, "Cannot determine contract type for " +
                actor.getFullName());
        }

        // set the number of parallel instances.
        int numInstances = actor.getDegreeOfParallelism();
        if(numInstances <= 0) {
            numInstances = _degreeOfParallelism;
        }
        contract.setDegreeOfParallelism(numInstances);

        // set any job arguments if an execution class is used (not a sub-workflow)
        final Configuration configuration = contract.getParameters();
        if(!stubIsSubWorkflow) {
            for(java.util.Map.Entry<String,String> entry : _jobArgumentsMap.entrySet()) {
                // XXX this assumes they are all strings.
                configuration.setString(entry.getKey(), entry.getValue());
            }
        }

        // add any actor parameters to the contract configuration
        final java.util.Map<String,String> contractParameters = actor.getParameters();
        final java.util.Map<String,String> paraNames = actor.getParaImplNames(_engineName);
        for(java.util.Map.Entry<String,String> entry : contractParameters.entrySet()) {
            String keplerParaName = entry.getKey();
            if (paraNames.get(keplerParaName) != null) {
                configuration.setString(paraNames.get(keplerParaName), entry.getValue());
            } else {
                configuration.setString(Utilities.CONFIGURATION_KEPLER_PARA_PREFIX + "::" + keplerParaName,
                    entry.getValue());
            }
        }
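        // For example, a Kepler parameter named "threshold" with no
        // engine-specific implementation name is stored under the key
        // CONFIGURATION_KEPLER_PARA_PREFIX + "::threshold"; a stub can
        // presumably strip this prefix to recover the original parameter name.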
        // set the print execution info parameter
        configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, actor.getPrintExeInfo());

        // set the same JVM flag.
        if(_sameJVM) {
            configuration.setBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, true);
        }

        if(stubCodeTypeName != null) {

            String scriptEngineFactoryName = Utilities.getScriptEngineFactoryName(stubCodeTypeName);
            if(scriptEngineFactoryName == null) {
                throw new IllegalActionException(actor,
                    "No script engine has been configured for " + stubCodeTypeName);
            }

            if(!_sameJVM) {
                List<String> jars = Utilities.getJarsForLanguage(stubCodeTypeName);
                _additionalJars.addAll(jars);
            }

            configuration.setString(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME,
                scriptEngineFactoryName);

            configuration.setString(Utilities.CONFIGURATION_KEPLER_STUB_CODE, actor.getExecutionCode());

            String inTypesStr = ((SingleInputPatternActor) actor).inKeyValueTypes.stringValue();
            Type[] types = Types.getKeyValueTypes(((SingleInputPatternActor) actor).inKeyValueTypes,
                inTypesStr);

            configuration.setString(Utilities.CONFIGURATION_KEPLER_INPUT_KEY_TYPE, types[0].toString());
            configuration.setString(Utilities.CONFIGURATION_KEPLER_INPUT_VALUE_TYPE, types[1].toString());
        }

        return contract;
    }

    /** Construct a PACT Plan from the model. */
    protected Plan _getModelPlan() throws IllegalActionException {

        _inputsAlreadyDone.clear();

        Plan plan = null;

        // find all the sinks in the model
        final List<DDPDataSink> sinks = _container.entityList(DDPDataSink.class);

        if(sinks.isEmpty()) {
            throw new IllegalActionException(_director, "No data sinks found.");
        }

        // for each sink, traverse the graph to the source, adding to the plan
        for(DDPDataSink sink : sinks) {

            // get the PACT contract for this sink
            GenericDataSink contract = (GenericDataSink) _getContract(sink);

            // add the sink to the plan
            if(plan == null) {
                plan = new Plan(contract, "Kepler PACT Job " + _container.getName());
            } else {
                plan.addDataSink(contract);
            }

            // traverse the graph for this sink
            _addInputsForContract(sink, contract);
        }

        return plan;
    }

    ///////////////////////////////////////////////////////////////////
    ////                        private fields                    //////

    /** Nephele global configuration. */
    private Configuration _globalConfig;

    /** The contracts whose inputs have already been wired. */
    private Set<Operator> _inputsAlreadyDone = new HashSet<Operator>();

    /** A set of configuration directories, one for each Stratosphere server
     *  that was started.
     */
    private final static Set<String> _startedConfigDirs = Collections
        .synchronizedSet(new HashSet<String>());

    /** Logging. */
    //private final static Log _log = LogFactory.getLog(StratosphereEngine.class);

    /** The name of the Stratosphere engine. */
    private final static String STRATOSPHERE_ENGINE_NAME = "Stratosphere";

    /** Cluster to execute Stratosphere jobs in the Kepler JVM. */
    private static NepheleMiniCluster _localCluster;

    /** A lock for _localCluster. */
    private final static Object _localClusterLock = new Object();

    /** A Stratosphere job that can be cancelled. */
    private StratosphereJob _job;
}
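// For reference, a minimal sketch (hypothetical paths and names) of the plan
// that _getModelPlan() assembles for the linear Source -> Map -> Sink example
// above; _getContract() fills in the stub classes and configuration:
//
//   FileDataSource source =
//       new FileDataSource(TextInputFormat.class, "file:///tmp/in.txt", "source");
//   MapOperator map = MapOperator.builder(KeplerMapStub.class)
//       .input(source).name("map").build();
//   FileDataSink sink =
//       new FileDataSink(CsvOutputFormat.class, "file:///tmp/out.txt", map, "sink");
//   Plan plan = new Plan(sink, "Kepler PACT Job example");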