001/* 002 * Copyright (c) 2015 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2017-09-04 12:59:35 -0700 (Mon, 04 Sep 2017) $' 007 * '$Revision: 1407 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 
027 * 028 */ 029package org.kepler.webview.server; 030 031import java.io.File; 032import java.io.FileWriter; 033import java.io.IOException; 034import java.net.HttpURLConnection; 035import java.text.DateFormat; 036import java.text.SimpleDateFormat; 037import java.util.Collections; 038import java.util.Date; 039import java.util.HashMap; 040import java.util.Iterator; 041import java.util.LinkedList; 042import java.util.List; 043import java.util.Map; 044import java.util.Map.Entry; 045import java.util.concurrent.LinkedBlockingQueue; 046import java.util.stream.Collectors; 047 048import org.apache.commons.io.FilenameUtils; 049import org.kepler.build.modules.Module; 050import org.kepler.build.modules.ModuleTree; 051import org.kepler.gui.KeplerGraphFrame; 052import org.kepler.gui.KeplerGraphFrame.Components; 053import org.kepler.gui.KeplerGraphFrameUpdater; 054import org.kepler.loader.util.ParseWorkflow; 055import org.kepler.module.webview.Shutdown; 056import org.kepler.objectmanager.lsid.KeplerLSID; 057import org.kepler.provenance.ProvenanceRecorder; 058import org.kepler.provenance.Queryable; 059import org.kepler.provenance.Recording; 060import org.kepler.util.ParseWorkflowUtil; 061import org.kepler.webview.actor.ControlAttribute; 062import org.kepler.webview.actor.ParametersAttribute; 063import org.kepler.webview.server.app.App; 064import org.kepler.webview.server.auth.AuthUtilities; 065import org.kepler.webview.server.auth.WebViewAuthHandlerImpl; 066import org.kepler.webview.server.handler.ActorHandler; 067import org.kepler.webview.server.handler.LoginHandler; 068import org.kepler.webview.server.handler.NoMatchHandler; 069import org.kepler.webview.server.handler.RunIdHandler; 070import org.kepler.webview.server.handler.RunWorkflowHandler; 071import org.kepler.webview.server.handler.RunsHandler; 072import org.kepler.webview.server.handler.TableOfContentsHandler; 073import org.kepler.webview.server.handler.WorkflowHandler; 074import 
org.kepler.webview.server.handler.WorkflowWebSocketHandler; 075 076import com.hazelcast.config.Config; 077 078import io.vertx.core.AbstractVerticle; 079import io.vertx.core.AsyncResult; 080import io.vertx.core.DeploymentOptions; 081import io.vertx.core.Future; 082import io.vertx.core.Handler; 083import io.vertx.core.Vertx; 084import io.vertx.core.VertxOptions; 085import io.vertx.core.buffer.Buffer; 086import io.vertx.core.http.HttpMethod; 087import io.vertx.core.http.HttpServer; 088import io.vertx.core.http.HttpServerOptions; 089import io.vertx.core.http.HttpServerRequest; 090import io.vertx.core.http.ServerWebSocket; 091import io.vertx.core.json.JsonArray; 092import io.vertx.core.json.JsonObject; 093import io.vertx.core.net.PemKeyCertOptions; 094import io.vertx.core.net.SocketAddress; 095import io.vertx.core.spi.cluster.ClusterManager; 096import io.vertx.ext.auth.AuthProvider; 097import io.vertx.ext.auth.User; 098import io.vertx.ext.web.Router; 099import io.vertx.ext.web.RoutingContext; 100import io.vertx.ext.web.handler.AuthHandler; 101import io.vertx.ext.web.handler.BodyHandler; 102import io.vertx.ext.web.handler.CookieHandler; 103import io.vertx.ext.web.handler.CorsHandler; 104import io.vertx.ext.web.handler.SessionHandler; 105import io.vertx.ext.web.handler.UserSessionHandler; 106import io.vertx.ext.web.sstore.ClusteredSessionStore; 107import io.vertx.ext.web.sstore.LocalSessionStore; 108import io.vertx.ext.web.sstore.SessionStore; 109import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; 110import ptolemy.actor.CompositeActor; 111import ptolemy.actor.ExecutionListener; 112import ptolemy.actor.Manager; 113import ptolemy.actor.gui.ConfigurationApplication; 114import ptolemy.kernel.util.Attribute; 115import ptolemy.kernel.util.IllegalActionException; 116import ptolemy.kernel.util.NamedObj; 117import ptolemy.kernel.util.Settable; 118import ptolemy.kernel.util.Workspace; 119import ptolemy.moml.MoMLParser; 120import 
ptolemy.moml.filter.BackwardCompatibility; 121import ptolemy.util.MessageHandler; 122 123public class WebViewServer extends AbstractVerticle { 124 125 /** Execute an app. 126 * @param json The JSON object with the workflow name and any parameters. 127 * @param handler Asynchronous handler to receive the a JSON object with any results. 128 */ 129 public void executeApp(JsonObject json, User user, 130 Handler<AsyncResult<JsonObject>> handler) { 131 132 String appName = json.getString("app_name"); 133 //System.out.println("execute workflow " + wfName); 134 135 if(appName == null || appName.trim().isEmpty()) { 136 handler.handle(Future.failedFuture("No app_name specified.")); 137 return; 138 } 139 140 if(user == null) { 141 handler.handle(Future.failedFuture("User is not authenticated")); 142 return; 143 } 144 145 for(String key: json.fieldNames()) { 146 if(!key.equals("app_name") && 147 !key.equals("app_param") && 148 !key.equals("prov") && 149 !key.equals("reqid") && 150 !key.equals("sync")) { 151 System.err.println("WARNING: unknown property in request: " + key); 152 } 153 } 154 155 // the following will block, e.g., waiting on the workspace lock, 156 // so execute in blocking threads. 
157 _vertx.<JsonObject>executeBlocking(future -> { 158 159 //System.out.println("executeBlocking " + wfName); 160 161 boolean runSynchronously = false; 162 App app = null; 163 boolean running = false; 164 165 // see if app is already loaded 166 try { 167 app = _getApp(appName); 168 169 if(app == null) { 170 throw new Exception("App not found."); 171 } 172 173 JsonObject appParams; 174 if(json.containsKey("app_param")) { 175 try { 176 appParams = json.getJsonObject("app_param"); 177 } catch(ClassCastException e) { 178 throw new Exception("app_param must be a json object."); 179 } 180 } else { 181 appParams = new JsonObject(); 182 } 183 184 runSynchronously = json.getBoolean("sync", false); 185 if(!runSynchronously) { 186 throw new Exception("Asynchronous execution not supported for apps."); 187 } 188 189 if(runSynchronously) { 190 // execute the application 191 running = true; 192 final App appCopy = app; 193 app.exec(user, appParams, appHandler -> { 194 195 if(appHandler.failed()) { 196 future.fail(appHandler.cause()); 197 } else { 198 future.complete(new JsonObject().put("responses", appHandler.result())); 199 } 200 appCopy.close(); 201 }); 202 } 203 } catch (Exception e) { 204 e.printStackTrace(); 205 future.fail("Error executing app: " + e.getMessage()); 206 return; 207 } finally { 208 if(!running) { 209 app.close(); 210 } 211 } 212 213 // set ordered to false to allow parallel executions, 214 // and use handler for result. 215 }, false, handler); 216 } 217 218 /** Execute a workflow. 219 * @param json The JSON object with the workflow name and any parameters. 220 * @param handler Asynchronous handler to receive the a JSON object with any results. 
221 */ 222 public void executeWorkflow(JsonObject json, User user, 223 Handler<AsyncResult<JsonObject>> handler) { 224 225 String wfName = json.getString("wf_name"); 226 //System.out.println("execute workflow " + wfName); 227 228 if(wfName == null || wfName.trim().isEmpty()) { 229 handler.handle(Future.failedFuture("No wf_name specified.")); 230 return; 231 } 232 233 if(user == null) { 234 handler.handle(Future.failedFuture("User is not authenticated")); 235 return; 236 } 237 238 // TODO correct way to convert Buffer to JSON? 239 //System.out.println("recvd: " + json); 240 241 // the following will block, e.g., waiting on the workspace lock, 242 // so execute in blocking threads. 243 _vertx.<JsonObject>executeBlocking(future -> { 244 245 //System.out.println("executeBlocking " + wfName); 246 247 boolean runSynchronously = false; 248 ProvenanceRecorder recorder = null; 249 CompositeActor model = null; 250 251 // see if workflow is already loaded 252 try { 253 model = _getModel(wfName); 254 255 if(model == null) { 256 throw new Exception("Workflow not found."); 257 } 258 259 try { 260 _setModelParameters(model, json, user); 261 } catch (IllegalActionException e) { 262 throw new Exception("Error setting parameters: " + e.getMessage()); 263 } 264 265 boolean recordProvenance = json.getBoolean("prov", true); 266 runSynchronously = json.getBoolean("sync", false); 267 268 // cannot run async without provenance 269 if(!runSynchronously && !recordProvenance) { 270 throw new Exception("Cannot execute workflow asynchronously without recording provenance."); 271 } 272 273 recorder = ProvenanceRecorder.getDefaultProvenanceRecorder(model); 274 275 if(recordProvenance) { 276 if(recorder == null) { 277 if(!ProvenanceRecorder.addProvenanceRecorder(model, null, null)) { 278 throw new Exception("Error adding provenance recorder to workflow."); 279 } 280 recorder = ProvenanceRecorder.getDefaultProvenanceRecorder(model); 281 if(recorder == null) { 282 throw new Exception("Cannot find 
provenance recorder in workflow after adding one."); 283 } 284 } 285 286 //System.out.println("setting provenance username = " + user.principal().getString("username")); 287 recorder.username.setToken(user.principal().getString("username")); 288 } else if (recorder != null) { 289 recorder.setContainer(null); 290 } 291 292 // create manager and add model 293 final Manager manager = new Manager(model.workspace(), "Manager"); 294 295 try { 296 model.setManager(manager); 297 } catch(IllegalActionException e) { 298 throw new Exception("Error setting Manager for sub-workflow: " + e.getMessage()); 299 } 300 301 String[] errorMessage = new String[1]; 302 303 ExecutionListener managerListener = new ExecutionListener() { 304 305 @Override 306 public void executionError(Manager m, Throwable throwable) { 307 System.err.println("Workflow execution error: " + throwable.getMessage()); 308 throwable.printStackTrace(); 309 errorMessage[0] = throwable.getMessage(); 310 } 311 312 @Override 313 public void executionFinished(Manager m) { 314 //System.out.println("finished"); 315 } 316 317 @Override 318 public void managerStateChanged(Manager m) { 319 //System.out.println("manager state changed " + m.getState().getDescription()); 320 } 321 }; 322 323 manager.addExecutionListener(managerListener); 324 325 if(runSynchronously) { 326 327 final String[] runLSIDStr = new String[1]; 328 if(recordProvenance) { 329 recorder.addPiggyback(new Recording() { 330 @Override 331 public void executionStart(KeplerLSID lsid) { 332 runLSIDStr[0] = lsid.toString(); 333 } 334 }); 335 } 336 337 final boolean[] timeout = new boolean[1]; 338 timeout[0] = false; 339 long timerId = vertx.setTimer(_workflowTimeout, id -> { 340 timeout[0] = true; 341 manager.stop(); 342 future.fail("Execution timeout."); 343 }); 344 345 346 // call execute() instead of run, otherwise exceptions 347 // go to execution listener asynchronously. 348 manager.execute(); 349 350 // see if we timed-out. 
if so, we already sent the response, 351 // so exit. 352 if(timeout[0]) { 353 return; 354 } else if(!vertx.cancelTimer(timerId)) { 355 System.err.println("Workflow timeout Timer does not exist."); 356 } else { System.out.println("cancelled timer."); } 357 358 // see if there is a workflow exception 359 if(errorMessage[0] != null) { 360 future.fail(errorMessage[0]); 361 return; 362 } 363 364 JsonObject responseJson = new JsonObject(); 365 366 if(recordProvenance) { 367 responseJson.put("id", runLSIDStr[0]); 368 } 369 370 // build the response JSON object from the client buffers 371 // for this workflow. 372 JsonArray arrayJson = new JsonArray(); 373 List<JsonObject> outputs = getClientBuffer(model); 374 if(outputs != null) { 375 for(JsonObject outputJson : outputs) { 376 if(outputJson.containsKey("actor")) { 377 JsonObject actorObject = outputJson.getJsonObject("actor"); 378 for(String idStr : actorObject.fieldNames()) { 379 JsonObject idObject = actorObject.getJsonObject(idStr); 380 if(idObject.containsKey("data")) { 381 arrayJson.add(idObject.getJsonObject("data")); 382 } 383 } 384 } 385 } 386 } 387 // send the successful response 388 //System.out.println(arrayJson.encodePrettily()); 389 responseJson.put("responses", arrayJson); 390 future.complete(responseJson); 391 } else { // asynchronous 392 393 final CompositeActor finalModel = model; 394 recorder.addPiggyback(new Recording() { 395 @Override 396 public void executionStart(KeplerLSID lsid) { 397 //System.out.println("execution lsid is " + lsid); 398 future.complete(new JsonObject().put("id", lsid.toString())); 399 } 400 401 @Override 402 public void executionStop() { 403 removeModel(finalModel); 404 } 405 }); 406 407 manager.startRun(); 408 } 409 } catch (Exception e) { 410 future.fail("Error executing workflow: " + e.getMessage()); 411 return; 412 } finally { 413 if(model != null && runSynchronously) { 414 removeModel(model); 415 model = null; 416 } 417 } 418 419 // set ordered to false to allow parallel 
executions, 420 // and use handler for result. 421 }, false, handler); 422 } 423 424 /** Search for a file. The directories searched are the root directory (if specified), 425 * and <module>/resources/html/. 426 * @param name The name of the file to search for. 427 * @return If the path is found, the absolute path of the file. 428 * Otherwise, returns null. 429 */ 430 public static String findFile(String name) { 431 432 //System.out.println("findFile: " + name); 433 434 if(_appendIndexHtml && name.endsWith("/")) { 435 name = name.concat("index.html"); 436 } 437 438 // search root dir first if specified 439 if(_rootDir != null) { 440 String fsPath = new StringBuilder(_rootDir) 441 .append(File.separator) 442 .append(name) 443 .toString(); 444 // TODO check path is not outside _rootDir 445 if(_vertx.fileSystem().existsBlocking(fsPath)) { 446 return fsPath; 447 } 448 } 449 450 // not found in root dir, so search module tree. 451 for(Module module : _moduleTree) { 452 String fsPath = new StringBuilder(module.getResourcesDir().getAbsolutePath()) 453 .append(File.separator) 454 .append("html") 455 .append(File.separator) 456 .append(name).toString(); 457 // TODO check path is not outside of resources/html dir 458 //System.out.println("checking " + fsPath); 459 if(_vertx.fileSystem().existsBlocking(fsPath)) { 460 return fsPath; 461 } 462 } 463 464 // not found anywhere 465 return null; 466 } 467 468 /** Get the client buffers for a workflow. */ 469 public static List<JsonObject> getClientBuffer(NamedObj model) { 470 return _clientBuffers.get(model); 471 } 472 473 /** Called during initialization. 
*/ 474 public static void initialize() { 475 476 int workerPoolSize = WebViewConfiguration.getHttpServerWorkerThreads(); 477 VertxOptions options = new VertxOptions() 478 .setWorkerPoolSize(workerPoolSize) 479 .setMaxWorkerExecuteTime(Long.MAX_VALUE); 480 481 if(WebViewConfiguration.shouldStartClusterHazelcast()) { 482 Config hazelcastConfig = new Config(); 483 ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig); 484 if (WebViewConfiguration.deployInKubernetes()) { 485 hazelcastConfig.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); 486 hazelcastConfig.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true) 487 .setProperty("service-dns", WebViewConfiguration.getHazelcastDiscoveryDnsServiceName()); 488 } 489 options.setClusterManager(mgr); 490 Vertx.clusteredVertx(options, result -> { 491 if(result.failed()) { 492 MessageHandler.error("Failed to get vertx context.", result.cause()); 493 // TODO shutdown? 494 } else { 495 _vertx = result.result(); 496 _initializeAfterCreatingVertx(); 497 } 498 }); 499 } else if(!WebViewConfiguration.shouldStartCluster()) { 500 _vertx = Vertx.vertx(options); 501 _initializeAfterCreatingVertx(); 502 } else { 503 MessageHandler.error("Unknown value for cluster type in configuration.xml."); 504 Shutdown.shutdownLatch.countDown(); 505 } 506 507 } 508 509 private static void _initializeAfterCreatingVertx() { 510 511 512 String rootDir = WebViewConfiguration.getHttpServerRootDir(); 513 if(rootDir != null && 514 !rootDir.equals(WebViewConfiguration.DEFAULT_WEBVIEW_SERVER_ROOT_DIR)) { 515 _rootDir = rootDir; 516 System.out.println("Web view root dir is " + _rootDir); 517 } 518 519 _appendIndexHtml = WebViewConfiguration.getHttpServerAppendIndexHtml(); 520 521 _workflowTimeout = WebViewConfiguration.getHttpServerWorkflowTimeout(); 522 523 // register as an updater to get window open and close events 524 KeplerGraphFrame.addUpdater(_updater); 525 526 // start logging thread 527 _loggingThread = new 
Thread(new Runnable() { 528 @Override 529 public void run() { 530 String logPath = WebViewConfiguration.getHttpServerLogPath(); 531 System.out.println("Web-view logging to " + logPath); 532 try(FileWriter writer = new FileWriter(logPath, true)) { 533 String message; 534 try { 535 while((message = _loggingQueue.take()) != STOP_LOGGING_THREAD_MESSAGE) { 536 writer.write(message); 537 writer.write('\n'); 538 writer.flush(); 539 } 540 } catch (InterruptedException e) { 541 MessageHandler.error("Interrupted while waiting in logging thread.", e); 542 } 543 } catch (IOException e) { 544 MessageHandler.error("Error trying to write to web-view server log " + 545 logPath, e); 546 } 547 } 548 }); 549 _loggingThread.start(); 550 551 552 boolean daemon = WebViewConfiguration.shouldStartHttpServerAsDaemon(); 553 554 if(daemon) { 555 System.out.println("Loading MoML filters."); 556 557 // We set the list of MoMLFilters to handle Backward Compatibility. 558 MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters()); 559 560 // load the gui and cache configuration since we need the CacheManager 561 // to load the KAREntryHandlers for exporting provenance kars. 562 try { 563 ConfigurationApplication.readConfiguration( 564 ConfigurationApplication.specToURL("ptolemy/configs/kepler/ConfigGUIAndCache.xml")); 565 } catch (Exception e) { 566 MessageHandler.error("Error creating Configuration.", e); 567 } 568 } 569 570 if(WebViewConfiguration.shouldStartHttpServer() || daemon) { 571 572 /* TODO still necessary? 
573 List<URL> list = new LinkedList<URL>(); 574 for(String path : System.getProperty("java.class.path").split(File.pathSeparator)) { 575 try { 576 list.add(new File(path).toURI().toURL()); 577 } catch (MalformedURLException e) { 578 MessageHandler.error("Bad URL.", e); 579 } 580 } 581 */ 582 // list.toArray(new URL[list.size()]), 583 584 DeploymentOptions deploymentOptions = new DeploymentOptions() 585 .setInstances(WebViewConfiguration.getHttpServerInstances()); 586 587 WebViewServer.vertx().deployVerticle( 588 WebViewServer.class.getName(), 589 deploymentOptions, 590 deployResult -> { 591 if(deployResult.failed()) { 592 MessageHandler.error("Failed to deploy web view server.", deployResult.cause()); 593 // TODO shut down kepler. 594 } 595 } 596 ); 597 598 for(String model: WebViewConfiguration.getPreloadModels()) { 599 System.out.println("Preloading " + model); 600 try { 601 if(_getModel(model) == null) { 602 System.err.println("ERROR: Unable to find model for preload: " + model); 603 } 604 } catch (Exception e) { 605 System.err.println("ERROR: Unable to preload " + model + ": " + e.getMessage()); 606 } 607 } 608 609 // TODO preload apps 610 611 } else { 612 // not starting server, so set latch to 0 613 Shutdown.shutdownLatch.countDown(); 614 } 615 616 } 617 618 /** Log an http server request. 619 * @param request The request 620 * @param user The user 621 * @param status The status 622 * @param timestamp The timestamp when the request occurred. 623 */ 624 public void log(HttpServerRequest request, User user, int status, long timestamp) { 625 log(request, user, status, timestamp, -1); 626 } 627 628 /** Log an http server request. 629 * @param request The request 630 * @param user The user 631 * @param status The status 632 * @param timestamp The timestamp when the request occurred. 633 * @param length The length of the file successfully sent to the client. 
634 */ 635 public void log(HttpServerRequest request, User user, int status, long timestamp, long length) { 636 637 String versionStr; 638 switch(request.version()) { 639 case HTTP_1_1: 640 versionStr = "HTTP/1.1"; 641 break; 642 case HTTP_1_0: 643 versionStr = "HTTP/1.0"; 644 break; 645 default: 646 versionStr = "-"; 647 break; 648 } 649 650 String hostString; 651 SocketAddress address = request.remoteAddress(); 652 if(address == null) { 653 hostString = "-"; 654 } else { 655 hostString = address.host(); 656 } 657 658 String referrer = request.headers().get("referrer"); 659 if(referrer == null) { 660 referrer = "-"; 661 } 662 663 String userAgent = request.headers().get("user-agent"); 664 if(userAgent == null) { 665 userAgent = "-"; 666 } 667 668 String userNameStr = "-"; 669 if(user != null) { 670 //System.out.println(user); 671 userNameStr = user.principal().getString("username"); 672 } 673 674 // TODO how to get user-identifier and userid? 675 String message = String.format("%s - %s [%s] \"%s %s %s\" %d %s \"%s\" \"%s\"", 676 hostString, 677 userNameStr, 678 _logTimeFormat.format(new Date(timestamp)), 679 request.method(), 680 request.uri(), 681 versionStr, 682 status, 683 length < 0 ? 
"-" : String.valueOf(length), 684 referrer, 685 userAgent); 686 687 try { 688 _loggingQueue.put(message); 689 } catch (InterruptedException e) { 690 MessageHandler.error("Interuppted adding to logging thread.", e); 691 } 692 } 693 694 public void log(ServerWebSocket socket, String command, String path) { 695 log(socket, command, path, null); 696 } 697 698 public void log(ServerWebSocket socket, String command, String path, Buffer buffer) { 699 700 final long timestamp = System.currentTimeMillis(); 701 702 String hostString; 703 SocketAddress address = socket.remoteAddress(); 704 if(address == null) { 705 hostString = "-"; 706 } else { 707 hostString = address.host(); 708 } 709 710 String referrer = socket.headers().get("referrer"); 711 if(referrer == null) { 712 referrer = "-"; 713 } 714 715 String userAgent = socket.headers().get("user-agent"); 716 if(userAgent == null) { 717 userAgent = "-"; 718 } 719 720 String message = String.format("%s - - [%s] \"%s %s %s %s\" %s %s \"%s\" \"%s\"", 721 hostString, 722 _logTimeFormat.format(new Date(timestamp)), 723 command, 724 path, 725 buffer == null ? "-" : buffer.toString(), 726 "websocket", 727 "-", // status 728 "-", // length 729 referrer, 730 userAgent); 731 732 try { 733 _loggingQueue.put(message); 734 } catch (InterruptedException e) { 735 MessageHandler.error("Interuppted adding to logging thread.", e); 736 } 737 } 738 739 /** Start the server. 
*/ 740 @Override 741 public void start() throws Exception { 742 super.start(); 743 744 _auth = WebViewConfiguration.getAuthProvider(); 745 746 _queryable = ProvenanceRecorder.getDefaultQueryable(null); 747 if(_queryable == null) { 748 throw new Exception("Unable to get provenance queryable."); 749 } 750 751 Router router = Router.router(vertx); 752 753 if(WebViewConfiguration.getHttpServerCorsEnabled()) { 754 String allowOriginPatternStr = WebViewConfiguration.getHttpServerCorsAllowOriginPattern(); 755 if(allowOriginPatternStr == null || allowOriginPatternStr.trim().isEmpty()) { 756 throw new Exception("Must specify allow origini pattern for CORS."); 757 } 758 router.route().handler(CorsHandler.create(allowOriginPatternStr) 759 .allowedMethod(HttpMethod.GET) 760 .allowedMethod(HttpMethod.POST) 761 .allowedMethod(HttpMethod.OPTIONS) 762 .allowCredentials(true) 763 .allowedHeader("Access-Control-Allow-Credentials") 764 .allowedHeader("X-PINGARUNER") 765 .allowedHeader("Content-Type") 766 .allowedHeader("authorization")); 767 System.out.println("CORS enabled for " + allowOriginPatternStr); 768 } 769 770 771 // create cookie and session handlers 772 // authenticated users are cached in the session so authentication 773 // does not need to be performed for each request. 774 router.route().handler(CookieHandler.create()); 775 776 BodyHandler bodyHandler = BodyHandler.create(); 777 778 if(_rootDir != null) { 779 File uploadsDir = new File(_rootDir, "file-uploads"); 780 if(!uploadsDir.exists()) { 781 if(!uploadsDir.mkdirs()) { 782 throw new Exception("Unable to create file uploads directory " + uploadsDir); 783 } 784 } 785 bodyHandler.setUploadsDirectory(uploadsDir.getAbsolutePath()); 786 } 787 788 // NOTE: need to install the BodyHandler before any handlers that 789 // make asynchronous calls such as BasicAuthHandler. 
790 router.route().handler(bodyHandler); 791 792 SessionStore sessionStore; 793 if(WebViewConfiguration.shouldStartCluster()) { 794 sessionStore = ClusteredSessionStore.create(_vertx); 795 } else { 796 sessionStore = LocalSessionStore.create(_vertx); 797 } 798 SessionHandler sessionHandler = SessionHandler.create(sessionStore) 799 .setSessionCookieName("WebViewSession") 800 .setSessionTimeout(WebViewConfiguration.getHttpServerSessionTimeout()); 801 router.route().handler(sessionHandler); 802 router.route().handler(UserSessionHandler.create(_auth)); 803 804 //System.out.println("session timeout = " + WebViewConfiguration.getHttpServerSessionTimeout()); 805 806 // use custom auth handler with custom auth scheme to prevent 807 // browsers (e.g., chrome) from opening auth dialog. 808 // FIXME read realm from conf file. 809 AuthHandler authenticationHandler = new WebViewAuthHandlerImpl(_auth, "WebView"); 810 811 Handler<RoutingContext> authorizationHandler = new Handler<RoutingContext>() { 812 @Override 813 public void handle(RoutingContext c) { 814 if(c.user() == null) { 815 c.next(); 816 } else { 817 c.user().isAuthorized("login", result -> { 818 if(result.failed()) { 819 c.response() 820 .putHeader("Content-type", "text/plain") 821 .setStatusCode(HttpURLConnection.HTTP_FORBIDDEN) 822 .end(result.cause().getMessage()); 823 } else { 824 c.next(); 825 } 826 }); 827 } 828 } 829 }; 830 831 // everything under /kepler and /app needs to be 832 // authenticated and authorized. 833 router.route("/kepler/*") 834 .handler(authenticationHandler) 835 .handler(authorizationHandler); 836 router.route("/app") 837 .handler(authenticationHandler) 838 .handler(authorizationHandler); 839 840 // TODO should these be under /kepler? 
841 router.getWithRegex("^/wf/(\\d+)$").handler(new WorkflowHandler(this)); 842 router.getWithRegex("^/wf/(\\d+(?:\\-\\d+)+)$").handler(new ActorHandler(this)); 843 844 RunWorkflowHandler runWorkflowHandler = new RunWorkflowHandler(this); 845 router.post("/kepler/runwf").handler(runWorkflowHandler::handleRunWorkflow); 846 router.post("/app").handler(runWorkflowHandler::handleRunApp); 847 848 router.post("/kepler/runs").handler(new RunsHandler(this)); 849 850 RunIdHandler runIdHandler = new RunIdHandler(this); 851 852 String runIdRegexStr = "(urn:lsid:[^:]+:[^:]+:[^:]+(?::\\d+)?)"; 853 router.getWithRegex("/kepler/runs/" + runIdRegexStr + "/(\\w+)").blockingHandler(runIdHandler::handleBinary); 854 router.getWithRegex("/kepler/runs/" + runIdRegexStr).handler(runIdHandler); 855 856 if(WebViewConfiguration.getHttpServerMetadataFileName() != null) { 857 System.out.println("Metadata file set; requests at /login"); 858 859 router.route("/login") 860 .handler(authenticationHandler) 861 .handler(authorizationHandler); 862 863 LoginHandler loginHandler = new LoginHandler(this); 864 router.route("/login").handler(loginHandler); 865 866 // login session handler to check if session cookie is valid. 867 /* 868 router.route("/loginSession") 869 .handler(authorizationHandler) 870 .handler(loginSessionContext -> { 871 872 System.out.println("session " + loginSessionContext.session().id()); 873 874 // if not valid, return 400 875 if(loginSessionContext.user() == null) { 876 System.out.println("user is null"); 877 loginSessionContext.response() 878 .putHeader("Content-Type", "text/plain") 879 .setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST) 880 .end(); 881 } else { 882 // otherwise call login handler to return metadata. 883 loginSessionContext.next(); 884 } 885 }); 886 router.route("/loginSession").handler(loginHandler); 887 */ 888 889 // logout handler to remove session and user. 
890 router.route("/logout").handler(authenticationHandler); 891 // FIXME what about authorization handler? 892 router.route("/logout").handler(logoutContext -> { 893 //System.out.println("destroying session " + logoutContext.session().id()); 894 logoutContext.session().destroy(); 895 logoutContext.clearUser(); 896 logoutContext.response() 897 .putHeader("Content-Type", "text/plain") 898 .end(); 899 }); 900 } 901 902 if(WebViewConfiguration.enableHttpServerTableOfContents()) { 903 String path = WebViewConfiguration.getHttpServerTableOfContentsPath(); 904 if(path == null) { 905 path = "^(?:/kepler/toc){0,1}/*$"; 906 } 907 System.out.println("Enabling http server table of contents at " + path); 908 router.getWithRegex(path).handler(new TableOfContentsHandler(this)); 909 } 910 911 // add route that handles anything unmatched 912 router.route().handler(new NoMatchHandler(this)); 913 914 if(WebViewConfiguration.enableHttps()) { 915 916 sessionHandler.setCookieSecureFlag(true); 917 918 String pemKeyStr = WebViewConfiguration.getHttpsPEMKeyPath(); 919 if(pemKeyStr == null) { 920 throw new Exception("Must specify PEM key file for HTTPS server."); 921 } 922 923 String pemCertStr = WebViewConfiguration.getHttpsPEMCertPath(); 924 if(pemCertStr == null) { 925 throw new Exception("Must specify PEM certificate file for HTTPS server."); 926 } 927 928 int securePort = WebViewConfiguration.getHttpsServerPort(); 929 930 // FIXME what if pemCert or pemKey do not exist? 
931 932 HttpServerOptions options = new HttpServerOptions() 933 .setSsl(true) 934 .setPemKeyCertOptions(new PemKeyCertOptions() 935 .setCertPath(pemCertStr) 936 .setKeyPath(pemKeyStr)) 937 .setPort(securePort); 938 939 System.out.println("Starting secure web server on port " + securePort); 940 941 _secureServer = vertx.createHttpServer(options) 942 .websocketHandler(new WorkflowWebSocketHandler(this)) 943 .requestHandler(router::accept) 944 .listen(); 945 946 if(WebViewConfiguration.shouldHttpServerRedirect()) { 947 948 int status = WebViewConfiguration.getHttpServerRedirectStatus(); 949 String hostname = WebViewConfiguration.getHttpServerRedirectHostname(); 950 int redirectFromPort = WebViewConfiguration.getHttpServerPort(); 951 int redirectToPort = WebViewConfiguration.getHttpServerRedirectPort(); 952 953 String redirectUrl = "https://" + hostname + ":" + redirectToPort; 954 955 System.out.println("Redirecting web view server port " + redirectFromPort + 956 " to " + redirectUrl + " with status " + status); 957 958 _server = vertx.createHttpServer() 959 .requestHandler(req -> { 960 req.response().headers().set("Location", redirectUrl + req.path()); 961 req.response() 962 .setStatusCode(status) 963 .end(); 964 }) 965 .listen(redirectFromPort); 966 } 967 968 } else { 969 970 int port = WebViewConfiguration.getHttpServerPort(); 971 972 System.out.println("Starting web view server on port " + port); 973 974 _server = vertx.createHttpServer() 975 .websocketHandler(new WorkflowWebSocketHandler(this)) 976 .requestHandler(router::accept) 977 .listen(port); 978 } 979 } 980 981 public static void removeModel(NamedObj model) { 982 983 // notify any clients that the workflow was closed. 
984 try { 985 WebViewableUtilities.sendEvent( 986 WebViewableUtilities.Event.WorkflowClosed, model); 987 } catch (IllegalActionException e) { 988 MessageHandler.error("Error notifying clients that workflow closed.", e); 989 } 990 991 WebViewId.removeWorkflow(model); 992 993 //System.gc(); 994 _clientBuffers.remove(model); 995 996 //System.out.println("removing " + model + " " + model.hashCode()); 997 /* 998 for(CompositeActor master : _models.values()) { 999 System.out.println("master " + master + " " + master.hashCode()); 1000 } 1001 */ 1002 1003 //System.out.println("remove " + model.hashCode()); 1004 1005 //_models.remove(model); 1006 1007 //System.out.println("removed workflow " + model.getName()); 1008 } 1009 1010 /** Called during shutdown. */ 1011 public static void shutdown() { 1012 1013 KeplerGraphFrame.removeUpdater(_updater); 1014 1015 for(CompositeActor model : _models.values()) { 1016 removeModel(model); 1017 } 1018 1019 _models.clear(); 1020 1021 for(App app : _apps.values()) { 1022 app.close(); 1023 } 1024 1025 _apps.clear(); 1026 1027 // stop the logging thread 1028 _loggingQueue.add(STOP_LOGGING_THREAD_MESSAGE); 1029 try { 1030 _loggingThread.join(5000); 1031 } catch (InterruptedException e) { 1032 MessageHandler.error("Interrupted while waiting for logging thread to stop.", e); 1033 } 1034 _loggingThread = null; 1035 } 1036 1037 /** Stop the server. 
*/ 1038 @Override 1039 public void stop() throws Exception { 1040 super.stop(); 1041 1042 if(_server != null) { 1043 //System.out.println("Stopping vertx web server."); 1044 _server.close(); 1045 _server = null; 1046 } 1047 1048 1049 if(_secureServer != null) { 1050 _secureServer.close(); 1051 _secureServer = null; 1052 } 1053 } 1054 1055 public static Vertx vertx() { 1056 return _vertx; 1057 } 1058 1059 private static class ModelUpdater implements KeplerGraphFrameUpdater { 1060 1061 @Override 1062 public int compareTo(KeplerGraphFrameUpdater o) { 1063 return 1; 1064 } 1065 1066 @Override 1067 public void updateFrameComponents(Components components) { 1068 try { 1069 CompositeActor model = (CompositeActor) components.getFrame().getModel(); 1070 _addModel(model); 1071 _models.put(model.getName(), model); 1072 } catch (Exception e) { 1073 MessageHandler.error("Error adding model from UI.", e); 1074 } 1075 1076 } 1077 1078 /** Remove the model associated with the frame from the web server. */ 1079 @Override 1080 public void dispose(KeplerGraphFrame frame) { 1081 CompositeActor model = (CompositeActor) frame.getModel(); 1082 removeModel(model); 1083 _models.remove(model.getName()); 1084 } 1085 1086 } 1087 1088 /////////////////////////////////////////////////////////////////// 1089 //// public variables //// 1090 1091 public final static String WS_PATH = "/ws/"; 1092 public final static String WS_RUNWF_PATH = "/ws-runwf"; 1093 1094 /////////////////////////////////////////////////////////////////// 1095 //// package-protected variables //// 1096 1097 1098 /////////////////////////////////////////////////////////////////// 1099 //// private methods //// 1100 1101 /** Add a model to the set managed by the server. 
*/ 1102 public static void _addModel(CompositeActor model) throws Exception { 1103 1104 //System.out.println("adding model " + model.getFullName()); 1105 1106 //_models.add(model); 1107 1108 //System.out.println("put " + model.hashCode()); 1109 _clientBuffers.put(model, new LinkedList<JsonObject>()); 1110 1111 /* 1112 ProvenanceRecorder recorder = ProvenanceRecorder.getDefaultProvenanceRecorder(model); 1113 if(recorder != null) { 1114 recorder.addPiggyback(new WebViewRecording()); 1115 } 1116 */ 1117 1118 if(model.attributeList(ControlAttribute.class).isEmpty()) { 1119 ControlAttribute control = new ControlAttribute(model, "_wvControl"); 1120 control.setPersistent(false); 1121 } 1122 1123 if(model.attributeList(ParametersAttribute.class).isEmpty()) { 1124 ParametersAttribute parameters = new ParametersAttribute(model, "_wvParameters"); 1125 parameters.setPersistent(false); 1126 } 1127 1128 /* 1129 WebViewAttribute timeline = (WebViewAttribute) model.getAttribute("_wfTimeline"); 1130 if(timeline == null) { 1131 timeline = new WebViewAttribute(model, "_wfTimeline"); 1132 timeline.htmlFile.setExpression("web-view/visjs/wf-timeline.html"); 1133 timeline.title.setExpression("Workflow Execution Timeline"); 1134 } 1135 timeline.setPersistent(false); 1136 */ 1137 } 1138 1139 /** Get a clone of a model. */ 1140 private static CompositeActor _cloneModel(CompositeActor masterModel) throws CloneNotSupportedException { 1141 CompositeActor model = (CompositeActor)masterModel.clone(new Workspace()); 1142 _clientBuffers.put(model, new LinkedList<JsonObject>()); 1143 //System.out.println("put " + model.hashCode()); 1144 return model; 1145 } 1146 1147 /** Convert from a JSON object value to a string representation of a Ptolemy token. 
*/ 1148 private static String _convertJSONToTokenString(Object value) { 1149 if(value instanceof JsonArray) { 1150 StringBuilder buf = new StringBuilder("{"); 1151 Iterator<Object> iter = ((JsonArray)value).iterator(); 1152 while(iter.hasNext()) { 1153 Object item = iter.next(); 1154 // strings need to be surrounded by quotes in arrays 1155 if(item instanceof String) { 1156 buf.append("\"") 1157 .append(item) 1158 .append("\""); 1159 } else { 1160 // item may be a complex type (e.g., record token) 1161 buf.append(_convertJSONToTokenString(item)); 1162 } 1163 if(iter.hasNext()) { 1164 buf.append(","); 1165 } 1166 } 1167 return buf.append("}").toString(); 1168 } else if(value instanceof JsonObject) { 1169 // convert json objects to ptolemy record strings. 1170 StringBuilder buf = new StringBuilder("{"); 1171 Iterator<Entry<String, Object>> iter = ((JsonObject)value).iterator(); 1172 while(iter.hasNext()) { 1173 Entry<String,Object> entry = iter.next(); 1174 buf.append(entry.getKey()) 1175 .append("=") 1176 .append(_convertJSONToTokenString(entry.getValue())); 1177 if(iter.hasNext()) { 1178 buf.append(","); 1179 } 1180 }; 1181 return buf.append("}").toString(); 1182 } else { 1183 // FIXME if value is a string and parameter is not in string mode, 1184 // will this work? 
1185 return value.toString(); 1186 } 1187 } 1188 1189 @SuppressWarnings("resource") 1190 private App _getApp(String name) throws Exception { 1191 1192 App masterApp = null; 1193 synchronized(_apps) { 1194 // see if we've already instantiated this app 1195 masterApp = _apps.get(name); 1196 if(masterApp == null) { 1197 1198 String className = WebViewConfiguration.getAppClassName(name); 1199 1200 if(className == null || className.trim().isEmpty()) { 1201 throw new Exception("Missing class name for app " + name); 1202 } 1203 1204 try { 1205 Class<?> appClass = Class.forName(className); 1206 masterApp = (App) appClass.newInstance(); 1207 _apps.put(name, masterApp); 1208 } catch(ClassNotFoundException | InstantiationException 1209 | IllegalAccessException e) { 1210 throw new Exception("Error instantiating app " + name + ": " + e.getMessage()); 1211 } 1212 } 1213 } 1214 return (App) masterApp.clone(); 1215 } 1216 1217 /** Get the model for a specific name. If the model is not already 1218 * loaded, then use findFile() to find the model file with the 1219 * given name, name.kar and name.xml. Otherwise, return a copy 1220 * of the already-loaded model. 1221 * @return If a model with the specified name is loaded or can be 1222 * found and parsed, returns the model. Otherwise, null. 1223 */ 1224 private static CompositeActor _getModel(String name) throws Exception { 1225 1226 //System.out.println("size = " + _models.size()); 1227 1228 synchronized(_models) { 1229 1230 // see if the model is already loaded. 
1231 CompositeActor masterModel = _models.get(name); 1232 if(masterModel != null) { 1233 //System.out.println("found master model."); 1234 return _cloneModel(masterModel); 1235 } 1236 1237 // if not, find workflow file and load 1238 String wfFileStr = findFile(name); 1239 if(wfFileStr == null) { 1240 wfFileStr = findFile(name + ".kar"); 1241 if(wfFileStr == null) { 1242 wfFileStr = findFile(name + ".xml"); 1243 if(wfFileStr == null) { 1244 return null; 1245 } 1246 } 1247 } 1248 1249 masterModel = (CompositeActor) ParseWorkflow.parseKAR(new File(wfFileStr), true); 1250 _addModel(masterModel); 1251 _models.put(name, masterModel); 1252 //System.out.println("adding master model " + name); 1253 1254 // see if there is a configuration file for parameters 1255 File wfFile = new File(wfFileStr); 1256 File wfDir = wfFile.getParentFile(); 1257 File wfConfFile = new File(wfDir, 1258 FilenameUtils.getBaseName(wfFileStr) + ".conf"); 1259 if(wfConfFile.exists()) { 1260 ParseWorkflowUtil.setParametersFromFile(masterModel, wfConfFile.getAbsolutePath()); 1261 } 1262 1263 return _cloneModel(masterModel); 1264 } 1265 } 1266 1267 /** Set any parameters for a model from a JSON object. 
*/ 1268 private static void _setModelParameters(NamedObj model, JsonObject json, 1269 User user) throws IllegalActionException { 1270 1271 // get any parameters 1272 JsonObject params = null; 1273 if(json.containsKey("wf_param")) { 1274 try { 1275 params = json.getJsonObject("wf_param"); 1276 } catch(ClassCastException e) { 1277 throw new IllegalActionException("wf_param must be a json object."); 1278 } 1279 1280 for(Map.Entry<String, Object> entry: params) { 1281 final String paramName = entry.getKey(); 1282 //System.out.println("attempting to set parameter " + paramName); 1283 Attribute attribute = model.getAttribute(paramName); 1284 if(attribute == null || !(attribute instanceof Settable)) { 1285 throw new IllegalActionException("Workflow does not have settable parameter " + paramName); 1286 } 1287 1288 try { 1289 String valueStr = _convertJSONToTokenString(entry.getValue()); 1290 ((Settable)attribute).setExpression(valueStr); 1291 } catch (IllegalActionException e) { 1292 throw new IllegalActionException("Error settings parameter " + 1293 paramName + ": " + e.getMessage()); 1294 } 1295 } 1296 } 1297 1298 1299 // set user full name if present 1300 Attribute attribute = model.getAttribute("WebView_FullName"); 1301 if(attribute != null) { 1302 if(!(attribute instanceof Settable)) { 1303 System.err.println("WARNING: WebView_FullName parameter is not settable."); 1304 } else { 1305 ((Settable)attribute).setExpression(user.principal().getString("fullname")); 1306 } 1307 attribute = null; 1308 } 1309 1310 // set the authorization groups parameter if present 1311 attribute = model.getAttribute("WebView_groups"); 1312 if(attribute != null) { 1313 if(!(attribute instanceof Settable)) { 1314 System.err.println("WARNING: WebView_group parameter is not settable."); 1315 } else { 1316 // add quotes around each string since we use this to set the 1317 // WebView_groups parameter in the workflow. 
1318 String authGroupsStr = AuthUtilities.getGroups(user).stream() 1319 .map(s -> "\"" + s + "\"").collect(Collectors.joining(",")); 1320 //System.out.println(authGroupsStr); 1321 ((Settable)attribute).setExpression("{" + authGroupsStr + "}"); 1322 } 1323 } 1324 1325 } 1326 1327 /////////////////////////////////////////////////////////////////// 1328 //// private variables //// 1329 1330 private Queryable _queryable; 1331 1332 /** Authentication provider. */ 1333 private AuthProvider _auth; 1334 1335 /** HTTP server listening for HTTP and WS requests. */ 1336 private HttpServer _server; 1337 1338 /** HTTPS server listening for HTTPS and WSS requests. */ 1339 private HttpServer _secureServer; 1340 1341 /** ModelUpdater used to add/remove workflows when KeplerGraphFrames 1342 * are opened/closed. 1343 */ 1344 private final static ModelUpdater _updater = new ModelUpdater(); 1345 1346 /** The module tree. */ 1347 private final static ModuleTree _moduleTree = ModuleTree.instance(); 1348 1349 /** The root directory to search for files. If not set, the value is null. */ 1350 private static String _rootDir; 1351 1352 // TODO need better name 1353 private final static Map<NamedObj,List<JsonObject>> _clientBuffers = 1354 Collections.synchronizedMap(new HashMap<NamedObj,List<JsonObject>>()); 1355 1356 public final static Map<String,CompositeActor> _models 1357 = Collections.synchronizedMap(new HashMap<String,CompositeActor>()); 1358 1359 public final static Map<String,App> _apps 1360 = Collections.synchronizedMap(new HashMap<String,App>()); 1361 1362 /* Timestamp format for logging. 
*/
    // NOTE(review): SimpleDateFormat is not synchronized; confirm this
    // instance is only ever used from one thread at a time.
    private final DateFormat _logTimeFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z");

    /** The logging thread; shutdown() waits for it and then sets this to null. */
    private static Thread _loggingThread;

    /** Message that shutdown() puts on the logging queue to stop the logging thread. */
    private static final String STOP_LOGGING_THREAD_MESSAGE = "STOP";

    /** Queue of messages for the logging thread. */
    private static final LinkedBlockingQueue<String> _loggingQueue = new LinkedBlockingQueue<>();

    /** If true, append index.html to all (unmatched) GETs ending with "/". */
    private static boolean _appendIndexHtml;

    /** The instance returned by vertx(). */
    private static Vertx _vertx;

    // NOTE(review): units of this timeout are not visible here; confirm.
    private static long _workflowTimeout;
}