/* A base class for DDP engines.
 *
 * Copyright (c) 2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-11-23 20:26:41 +0000 (Mon, 23 Nov 2015) $'
 * '$Revision: 34245 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.ddp.director;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.build.modules.Module;
import org.kepler.build.modules.ModuleTree;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.ddp.Utilities;
import org.kepler.ddp.actor.pattern.DDPDataSink;
import org.kepler.ddp.actor.pattern.DDPDataSource;
import org.kepler.ddp.actor.pattern.DDPPatternActor;
import org.kepler.ddp.actor.pattern.SingleInputPatternActor;
import org.kepler.sms.SemanticType;

import ptolemy.actor.CompositeActor;
import ptolemy.actor.Executable;
import ptolemy.actor.TypeAttribute;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.gui.Effigy;
import ptolemy.actor.gui.ModelDirectory;
import ptolemy.actor.gui.PtolemyEffigy;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.Variable;
import ptolemy.data.type.Type;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.kernel.util.ScopeExtender;
import ptolemy.kernel.util.Workspace;

/** A base class for DDP engines.
 *
 *  An engine is owned by a DDPDirector. It reads its settings from the
 *  director's parameters in preinitialize(), and fire() drives one run
 *  of the DDP job implemented by the subclass in _executeDDPJob().
 *
 *  @author Daniel Crawl
 *  @version $Id: DDPEngine.java 34245 2015-11-23 20:26:41Z crawl $
 *
 */
public abstract class DDPEngine implements Cloneable {

    /** Create a new DDPEngine.
     *  @param director The director containing this engine.
     *  @exception IllegalActionException If thrown while adding parameters.
     *  @exception NameDuplicationException If thrown while adding parameters.
     */
    public DDPEngine(DDPDirector director) throws IllegalActionException, NameDuplicationException {
        _director = director;
        _addParameters();
    }

    /** React to a parameter change. If the changed attribute is the
     *  director's jobArguments parameter, re-parse it into the job
     *  arguments map. The value is a comma-separated list of
     *  name = value pairs; blank entries are ignored.
     *  @param attribute The attribute that changed.
     *  @exception IllegalActionException If an entry is not of the form
     *   name = value. In that case the job arguments map is left unchanged.
     */
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if(attribute == _director.jobArguments) {
            // parse into a temporary map so that a malformed value does not
            // leave the job arguments half-updated.
            final Map<String,String> parsedArguments = new HashMap<String,String>();
            final String val = _director.jobArguments.stringValue();
            if(val != null && !val.isEmpty()) {
                for(String entry : val.split(",")) {
                    // skip blank entries, e.g., the middle one in "a=1,,b=2"
                    if(entry.trim().isEmpty()) {
                        continue;
                    }
                    final String[] nameVal = entry.split("=");
                    if(nameVal.length != 2) {
                        throw new IllegalActionException(_director,
                                "Job arguments must be of the form: name1 = value1, name2 = value2");
                    }
                    parsedArguments.put(nameVal[0].trim(), nameVal[1].trim());
                }
            }
            _jobArgumentsMap.clear();
            _jobArgumentsMap.putAll(parsedArguments);
        }
    }

    /** Clone the engine into the containing director's workspace. */
    @Override
    public Object clone() throws CloneNotSupportedException {
        return clone(_director.workspace());
    }

    /** Clone the object into the specified workspace. Per-run state in the
     *  clone is reset to its initial values.
     *  @param workspace The workspace for the cloned object.
     *  @exception CloneNotSupportedException Not thrown in this base class
     *  @return The new engine.
     */
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        DDPEngine newObject = (DDPEngine) super.clone();
        newObject._additionalJars = new HashSet<String>();
        newObject._classLoader = null;
        newObject._configDirStr = null;
        newObject._container = null;
        newObject._degreeOfParallelism = 1;
        // newObject._director is set in DDPDirector.clone().
        newObject._displayRedirectDir = "";
        newObject._jobArgumentsMap = new HashMap<String,String>();
        newObject._jobDir = null;
        newObject._jobLock = new Object();
        newObject._random = new Random();
        newObject._stopRequested = false;
        newObject._writeSubWorkflowsToFiles = false;
        return newObject;
    }

    /** Close all the effigies created.
     *  @exception IllegalActionException If an effigy cannot be removed
     *   from its container.
     */
    public static void closeAllEffigies() throws IllegalActionException {
        synchronized(_effigiesForStubModels) {
            for(Effigy effigy : _effigiesForStubModels) {
                // mark as not modified before closing the tableaux
                _setEffigiesToNotModified(effigy);
                effigy.closeTableaux();
                try {
                    effigy.setContainer(null);
                } catch(NameDuplicationException e) {
                    // keep the cause so the stack trace is not lost
                    throw new IllegalActionException(null, e,
                            "Error setting effigy container to null: " +
                            e.getMessage());
                }
            }
            _effigiesForStubModels.clear();
        }
    }

    /** Create an Effigy for a model so that windows may be opened by gui actors.
     *  Does nothing if there is no Configuration, e.g., when running headless.
     *  @param model The model for which to create the effigy.
     */
    public static synchronized void createEffigy(CompositeActor model) {

        List<?> configurations = ptolemy.actor.gui.Configuration.configurations();

        // make sure there is at least one Configuration. there are none when
        // running headless.
        if(!configurations.isEmpty()) {
            ptolemy.actor.gui.Configuration configuration =
                    (ptolemy.actor.gui.Configuration) configurations.iterator().next();

            try {

                PtolemyEffigy effigy = new PtolemyEffigy(configuration.workspace());
                effigy.setModel(model);
                ModelDirectory directory = (ModelDirectory) configuration
                        .getEntity("directory");

                effigy.setName(model.getName());

                effigy.identifier.setExpression(model.getName());
                if (directory != null) {
                    if (directory.getEntity(model.getName()) != null) {
                        // Name is already taken.
                        int count = 2;
                        String newName = effigy.getName() + " " + count;
                        while (directory.getEntity(newName) != null) {
                            newName = effigy.getName() + " " + ++count;
                        }
                        effigy.setName(newName);
                    }
                }
                effigy.setContainer(directory);

                // do not open a new window for the workflow
                /*Tableau t = configuration.openModel(model);*/

                _effigiesForStubModels.add(effigy);

            } catch (Exception e) {
                throw new RuntimeException("Error creating Effigy.", e);
            }
        }
    }

    /** Get the directory to redirect display related actors. */
    public String getDisplayRedirectDir() throws IllegalActionException {
        return _displayRedirectDir;
    }

    /** Get the model for a specific name.
     *  @param name The model name.
     *  @return The model, or null if no model is registered under the name.
     */
    public static CompositeActor getModel(String name) {
        return _subWorkflows.get(name);
    }

    /** Get the name of the engine. */
    public final String getName() {
        return _engineName;
    }

    /** Valid types of servers that can be started. The first value is the
     *  default for the startServerType parameter.
     */
    public String[] getServerTypes() {
        return new String[] {"default", DDPDirector.SAME_JVM_STRING, DDPDirector.DISTRIBUTED_STRING};
    }

    /** Execute the engine. Iterates each data source actor once, calls
     *  prefire() on each data sink actor, runs the DDP job, and, unless a
     *  stop was requested, calls fire() and postfire() on each data sink.
     *  @exception IllegalActionException If a source or sink actor is not
     *   ready to fire, or if thrown while running the job or the sinks.
     */
    public void fire() throws IllegalActionException {

        // iterate the source actors once
        final List<DDPDataSource> sourceActors = _container.entityList(DDPDataSource.class);
        for(DDPDataSource source : sourceActors) {
            final int rc = source.iterate(1);
            if(rc == Executable.NOT_READY) {
                throw new IllegalActionException(_director, "Actor " + source.getName() +
                        " is not ready to fire. (Maybe prefire() returned false?)");
            }
        }

        // call prefire for all the sink actors
        final List<DDPDataSink> sinkActors = _container.entityList(DDPDataSink.class);
        for(DDPDataSink sink : sinkActors) {
            if(!sink.prefire()) {
                throw new IllegalActionException(_director, "Actor " + sink.getName() +
                        " is not ready to fire. (Prefire() returned false.)");
            }
        }

        // run the DDP job
        _executeDDPJob();

        if(!_stopRequested) {
            // call fire and postfire for all the sink actors
            for(DDPDataSink sink : sinkActors) {
                sink.fire();
                sink.postfire();
            }
        }
    }

    /** Postfire the engine. In this base class, returns true. */
    public boolean postfire() throws IllegalActionException {
        return true;
    }

    /** Initialize fields from the director's parameters and close any
     *  effigies left open by a previous execution.
     *  @exception IllegalActionException If a parameter cannot be read or
     *   an effigy cannot be closed.
     */
    public void preinitialize() throws IllegalActionException {
        _container = (CompositeActor) _director.getContainer();
        _configDirStr = _director.configDir.stringValue();
        _writeSubWorkflowsToFiles = ((BooleanToken)_director.writeSubWorkflowsToFiles.getToken()).booleanValue();
        _displayRedirectDir = _director.displayRedirectDir.stringValue();
        _classLoader = Thread.currentThread().getContextClassLoader();
        _stopRequested = false;
        _jobDir = null;

        // NOTE(review): this is a reference comparison against the default
        // token; presumably intentional as a sentinel check - confirm.
        IntToken parallelism = (IntToken)_director.degreeOfParallelism.getToken();
        if(parallelism == DDPDirector.DEFAULT_INTTOKEN) {
            _degreeOfParallelism = 1;
        } else {
            _degreeOfParallelism = parallelism.intValue();
        }

        // see if we should use a server in the same jvm
        String typeStr = ((StringToken)_director.startServerType.getToken()).stringValue();
        _sameJVM = typeStr != null &&
                (typeStr.equals("default") ||
                 typeStr.equals(DDPDirector.SAME_JVM_STRING));

        // close any effigies that were opened during the previous execution
        // FIXME this closes all the effigies, not just the ones used by
        // this engine.
        closeAllEffigies();
    }

    /** Stop any running DDP jobs. In this base class, only records that a
     *  stop was requested.
     */
    public void stop() throws IllegalActionException {
        _stopRequested = true;
    }

    /** Perform cleanup. Clears the registered sub-workflows. */
    public void wrapup() throws IllegalActionException {

        _subWorkflows.clear();

    }

    ///////////////////////////////////////////////////////////////////
    ////                      protected methods                  //////

    /** Add parameters to the containing director. In this base class, does nothing. */
    protected void _addParameters() throws IllegalActionException, NameDuplicationException {

    }

    /** Check that the configuration directory is set and exists on the file system.
     *  If the configuration directory is blank, it defaults to tools/conf
     *  under the directory named by the system property
     *  <i>engine name in lower case</i>.workflowdir.
     *  @exception IllegalActionException If the system property is needed
     *   but not set, or if the configuration directory does not exist.
     */
    protected void _checkConfigDir() throws IllegalActionException {

        if(_configDirStr.trim().isEmpty()) {

            String moduleName = _engineName.toLowerCase();

            // set the default location of the config directory
            String workflowDirStr = System.getProperty(moduleName + ".workflowdir");
            if(workflowDirStr == null) {
                throw new IllegalActionException(_director, "System property " +
                        moduleName + ".workflowdir not set.");
            }

            _configDirStr = workflowDirStr + File.separator +
                    "tools" + File.separator + "conf";
        }

        // make sure conf dir exists
        final File configDirFile = new File(_configDirStr);
        if(!configDirFile.exists()) {
            throw new IllegalActionException(_director, _engineName + " configuration directory " +
                    _configDirStr + " does not exist.");
        }

    }

    /** Check the existence of required files before starting the DDP server.
     *  The list of required files is found in configuration.xml for each
     *  engine, in Engine.Server.Required. Each file is resolved relative to
     *  the parent of the configuration directory.
     *  @return True if all required files exist, false otherwise.
     *  @exception IllegalActionException If the configuration directory has
     *   no parent directory.
     */
    protected boolean _checkFilesBeforeStartingServer() throws IllegalActionException {

        ConfigurationProperty engineProperty = Utilities.getEngineProperty(_engineName, _director);

        List<ConfigurationProperty> requiredProperties =
                engineProperty.getProperties("Server.Required.File");
        if(requiredProperties != null && !requiredProperties.isEmpty()) {
            // resolve to an absolute path first so that a relative config
            // directory with no parent component still has a parent.
            final File parentDir = new File(_configDirStr).getAbsoluteFile().getParentFile();
            if(parentDir == null) {
                throw new IllegalActionException(_director, _engineName +
                        " configuration directory " + _configDirStr +
                        " has no parent directory.");
            }
            final String dirStr = parentDir.getAbsolutePath();
            for(ConfigurationProperty fileProperty : requiredProperties) {
                File requiredFile = new File(dirStr, fileProperty.getValue());
                System.out.println("Check that file exists before starting the server: " + requiredFile);
                if(!requiredFile.exists()) {
                    System.out.println("ERROR: required file not found: " + requiredFile);
                    return false;
                }
            }
        }
        return true;
    }

    /** Check the container of this director for incorrect actors, etc. */
    protected void _checkModel() throws IllegalActionException {

        // make sure container only has ddp actors
        _checkModelForNonDDPActors(_container);

    }

    /** Make sure the container only contains DDP actors, and that each of
     *  them is opaque.
     *  @param container The composite actor to check.
     *  @exception IllegalActionException If a contained entity is not a
     *   DDPPatternActor or is not opaque.
     */
    protected void _checkModelForNonDDPActors(CompositeActor container)
            throws IllegalActionException {

        // verify all contained actors are ddp pattern actors
        for(Object object : container.entityList()) {
            if(!(object instanceof DDPPatternActor)) {
                throw new IllegalActionException(_director, ((NamedObj)object).getName() +
                        " is not a DDP Pattern actor.");
            } else {
                // make sure composites have a director
                // NOTE: composites must have a director even if the execution
                // class is specified (and the sub-workflow is not used) since
                // the output port type can only be set if the composite actor
                // is opaque.
                // @see TypedIOPort.getType()
                //
                DDPPatternActor actor = (DDPPatternActor)object;
                if(!actor.isOpaque()) {
                    throw new IllegalActionException(_director,
                            ((NamedObj)object).getName() + " must contain a director.");
                }
            }
        }
    }

    /** Check if the DDP engine server is running. If not, try to start it.
     *  After running the start script, up to five connection attempts are
     *  made, five seconds apart. If one succeeds, this method waits a
     *  further fifteen seconds for the server to finish initializing.
     *  @param socketAddress Host and port of the server to check.
     *  @param startScriptStr The script to start the server if not running.
     *  @return True if a server was started, false if could connect to already running server.
     *  @exception IllegalActionException If required files are missing, the
     *   start script is not executable or fails, the thread is interrupted
     *   while waiting, or the server cannot be reached.
     */
    protected boolean _checkServer(InetSocketAddress socketAddress, String startScriptStr)
            throws IllegalActionException {

        boolean startedServer = false;

        synchronized(_serverStartStopLock) {
            Socket socket = null;
            try {
                socket = new Socket();
                boolean connected = false;
                try {
                    socket.connect(socketAddress, _CONNECT_TIMEOUT);
                    connected = true;
                } catch (IOException e) {

                    System.out.println(_engineName + " server " + socketAddress +
                            " does not appear to be running. Starting...");

                    // start the server

                    if(!_checkFilesBeforeStartingServer()) {
                        throw new IllegalActionException(_director,
                                "One or more files required to start the server were not found.");
                    }

                    // see if the script is executable. kepler modules are zipped,
                    // which does not preserve the permissions.
                    File startScriptFile = new File(startScriptStr);
                    if(!startScriptFile.canExecute()) {
                        throw new IllegalActionException(_director,
                                "The script " + startScriptFile + " is not executable.\n" +
                                "You must change the permissions so that " +
                                startScriptFile.getName() +
                                " and all the other scripts in \n" +
                                startScriptFile.getParent() + " are executable.");
                    }

                    ProcessBuilder builder = new ProcessBuilder(startScriptStr);

                    // make sure JAVA_HOME is set
                    java.util.Map<String,String> env = builder.environment();
                    if(env.get("JAVA_HOME") == null) {
                        env.put("JAVA_HOME", System.getProperty("java.home"));
                    }

                    builder.redirectErrorStream(true);

                    try {
                        Process process = builder.start();
                        InetSocketAddress newAddress =
                                _parseOutputFromStartingServer(process.getInputStream());
                        if(newAddress != null) {
                            socketAddress = newAddress;
                        }
                        process.waitFor();
                        startedServer = true;
                    } catch (Exception e1) {
                        if(e1 instanceof InterruptedException) {
                            // preserve the interrupt for callers up the stack
                            Thread.currentThread().interrupt();
                        }
                        throw new IllegalActionException(_director, e1, "Unable to start " +
                                _engineName + " server.");
                    }

                    int tries = 0;
                    while(tries < 5) {
                        // wait for the server to start
                        try {
                            Thread.sleep(5000);
                            tries++;
                            System.out.print("Connecting to " + _engineName + " server port try #" + tries + ": ");
                            try {
                                // a socket cannot be reused after a failed
                                // connect, so create a new one each try.
                                socket.close();
                                socket = new Socket();
                                socket.connect(socketAddress, _CONNECT_TIMEOUT);
                                connected = true;
                                System.out.println("connected.");
                                break;
                            } catch (IOException e1) {
                                // not up yet; report and try again
                                System.out.println(e1);
                            }
                        } catch (InterruptedException e2) {
                            Thread.currentThread().interrupt();
                            throw new IllegalActionException(_director, e2, "Error while sleeping.");
                        }
                    }

                    // only wait for initialization if we were able to connect
                    // to the master/job manager port; otherwise fail right
                    // away below instead of sleeping for nothing.
                    if(connected) {
                        // the server may not be completely initialized, so
                        // wait a few more seconds
                        System.out.println("Waiting 15 seconds for " + _engineName + " server to initialize.");
                        try {
                            Thread.sleep(15000);
                        } catch (InterruptedException e2) {
                            Thread.currentThread().interrupt();
                            throw new IllegalActionException(_director, e2, "Error while waiting " +
                                    " for " + _engineName + " server to initialize.");
                        }
                    }

                }

                if(connected) {
                    try {
                        socket.close();
                        socket = null;
                    } catch (IOException e) {
                        throw new IllegalActionException(_director, e, "Error closing socket.");
                    }
                } else {
                    throw new IllegalActionException(_director,
                            "Could not connect to " + _engineName + " server: " + socketAddress);
                }
            } finally {
                if(socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        // do not throw from finally: that would hide the
                        // exception that brought us here.
                        _log.warn("Error closing socket.", e);
                    }
                }
            }
        }

        return startedServer;
    }

    /** Copy the workflow parameters from one sub-workflow to another including
     *  the parameters in all the containers of the source. Parameters whose
     *  name already exists in the destination are not copied.
     *
     *  @param sourceSubWorkflow the source sub-workflow
     *  @param destSubWorkflow the destination sub-workflow
     *  @exception IllegalActionException If a parameter cannot be added to
     *   the destination.
     */
    protected void _copyParameters(DDPPatternActor sourceSubWorkflow, DDPPatternActor destSubWorkflow)
            throws IllegalActionException {

        // clone the parameters into the same workspace as the destination
        // subworkflow
        final Workspace workspace = ((NamedObj) destSubWorkflow).workspace();

        // get the parameters up the hierarchy
        final java.util.Map<String,Variable> parameters =
                _getParametersInHierarchy(sourceSubWorkflow.getContainer());
        // copy the parameters into the pactor
        // TODO: only need to get the parameters once per run, instead of once
        // per pactor
        for(Variable p : parameters.values()) {

            // make sure the cloned actor does not already have a parameter
            // with the same name. this can happen if the parameter is a SharedParameter
            if(destSubWorkflow.getAttribute(p.getName()) == null) {
                try {
                    // if the parameter is a PortParameter, create a new parameter
                    // instead of clone it, since we do not want the associated port

                    // we also need to set persistence so the parameter appears when serialized
                    if(p instanceof PortParameter) {
                        final Parameter copiedParameter = new Parameter((NamedObj) destSubWorkflow, p.getName());
                        String value = p.getExpression();
                        if(p.isStringMode()) {
                            copiedParameter.setExpression("\"" + value + "\"");
                        } else {
                            copiedParameter.setExpression(value);
                        }
                        copiedParameter.setPersistent(true);
                    } else if(p instanceof Parameter) {
                        final Variable cloneParameter = (Variable) p.clone(workspace);
                        cloneParameter.setContainer((NamedObj) destSubWorkflow);
                        cloneParameter.setPersistent(true);
                    } else {
                        // NOTE: Variable values are not written during exportMoML(),
                        // so we need to put the value in a new Parameter.
                        final Parameter newParameter = new Parameter(workspace);
                        newParameter.setContainer((NamedObj) destSubWorkflow);
                        newParameter.setPersistent(true);
                        newParameter.setName(p.getName());
                        newParameter.setExpression(p.getExpression());
                    }
                } catch(Exception e) {
                    // the parameter is being added to the destination, so
                    // name the destination in the message.
                    throw new IllegalActionException(_director, e, "Unable to add parameter " +
                            p.getFullName() + " to " + destSubWorkflow.getFullName());
                }
            }
        }
    }

    /** Create a new directory for this job. The directory is created in the
     *  user's home directory with a random number as its name, and its path
     *  (with a trailing file separator) is stored in _jobDir.
     *  @exception IllegalActionException If the directory cannot be created.
     */
    protected void _createJobDirectory() throws IllegalActionException {

        final String parentPath = System.getProperty("user.home") + File.separator;

        // pick random names until one is found that does not exist yet
        File directory;
        do {
            directory = new File(parentPath + _random.nextInt(Integer.MAX_VALUE));
        } while (directory.exists());

        if (!directory.mkdir()) {
            throw new IllegalActionException(_director, "Could not create directory "
                    + directory.getPath());
        }
        _jobDir = directory.getPath() + File.separator;
        _log.debug("Job directory is " + _jobDir);
    }

    /** Get the parameters for a NamedObj and all its containers.
     *  Variables in ScopeExtender attributes are included. A parameter in
     *  an inner container overrides one with the same name further up.
     *  Parameters whose name begins with "_" and semantic type parameters
     *  are left out.
     *  @param namedObj The object at which to start.
     *  @return A map from parameter name to parameter.
     */
    protected static Map<String,Variable> _getParametersInHierarchy(NamedObj namedObj)
    {
        java.util.Map<String,Variable> retval = new HashMap<String,Variable>();
        final List<?> attributes = namedObj.attributeList();
        for(Object object : attributes) {
            if(object instanceof Variable) {
                retval.put(((Variable)object).getName(), (Variable)object);
            }

            if(object instanceof ScopeExtender) {
                final ScopeExtender extender = (ScopeExtender)object;
                try {
                    extender.expand();
                } catch (IllegalActionException e) {
                    // best effort: still collect whatever attributes the
                    // scope extender has, but record the failure.
                    _log.warn("Error expanding scope extender in " +
                            namedObj.getFullName(), e);
                }
                for(Object subAttribute : extender.attributeList()) {
                    if(subAttribute instanceof Variable) {
                        retval.put(((Variable)subAttribute).getName(), (Variable)subAttribute);
                    }
                }
            }
        }

        // get the parameters above
        final NamedObj container = namedObj.getContainer();
        if(container != null) {
            final java.util.Map<String,Variable> aboveParameters = _getParametersInHierarchy(container);
            for(java.util.Map.Entry<String,Variable> entry : aboveParameters.entrySet()) {
                final String name = entry.getKey();
                // do not add parameters with the same name since they are overridden
                if(!retval.containsKey(name)) {
                    retval.put(name, entry.getValue());
                }
            }
        }

        // remove certain parameters. iterate over a copy since retval is
        // modified inside the loop.
        java.util.Map<String,Variable> copy = new HashMap<String,Variable>(retval);
        for(java.util.Map.Entry<String,Variable> entry : copy.entrySet()) {
            final String name = entry.getKey();
            final Variable parameter = entry.getValue();
            // remove parameters whose name begins with "_"
            // remove semantic type parameters
            if(name.startsWith("_") || (parameter instanceof SemanticType)) {
                retval.remove(name);
            }
        }

        return retval;
    }

    /** Get a list of jars required for director to start.
     *  It also sets _classLoader, and the current thread's context class
     *  loader, to a loader for those jars if there are any.
     *  Jar names in comma-separated lists are trimmed and blank names are
     *  ignored.
     *  @return The URIs of the jars.
     *  @exception IllegalActionException If a jar given with a path does
     *   not exist or cannot be read, or a jar URI cannot be converted to
     *   a URL.
     */
    protected List<URI> _getJarList() throws IllegalActionException{
        final List<URI> jarPaths = new LinkedList<URI>();
        final List<File> jarsWithRelativePaths = new LinkedList<File>();

        for(String additionalJar : _additionalJars) {
            jarsWithRelativePaths.add(new File(additionalJar));
        }

        // get the jars in the director's includeJars parameter
        String includeJarsStr = _director.includeJars.stringValue();
        if(includeJarsStr != null && !includeJarsStr.isEmpty()) {
            for(String jarEntry : includeJarsStr.split(",")) {
                // allow whitespace around the commas
                final String jarPath = jarEntry.trim();
                if(jarPath.isEmpty()) {
                    continue;
                }
                File jarFile = new File(jarPath);
                // see if jar is an absolute path
                if(jarFile.isAbsolute()) {
                    if(!jarFile.exists() || !jarFile.canRead()) {
                        throw new IllegalActionException(_director,
                                "Jar does not exist or cannot be read: " + jarFile);
                    }
                    // jars with absolute paths are added directly
                    System.out.println("Adding jar: " + jarFile.getAbsolutePath());
                    jarPaths.add(jarFile.toURI());
                } else {
                    jarsWithRelativePaths.add(jarFile);
                }
            }
        }

        // add the module jars, e.g., actors.jar, ptolemy.jar, etc.
        // also add any jars in includeJars with a relative path - these jars
        // are assumed to be module/lib/jar.
        final ModuleTree moduleTree = ModuleTree.instance();
        for(Module module : moduleTree) {

            final File moduleJar = module.getTargetJar();

            // add the module jar if it exists.
            // some modules, e.g., outreach, do not have jars.
            if(moduleJar.exists()) {
                jarPaths.add(moduleJar.toURI());
            }

            final List<File> moduleJars = module.getJars();
            for(File jar : moduleJars) {
                // include kepler-tasks.jar since we need classes
                // in org.kepler.build to initialize kepler in the
                // stub. see StubUtilities.initializeKepler()
                if(jar.getName().equals("kepler-tasks.jar")) {
                    jarPaths.add(jar.toURI());
                } else if(jar.getName().matches("^log4j.*jar$") || //add log4j jar since it is used for display-redirect function in DDP.
                        jar.getName().equals("ant.jar")) {
                    jarPaths.add(jar.toURI());
                } else if(!jarsWithRelativePaths.isEmpty()) {
                    for(File jarFile : jarsWithRelativePaths) {
                        if(jar.getName().equals(jarFile.getName())) {
                            System.out.println("Adding jar in module " + module.getName() +
                                    ": " + jar);
                            jarPaths.add(jar.toURI());
                        }
                    }
                }
            }
        }

        // add any jars specified by the actors.
        final List<DDPPatternActor> actors = _container.entityList(DDPPatternActor.class);
        for(DDPPatternActor actor : actors) {
            final String jarsStr = actor.getJars();
            if(jarsStr != null && !jarsStr.isEmpty()) {
                for(String jarEntry : jarsStr.split(",")) {
                    // allow whitespace around the commas
                    final String jar = jarEntry.trim();
                    if(jar.isEmpty()) {
                        continue;
                    }
                    final File jarFile = new File(jar);
                    if(!jarFile.exists() || !jarFile.canRead()) {
                        throw new IllegalActionException(actor,
                                "Jar does not exist or cannot be read: " + jarFile.getAbsolutePath());
                    }
                    System.out.println("Adding jar for " + actor.getFullName() + ": " +
                            jarFile.getAbsolutePath());
                    jarPaths.add(jarFile.toURI());
                }
            }
        }

        try {
            final List<URL> jarURLs = new LinkedList<URL>();
            for (URI jarURI : jarPaths) {
                jarURLs.add(jarURI.toURL());
            }

            final URL[] jarArray = jarURLs.toArray(new URL[jarURLs.size()]);

            if (jarArray.length > 0) {
                _classLoader = new URLClassLoader(jarArray, Thread.currentThread()
                        .getContextClassLoader());
                Thread.currentThread().setContextClassLoader(_classLoader);
            }

        } catch (MalformedURLException e) {
            // keep the cause instead of only its message
            throw new IllegalActionException(_director, e, e.getMessage());
        }

        return jarPaths;
    }

    /** Execute the DDP job. */
    protected abstract void _executeDDPJob() throws IllegalActionException;

    /** Parse the output from the script that starts the server. In this
     *  class, does nothing and returns null.
     *  @param input The output of the start script.
     *  @return If the start script specifies a server URL, returns the
     *  socket address for that URL. Otherwise, returns null.
     */
    protected InetSocketAddress _parseOutputFromStartingServer(InputStream input) throws IOException, IllegalActionException {
        return null;
    }

    /** Remove engine-specific parameters from the director.
     *  Does nothing in this base class.
     */
    protected void _removeParameters() throws IllegalActionException, NameDuplicationException {

    }

    /** Set the port types inside a cloned pattern actor.
     *  @param actor the cloned actor
     *  @return A map from the name of each port connected on the inside of
     *   the actor's input ports to that port's type.
     *  @exception IllegalActionException If a type attribute cannot be
     *   created or set.
     */
    protected Map<String, Type> _setPortTypes(DDPPatternActor actor)
            throws IllegalActionException {

        final Map<String,Type> typeMap = new HashMap<String,Type>();

        // set the input ports of the stub source actors. the output ports will be
        // set by the actor's PortFunction class, see _createPortFunction().
        // the stub sink actor ports do not need to be set.
        final List<?> pactorPorts = actor.inputPortList();
        for(Object object : pactorPorts) {
            final TypedIOPort pactorPort = (TypedIOPort) object;

            // get the connected ports - the input ports of the source actors
            final List<?> connectedPorts = pactorPort.insidePortList();
            for(Object object2 : connectedPorts) {
                final TypedIOPort connectedPort = (TypedIOPort) object2;

                // set the types of ports connected to the pactor port so their
                // types can be used to set the pact types in the stub
                TypeAttribute typeAttribute;
                try {
                    typeAttribute = new TypeAttribute(connectedPort, connectedPort.getName() + "Type");
                } catch(NameDuplicationException e) {
                    throw new IllegalActionException(_director, e,
                            "Error creating type attribute for " + connectedPort.getFullName());
                }
                typeMap.put(connectedPort.getName(), connectedPort.getType());
                typeAttribute.setExpression(connectedPort.getType().toString());

                // call TypedIOPort.attributeChanged() with the type attribute so that
                // the port type will be set. this is necessary if the cloned subworkflow
                // is not serialized.
                connectedPort.attributeChanged(typeAttribute);
            }
        }
        return typeMap;
    }

    ///////////////////////////////////////////////////////////////////
    ////                      protected fields                   //////

    /** DDP Engine configuration directory. */
    protected String _configDirStr;

    /** Directory for the current job. */
    protected String _jobDir;

    /** The ClassLoader class for loading implementation class for DDP actors. */
    protected ClassLoader _classLoader;

    /** Timeout, in milliseconds, when seeing if server is running. */
    protected final static int _CONNECT_TIMEOUT = 5*1000;

    /** Directory for display redirect. */
    protected String _displayRedirectDir;

    /** If true, write the sub-workflows to files, otherwise pass them
     *  in the configuration object.
     */
    protected boolean _writeSubWorkflowsToFiles = false;

    /** The container of this director. */
    protected CompositeActor _container;

    /** The default degree of parallelism for ddp pattern actors. */
    protected int _degreeOfParallelism = 1;

    /** A collection of job arguments as key-values. */
    protected Map<String,String> _jobArgumentsMap = new HashMap<String,String>();

    /** The containing director. */
    protected DDPDirector _director;

    /** The name of the engine. */
    protected String _engineName = "unknown";

    /** A mapping of model name to model. FIXME: change to full name */
    protected static final java.util.Map<String,SingleInputPatternActor> _subWorkflows =
            Collections.synchronizedMap(new HashMap<String,SingleInputPatternActor>());

    /** If true, run and use server in the same JVM as Kepler. */
    protected boolean _sameJVM = true;

    /** A set of additional jar names to send to the server. */
    protected Set<String> _additionalJars = new HashSet<String>();

    /** A lock for DDP jobs. */
    protected Object _jobLock = new Object();

    /** A static object used for synchronization. */
    protected final static Object _serverStartStopLock = new Object();


    ///////////////////////////////////////////////////////////////////
    ////                      private methods                    //////

    /** Set an effigy and any contained effigies to be not modified. */
    private static void _setEffigiesToNotModified(Effigy effigy) {
        effigy.setModified(false);
        for(Effigy containedEffigy : effigy.entityList(Effigy.class)) {
            _setEffigiesToNotModified(containedEffigy);
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                      private fields                     //////

    /** Random number generator for job directories. */
    private Random _random = new Random();

    /** Logging. */
    private final static Log _log = LogFactory.getLog(DDPEngine.class);

    /** A list of Effigies opened for models. */
    private static final List<Effigy> _effigiesForStubModels =
            Collections.synchronizedList(new LinkedList<Effigy>());

    /** If true, the user requested the workflow to stop. Volatile since
     *  stop() sets it while fire() reads it.
     */
    private volatile boolean _stopRequested = false;
}