001/*
002 * Copyright (c) 2015 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2017-09-04 12:59:35 -0700 (Mon, 04 Sep 2017) $' 
007 * '$Revision: 1407 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029package org.kepler.webview.server;
030
031import java.io.File;
032import java.io.FileWriter;
033import java.io.IOException;
034import java.net.HttpURLConnection;
035import java.text.DateFormat;
036import java.text.SimpleDateFormat;
037import java.util.Collections;
038import java.util.Date;
039import java.util.HashMap;
040import java.util.Iterator;
041import java.util.LinkedList;
042import java.util.List;
043import java.util.Map;
044import java.util.Map.Entry;
045import java.util.concurrent.LinkedBlockingQueue;
046import java.util.stream.Collectors;
047
048import org.apache.commons.io.FilenameUtils;
049import org.kepler.build.modules.Module;
050import org.kepler.build.modules.ModuleTree;
051import org.kepler.gui.KeplerGraphFrame;
052import org.kepler.gui.KeplerGraphFrame.Components;
053import org.kepler.gui.KeplerGraphFrameUpdater;
054import org.kepler.loader.util.ParseWorkflow;
055import org.kepler.module.webview.Shutdown;
056import org.kepler.objectmanager.lsid.KeplerLSID;
057import org.kepler.provenance.ProvenanceRecorder;
058import org.kepler.provenance.Queryable;
059import org.kepler.provenance.Recording;
060import org.kepler.util.ParseWorkflowUtil;
061import org.kepler.webview.actor.ControlAttribute;
062import org.kepler.webview.actor.ParametersAttribute;
063import org.kepler.webview.server.app.App;
064import org.kepler.webview.server.auth.AuthUtilities;
065import org.kepler.webview.server.auth.WebViewAuthHandlerImpl;
066import org.kepler.webview.server.handler.ActorHandler;
067import org.kepler.webview.server.handler.LoginHandler;
068import org.kepler.webview.server.handler.NoMatchHandler;
069import org.kepler.webview.server.handler.RunIdHandler;
070import org.kepler.webview.server.handler.RunWorkflowHandler;
071import org.kepler.webview.server.handler.RunsHandler;
072import org.kepler.webview.server.handler.TableOfContentsHandler;
073import org.kepler.webview.server.handler.WorkflowHandler;
074import org.kepler.webview.server.handler.WorkflowWebSocketHandler;
075
076import com.hazelcast.config.Config;
077
078import io.vertx.core.AbstractVerticle;
079import io.vertx.core.AsyncResult;
080import io.vertx.core.DeploymentOptions;
081import io.vertx.core.Future;
082import io.vertx.core.Handler;
083import io.vertx.core.Vertx;
084import io.vertx.core.VertxOptions;
085import io.vertx.core.buffer.Buffer;
086import io.vertx.core.http.HttpMethod;
087import io.vertx.core.http.HttpServer;
088import io.vertx.core.http.HttpServerOptions;
089import io.vertx.core.http.HttpServerRequest;
090import io.vertx.core.http.ServerWebSocket;
091import io.vertx.core.json.JsonArray;
092import io.vertx.core.json.JsonObject;
093import io.vertx.core.net.PemKeyCertOptions;
094import io.vertx.core.net.SocketAddress;
095import io.vertx.core.spi.cluster.ClusterManager;
096import io.vertx.ext.auth.AuthProvider;
097import io.vertx.ext.auth.User;
098import io.vertx.ext.web.Router;
099import io.vertx.ext.web.RoutingContext;
100import io.vertx.ext.web.handler.AuthHandler;
101import io.vertx.ext.web.handler.BodyHandler;
102import io.vertx.ext.web.handler.CookieHandler;
103import io.vertx.ext.web.handler.CorsHandler;
104import io.vertx.ext.web.handler.SessionHandler;
105import io.vertx.ext.web.handler.UserSessionHandler;
106import io.vertx.ext.web.sstore.ClusteredSessionStore;
107import io.vertx.ext.web.sstore.LocalSessionStore;
108import io.vertx.ext.web.sstore.SessionStore;
109import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
110import ptolemy.actor.CompositeActor;
111import ptolemy.actor.ExecutionListener;
112import ptolemy.actor.Manager;
113import ptolemy.actor.gui.ConfigurationApplication;
114import ptolemy.kernel.util.Attribute;
115import ptolemy.kernel.util.IllegalActionException;
116import ptolemy.kernel.util.NamedObj;
117import ptolemy.kernel.util.Settable;
118import ptolemy.kernel.util.Workspace;
119import ptolemy.moml.MoMLParser;
120import ptolemy.moml.filter.BackwardCompatibility;
121import ptolemy.util.MessageHandler;
122
123public class WebViewServer extends AbstractVerticle {
124    
125    /** Execute an app.
126     *  @param json The JSON object with the workflow name and any parameters.
127     *  @param handler Asynchronous handler to receive the a JSON object with any results.
128     */
129    public void executeApp(JsonObject json, User user,
130        Handler<AsyncResult<JsonObject>> handler) {
131
132        String appName = json.getString("app_name");
133        //System.out.println("execute workflow " + wfName);
134        
135        if(appName == null || appName.trim().isEmpty()) {
136            handler.handle(Future.failedFuture("No app_name specified."));
137            return;
138        }
139
140        if(user == null) {
141            handler.handle(Future.failedFuture("User is not authenticated"));
142            return;
143        }
144        
145        for(String key: json.fieldNames()) {
146            if(!key.equals("app_name") &&
147                !key.equals("app_param") &&
148                !key.equals("prov") && 
149                !key.equals("reqid") &&
150                !key.equals("sync")) {
151                System.err.println("WARNING: unknown property in request: " + key);
152            }
153        }
154        
155        // the following will block, e.g., waiting on the workspace lock,
156        // so execute in blocking threads.
157        _vertx.<JsonObject>executeBlocking(future -> {
158
159            //System.out.println("executeBlocking " + wfName);
160            
161            boolean runSynchronously = false;
162            App app = null;
163            boolean running = false;
164            
165            // see if app is already loaded
166            try {
167                app = _getApp(appName);
168                
169                if(app == null) {
170                    throw new Exception("App not found.");
171                }
172                     
173                JsonObject appParams;
174                if(json.containsKey("app_param")) {
175                    try {
176                        appParams = json.getJsonObject("app_param");
177                    } catch(ClassCastException e) {
178                        throw new Exception("app_param must be a json object.");
179                    }                
180                } else {
181                    appParams = new JsonObject();
182                }
183                                        
184                runSynchronously = json.getBoolean("sync", false);
185                if(!runSynchronously) {
186                    throw new Exception("Asynchronous execution not supported for apps.");
187                }
188                
189                if(runSynchronously) {                                                                                                                
190                    // execute the application
191                    running = true;
192                    final App appCopy = app;
193                    app.exec(user, appParams, appHandler -> {
194
195                        if(appHandler.failed()) {
196                            future.fail(appHandler.cause());
197                        } else {
198                            future.complete(new JsonObject().put("responses", appHandler.result()));                            
199                        }
200                        appCopy.close();
201                    });
202                }
203            } catch (Exception e) {
204                e.printStackTrace();
205                future.fail("Error executing app: " + e.getMessage());
206                return;
207            } finally {
208                if(!running) {
209                    app.close();                   
210                }
211            }
212                        
213        // set ordered to false to allow parallel executions,
214        // and use handler for result.    
215        }, false, handler);
216    }
217    
218    /** Execute a workflow.
219     *  @param json The JSON object with the workflow name and any parameters.
220     *  @param handler Asynchronous handler to receive the a JSON object with any results.
221     */
222    public void executeWorkflow(JsonObject json, User user,
223        Handler<AsyncResult<JsonObject>> handler) {
224
225        String wfName = json.getString("wf_name");
226        //System.out.println("execute workflow " + wfName);
227        
228        if(wfName == null || wfName.trim().isEmpty()) {
229            handler.handle(Future.failedFuture("No wf_name specified."));
230            return;
231        }
232
233        if(user == null) {
234            handler.handle(Future.failedFuture("User is not authenticated"));
235            return;
236        }
237        
238        // TODO correct way to convert Buffer to JSON?
239        //System.out.println("recvd: " + json);
240       
241        // the following will block, e.g., waiting on the workspace lock,
242        // so execute in blocking threads.
243        _vertx.<JsonObject>executeBlocking(future -> {
244
245            //System.out.println("executeBlocking " + wfName);
246            
247            boolean runSynchronously = false;
248            ProvenanceRecorder recorder = null;
249            CompositeActor model = null;
250            
251            // see if workflow is already loaded
252            try {
253                model = _getModel(wfName);
254
255                if(model == null) {
256                    throw new Exception("Workflow not found.");
257                }
258                                    
259                try {
260                    _setModelParameters(model, json, user);
261                } catch (IllegalActionException e) {
262                    throw new Exception("Error setting parameters: " + e.getMessage());
263                }
264                
265                boolean recordProvenance = json.getBoolean("prov", true);
266                runSynchronously = json.getBoolean("sync", false);
267                
268                // cannot run async without provenance
269                if(!runSynchronously && !recordProvenance) {
270                    throw new Exception("Cannot execute workflow asynchronously without recording provenance.");
271                }
272                
273                recorder = ProvenanceRecorder.getDefaultProvenanceRecorder(model);
274
275                if(recordProvenance) {
276                    if(recorder == null) {                        
277                        if(!ProvenanceRecorder.addProvenanceRecorder(model, null, null)) {                            
278                            throw new Exception("Error adding provenance recorder to workflow.");
279                        }
280                        recorder = ProvenanceRecorder.getDefaultProvenanceRecorder(model);
281                        if(recorder == null) {
282                            throw new Exception("Cannot find provenance recorder in workflow after adding one.");
283                        }
284                    }                    
285                    
286                    //System.out.println("setting provenance username = " + user.principal().getString("username"));
287                    recorder.username.setToken(user.principal().getString("username"));
288                } else if (recorder != null) {
289                    recorder.setContainer(null);
290                }
291                
292                // create manager and add model
293                final Manager manager = new Manager(model.workspace(), "Manager");
294
295                try {
296                    model.setManager(manager);
297                } catch(IllegalActionException e) {
298                    throw new Exception("Error setting Manager for sub-workflow: " + e.getMessage());
299                }    
300                
301                String[] errorMessage = new String[1];
302
303                ExecutionListener managerListener = new ExecutionListener() {
304                    
305                    @Override
306                    public void executionError(Manager m, Throwable throwable) {
307                        System.err.println("Workflow execution error: " + throwable.getMessage());
308                        throwable.printStackTrace();
309                        errorMessage[0] = throwable.getMessage();
310                    }
311
312                    @Override
313                    public void executionFinished(Manager m) {
314                        //System.out.println("finished");
315                    }
316
317                    @Override
318                    public void managerStateChanged(Manager m) {                    
319                        //System.out.println("manager state changed " + m.getState().getDescription());
320                    }
321                };
322                
323                manager.addExecutionListener(managerListener);
324                
325                if(runSynchronously) {
326                    
327                    final String[] runLSIDStr = new String[1];
328                    if(recordProvenance) {
329                        recorder.addPiggyback(new Recording() {
330                            @Override
331                            public void executionStart(KeplerLSID lsid) {
332                                runLSIDStr[0] = lsid.toString();
333                            }
334                        });
335                    }
336
337                    final boolean[] timeout = new boolean[1];
338                    timeout[0] = false;
339                    long timerId = vertx.setTimer(_workflowTimeout, id -> {
340                        timeout[0] = true;
341                        manager.stop();
342                        future.fail("Execution timeout.");
343                    });
344                    
345
346                    // call execute() instead of run, otherwise exceptions
347                    // go to execution listener asynchronously.
348                    manager.execute();
349                    
350                    // see if we timed-out. if so, we already sent the response,
351                    // so exit.
352                    if(timeout[0]) {
353                        return;
354                    } else if(!vertx.cancelTimer(timerId)) {
355                        System.err.println("Workflow timeout Timer does not exist.");
356                    } else { System.out.println("cancelled timer."); }
357                    
358                    // see if there is a workflow exception
359                    if(errorMessage[0] != null) {
360                        future.fail(errorMessage[0]);
361                        return;
362                    }
363                                                                        
364                    JsonObject responseJson = new JsonObject();
365
366                    if(recordProvenance) {
367                        responseJson.put("id", runLSIDStr[0]);  
368                    }
369                    
370                    // build the response JSON object from the client buffers
371                    // for this workflow.
372                    JsonArray arrayJson = new JsonArray();
373                    List<JsonObject> outputs = getClientBuffer(model);
374                    if(outputs != null) {
375                        for(JsonObject outputJson : outputs) {
376                            if(outputJson.containsKey("actor")) {
377                                JsonObject actorObject = outputJson.getJsonObject("actor");
378                                for(String idStr : actorObject.fieldNames()) {
379                                    JsonObject idObject = actorObject.getJsonObject(idStr);
380                                    if(idObject.containsKey("data")) {
381                                        arrayJson.add(idObject.getJsonObject("data"));
382                                    }
383                                }
384                            }
385                        }
386                    }
387                    // send the successful response
388                    //System.out.println(arrayJson.encodePrettily());
389                    responseJson.put("responses", arrayJson);
390                    future.complete(responseJson);
391                } else { // asynchronous             
392                                        
393                    final CompositeActor finalModel = model;
394                    recorder.addPiggyback(new Recording() {
395                        @Override
396                        public void executionStart(KeplerLSID lsid) {
397                            //System.out.println("execution lsid is " + lsid);
398                            future.complete(new JsonObject().put("id", lsid.toString()));
399                        }
400                                                              
401                        @Override
402                        public void executionStop() {
403                            removeModel(finalModel);
404                        }
405                    });
406                    
407                    manager.startRun();
408                }
409            } catch (Exception e) {
410                future.fail("Error executing workflow: " + e.getMessage());
411                return;
412            } finally {
413                if(model != null && runSynchronously) {
414                    removeModel(model);
415                    model = null;
416                }
417            }
418            
419        // set ordered to false to allow parallel executions,
420        // and use handler for result.    
421        }, false, handler);
422    }
423    
424    /** Search for a file. The directories searched are the root directory (if specified),
425     *  and <module>/resources/html/.
426     *  @param name The name of the file to search for.
427     *  @return If the path is found, the absolute path of the file.
428     *  Otherwise, returns null.
429     */
430    public static String findFile(String name) {
431        
432        //System.out.println("findFile: " + name);
433        
434        if(_appendIndexHtml && name.endsWith("/")) {
435            name = name.concat("index.html");
436        }
437        
438        // search root dir first if specified
439        if(_rootDir != null) {
440            String fsPath = new StringBuilder(_rootDir)
441                .append(File.separator)
442                .append(name)
443                .toString();
444            // TODO check path is not outside _rootDir
445            if(_vertx.fileSystem().existsBlocking(fsPath)) {
446                return fsPath;
447            }
448        }
449        
450        // not found in root dir, so search module tree.
451        for(Module module : _moduleTree) {
452            String fsPath = new StringBuilder(module.getResourcesDir().getAbsolutePath())
453                .append(File.separator)
454                .append("html")
455                .append(File.separator)
456                .append(name).toString();
457            // TODO check path is not outside of resources/html dir
458            //System.out.println("checking " + fsPath);
459            if(_vertx.fileSystem().existsBlocking(fsPath)) {
460                return fsPath;
461            }
462        }
463        
464        // not found anywhere
465        return null;
466    }
467    
468    /** Get the client buffers for a workflow. */
469    public static List<JsonObject> getClientBuffer(NamedObj model) {
470        return _clientBuffers.get(model);
471    }
472
473    /** Called during initialization. */
474    public static void initialize() {
475                
476        int workerPoolSize = WebViewConfiguration.getHttpServerWorkerThreads();
477        VertxOptions options = new VertxOptions()
478                .setWorkerPoolSize(workerPoolSize)
479                .setMaxWorkerExecuteTime(Long.MAX_VALUE);
480
481        if(WebViewConfiguration.shouldStartClusterHazelcast()) {
482            Config hazelcastConfig = new Config();
483            ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
484            if (WebViewConfiguration.deployInKubernetes()) {
485                hazelcastConfig.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
486                hazelcastConfig.getNetworkConfig().getJoin().getKubernetesConfig().setEnabled(true)
487                        .setProperty("service-dns", WebViewConfiguration.getHazelcastDiscoveryDnsServiceName());
488            }
489            options.setClusterManager(mgr);
490            Vertx.clusteredVertx(options, result -> {
491                if(result.failed()) {
492                    MessageHandler.error("Failed to get vertx context.", result.cause());
493                    // TODO shutdown?
494                } else {
495                    _vertx = result.result();
496                    _initializeAfterCreatingVertx();
497                }
498            });
499        } else if(!WebViewConfiguration.shouldStartCluster()) {
500            _vertx = Vertx.vertx(options);
501            _initializeAfterCreatingVertx();
502        } else {
503            MessageHandler.error("Unknown value for cluster type in configuration.xml.");
504            Shutdown.shutdownLatch.countDown();
505        }
506                     
507    }
508    
509    private static void _initializeAfterCreatingVertx() {
510        
511                
512        String rootDir = WebViewConfiguration.getHttpServerRootDir();
513        if(rootDir != null &&
514            !rootDir.equals(WebViewConfiguration.DEFAULT_WEBVIEW_SERVER_ROOT_DIR)) {
515            _rootDir = rootDir;
516            System.out.println("Web view root dir is " + _rootDir);
517        }
518        
519        _appendIndexHtml = WebViewConfiguration.getHttpServerAppendIndexHtml();
520        
521        _workflowTimeout = WebViewConfiguration.getHttpServerWorkflowTimeout();
522        
523        // register as an updater to get window open and close events
524        KeplerGraphFrame.addUpdater(_updater);
525       
526        // start logging thread
527        _loggingThread = new Thread(new Runnable() {
528            @Override
529            public void run() {
530                String logPath = WebViewConfiguration.getHttpServerLogPath();
531                System.out.println("Web-view logging to " + logPath);
532                try(FileWriter writer = new FileWriter(logPath, true)) {
533                    String message;
534                    try {
535                        while((message = _loggingQueue.take()) != STOP_LOGGING_THREAD_MESSAGE) {
536                            writer.write(message);
537                            writer.write('\n');
538                            writer.flush();
539                        }
540                    } catch (InterruptedException e) {
541                        MessageHandler.error("Interrupted while waiting in logging thread.", e);
542                    }
543                } catch (IOException e) {
544                    MessageHandler.error("Error trying to write to web-view server log " +
545                            logPath, e);
546                }
547            }
548        });
549        _loggingThread.start();   
550        
551        
552        boolean daemon = WebViewConfiguration.shouldStartHttpServerAsDaemon();
553        
554        if(daemon) {
555            System.out.println("Loading MoML filters.");
556            
557            // We set the list of MoMLFilters to handle Backward Compatibility.
558            MoMLParser.setMoMLFilters(BackwardCompatibility.allFilters());
559            
560            // load the gui and cache configuration since we need the CacheManager
561            // to load the KAREntryHandlers for exporting provenance kars.
562            try {
563                ConfigurationApplication.readConfiguration(
564                    ConfigurationApplication.specToURL("ptolemy/configs/kepler/ConfigGUIAndCache.xml"));
565            } catch (Exception e) {
566                MessageHandler.error("Error creating Configuration.", e);
567            }
568        }
569
570        if(WebViewConfiguration.shouldStartHttpServer() || daemon) {
571
572            /* TODO still necessary?
573            List<URL> list = new LinkedList<URL>();
574            for(String path : System.getProperty("java.class.path").split(File.pathSeparator)) {
575                try {
576                    list.add(new File(path).toURI().toURL());
577                } catch (MalformedURLException e) {
578                    MessageHandler.error("Bad URL.", e);
579                }
580            }
581             */
582            // list.toArray(new URL[list.size()]),
583            
584            DeploymentOptions deploymentOptions = new DeploymentOptions()
585                    .setInstances(WebViewConfiguration.getHttpServerInstances());
586            
587            WebViewServer.vertx().deployVerticle(
588                WebViewServer.class.getName(),
589                deploymentOptions,
590                deployResult -> {
591                    if(deployResult.failed()) {
592                        MessageHandler.error("Failed to deploy web view server.", deployResult.cause());
593                        // TODO shut down kepler.
594                    }
595                }
596            );
597            
598            for(String model: WebViewConfiguration.getPreloadModels()) {
599                System.out.println("Preloading " + model);
600                try {
601                    if(_getModel(model) == null) {
602                        System.err.println("ERROR: Unable to find model for preload: " + model); 
603                    }
604                } catch (Exception e) {
605                    System.err.println("ERROR: Unable to preload " + model + ": " + e.getMessage());
606                }
607            }
608            
609            // TODO preload apps
610                        
611        } else {
612            // not starting server, so set latch to 0
613            Shutdown.shutdownLatch.countDown();
614        }
615
616    }
617    
618    /** Log an http server request.
619     *  @param request The request
620     *  @param user The user
621     *  @param status The status
622     *  @param timestamp The timestamp when the request occurred.
623     */
624    public void log(HttpServerRequest request, User user, int status, long timestamp) {
625        log(request, user, status, timestamp, -1);
626    }
627    
628    /** Log an http server request.
629     *  @param request The request
630     *  @param user The user
631     *  @param status The status
632     *  @param timestamp The timestamp when the request occurred.
633     *  @param length The length of the file successfully sent to the client.
634     */
635    public void log(HttpServerRequest request, User user, int status, long timestamp, long length) {
636        
637        String versionStr;
638        switch(request.version()) {
639        case HTTP_1_1:
640            versionStr = "HTTP/1.1";
641            break;
642        case HTTP_1_0:
643            versionStr = "HTTP/1.0";
644            break;
645        default:
646            versionStr = "-";
647            break;
648        }
649        
650        String hostString;
651        SocketAddress address = request.remoteAddress();
652        if(address == null) {
653            hostString = "-";
654        } else {
655            hostString = address.host();
656        }
657        
658        String referrer = request.headers().get("referrer");
659        if(referrer == null) {
660            referrer = "-";
661        }
662        
663        String userAgent = request.headers().get("user-agent");
664        if(userAgent == null) {
665            userAgent = "-";
666        }
667        
668        String userNameStr = "-";
669        if(user != null) {
670            //System.out.println(user);
671            userNameStr = user.principal().getString("username");
672        }
673                    
674        // TODO how to get user-identifier and userid?
675        String message = String.format("%s - %s [%s] \"%s %s %s\" %d %s \"%s\" \"%s\"",
676            hostString,
677            userNameStr,
678            _logTimeFormat.format(new Date(timestamp)),
679            request.method(),
680            request.uri(),
681            versionStr,
682            status,
683            length < 0 ? "-" : String.valueOf(length),
684            referrer,
685            userAgent);
686
687        try {
688            _loggingQueue.put(message);
689        } catch (InterruptedException e) {
690            MessageHandler.error("Interuppted adding to logging thread.", e);
691        }
692    }
693    
694    public void log(ServerWebSocket socket, String command, String path) {
695        log(socket, command, path, null);
696    }
697
698    public void log(ServerWebSocket socket, String command, String path, Buffer buffer) {
699        
700        final long timestamp = System.currentTimeMillis();
701        
702        String hostString;
703        SocketAddress address = socket.remoteAddress();
704        if(address == null) {
705            hostString = "-";
706        } else {
707            hostString = address.host();
708        }
709        
710        String referrer = socket.headers().get("referrer");
711        if(referrer == null) {
712            referrer = "-";
713        }
714        
715        String userAgent = socket.headers().get("user-agent");
716        if(userAgent == null) {
717            userAgent = "-";
718        }
719
720        String message = String.format("%s - - [%s] \"%s %s %s %s\" %s %s \"%s\" \"%s\"",
721            hostString,
722            _logTimeFormat.format(new Date(timestamp)),
723            command,
724            path,
725            buffer == null ? "-" : buffer.toString(),
726            "websocket",
727            "-", // status
728            "-", // length
729            referrer,
730            userAgent);
731
732        try {
733            _loggingQueue.put(message);
734        } catch (InterruptedException e) {
735            MessageHandler.error("Interuppted adding to logging thread.", e);
736        }
737    }
738
739    /** Start the server. */
740    @Override
741    public void start() throws Exception {
742        super.start();
743        
744        _auth = WebViewConfiguration.getAuthProvider();
745        
746        _queryable = ProvenanceRecorder.getDefaultQueryable(null);
747        if(_queryable == null) {
748            throw new Exception("Unable to get provenance queryable.");
749        }
750        
751        Router router = Router.router(vertx);
752
753        if(WebViewConfiguration.getHttpServerCorsEnabled()) {
754            String allowOriginPatternStr = WebViewConfiguration.getHttpServerCorsAllowOriginPattern();
755            if(allowOriginPatternStr == null || allowOriginPatternStr.trim().isEmpty()) {
756                throw new Exception("Must specify allow origini pattern for CORS.");
757            }
758            router.route().handler(CorsHandler.create(allowOriginPatternStr)
759                    .allowedMethod(HttpMethod.GET)
760                    .allowedMethod(HttpMethod.POST)
761                    .allowedMethod(HttpMethod.OPTIONS)
762                    .allowCredentials(true)
763                    .allowedHeader("Access-Control-Allow-Credentials")
764                    .allowedHeader("X-PINGARUNER")
765                    .allowedHeader("Content-Type")
766                    .allowedHeader("authorization"));
767            System.out.println("CORS enabled for " + allowOriginPatternStr);
768        }
769                
770        
771        // create cookie and session handlers
772        // authenticated users are cached in the session so authentication
773        // does not need to be performed for each request.
774        router.route().handler(CookieHandler.create());
775        
776        BodyHandler bodyHandler = BodyHandler.create();
777        
778        if(_rootDir != null) {
779            File uploadsDir = new File(_rootDir, "file-uploads");
780            if(!uploadsDir.exists()) {
781                if(!uploadsDir.mkdirs()) {
782                    throw new Exception("Unable to create file uploads directory " + uploadsDir);
783                }
784            }
785            bodyHandler.setUploadsDirectory(uploadsDir.getAbsolutePath());
786        }
787        
788        // NOTE: need to install the BodyHandler before any handlers that
789        // make asynchronous calls such as BasicAuthHandler.
790        router.route().handler(bodyHandler);
791        
792        SessionStore sessionStore;
793        if(WebViewConfiguration.shouldStartCluster()) {
794            sessionStore = ClusteredSessionStore.create(_vertx);
795        } else {
796            sessionStore = LocalSessionStore.create(_vertx);
797        }
798        SessionHandler sessionHandler = SessionHandler.create(sessionStore)
799            .setSessionCookieName("WebViewSession")
800            .setSessionTimeout(WebViewConfiguration.getHttpServerSessionTimeout());
801        router.route().handler(sessionHandler);
802        router.route().handler(UserSessionHandler.create(_auth));
803        
804        //System.out.println("session timeout = " + WebViewConfiguration.getHttpServerSessionTimeout());
805        
806        // use custom auth handler with custom auth scheme to prevent
807        // browsers (e.g., chrome) from opening auth dialog.
808        // FIXME read realm from conf file.        
809        AuthHandler authenticationHandler = new WebViewAuthHandlerImpl(_auth, "WebView");
810        
811        Handler<RoutingContext> authorizationHandler = new Handler<RoutingContext>() {
812            @Override
813            public void handle(RoutingContext c) {
814                if(c.user() == null) {
815                    c.next();
816                } else {
817                    c.user().isAuthorized("login", result -> {
818                        if(result.failed()) {
819                            c.response()
820                                .putHeader("Content-type", "text/plain")
821                                .setStatusCode(HttpURLConnection.HTTP_FORBIDDEN)
822                                .end(result.cause().getMessage());
823                        } else {
824                            c.next();
825                        }
826                    });
827                }
828            }            
829        };
830        
831        // everything under /kepler and /app needs to be
832        // authenticated and authorized.
833        router.route("/kepler/*")
834            .handler(authenticationHandler)
835            .handler(authorizationHandler);
836        router.route("/app")
837            .handler(authenticationHandler)
838            .handler(authorizationHandler);
839        
840        // TODO should these be under /kepler?
841        router.getWithRegex("^/wf/(\\d+)$").handler(new WorkflowHandler(this));
842        router.getWithRegex("^/wf/(\\d+(?:\\-\\d+)+)$").handler(new ActorHandler(this));
843        
844        RunWorkflowHandler runWorkflowHandler = new RunWorkflowHandler(this);
845        router.post("/kepler/runwf").handler(runWorkflowHandler::handleRunWorkflow);
846        router.post("/app").handler(runWorkflowHandler::handleRunApp);
847        
848        router.post("/kepler/runs").handler(new RunsHandler(this));
849        
850        RunIdHandler runIdHandler = new RunIdHandler(this);
851        
852        String runIdRegexStr = "(urn:lsid:[^:]+:[^:]+:[^:]+(?::\\d+)?)";         
853        router.getWithRegex("/kepler/runs/" + runIdRegexStr + "/(\\w+)").blockingHandler(runIdHandler::handleBinary);
854        router.getWithRegex("/kepler/runs/" + runIdRegexStr).handler(runIdHandler);
855        
856        if(WebViewConfiguration.getHttpServerMetadataFileName() != null) {
857            System.out.println("Metadata file set; requests at /login");
858            
859            router.route("/login")
860                .handler(authenticationHandler)            
861                .handler(authorizationHandler);
862            
863            LoginHandler loginHandler = new LoginHandler(this);
864            router.route("/login").handler(loginHandler);
865                        
866            // login session handler to check if session cookie is valid.
867            /*
868            router.route("/loginSession")
869                .handler(authorizationHandler)
870                .handler(loginSessionContext -> {
871                
872                System.out.println("session " + loginSessionContext.session().id());
873                
874                // if not valid, return 400
875                if(loginSessionContext.user() == null) {
876                    System.out.println("user is null");
877                    loginSessionContext.response()
878                        .putHeader("Content-Type", "text/plain")
879                        .setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST)
880                        .end();
881                } else {
882                    // otherwise call login handler to return metadata.
883                    loginSessionContext.next();
884                }
885            });
886            router.route("/loginSession").handler(loginHandler);
887            */
888            
889            // logout handler to remove session and user.
890            router.route("/logout").handler(authenticationHandler);
891            // FIXME what about authorization handler?
892            router.route("/logout").handler(logoutContext -> {
893                //System.out.println("destroying session " + logoutContext.session().id());
894                logoutContext.session().destroy();
895                logoutContext.clearUser();
896                logoutContext.response()
897                    .putHeader("Content-Type", "text/plain")
898                    .end();
899            });
900        }
901
902        if(WebViewConfiguration.enableHttpServerTableOfContents()) {
903            String path = WebViewConfiguration.getHttpServerTableOfContentsPath();
904            if(path == null) {
905                path = "^(?:/kepler/toc){0,1}/*$";
906            }
907            System.out.println("Enabling http server table of contents at " + path);
908            router.getWithRegex(path).handler(new TableOfContentsHandler(this));
909        }
910        
911        // add route that handles anything unmatched
912        router.route().handler(new NoMatchHandler(this));
913                
914        if(WebViewConfiguration.enableHttps()) {
915            
916            sessionHandler.setCookieSecureFlag(true);
917            
918            String pemKeyStr = WebViewConfiguration.getHttpsPEMKeyPath();
919            if(pemKeyStr == null) {
920                throw new Exception("Must specify PEM key file for HTTPS server.");
921            }
922            
923            String pemCertStr = WebViewConfiguration.getHttpsPEMCertPath();
924            if(pemCertStr == null) {
925                throw new Exception("Must specify PEM certificate file for HTTPS server.");                
926            }
927            
928            int securePort = WebViewConfiguration.getHttpsServerPort();
929            
930            // FIXME what if pemCert or pemKey do not exist?
931            
932            HttpServerOptions options = new HttpServerOptions()
933                .setSsl(true)
934                .setPemKeyCertOptions(new PemKeyCertOptions()
935                    .setCertPath(pemCertStr)
936                    .setKeyPath(pemKeyStr))
937                .setPort(securePort);
938
939            System.out.println("Starting secure web server on port " + securePort);
940            
941            _secureServer = vertx.createHttpServer(options)
942                    .websocketHandler(new WorkflowWebSocketHandler(this))
943                    .requestHandler(router::accept)
944                    .listen();
945           
946            if(WebViewConfiguration.shouldHttpServerRedirect()) {
947
948                int status = WebViewConfiguration.getHttpServerRedirectStatus();
949                String hostname = WebViewConfiguration.getHttpServerRedirectHostname();
950                int redirectFromPort = WebViewConfiguration.getHttpServerPort();
951                int redirectToPort = WebViewConfiguration.getHttpServerRedirectPort();
952                
953                String redirectUrl = "https://" + hostname + ":" + redirectToPort;
954                
955                System.out.println("Redirecting web view server port " + redirectFromPort +
956                    " to " + redirectUrl + " with status " + status);
957                
958                _server = vertx.createHttpServer()
959                    .requestHandler(req -> {
960                        req.response().headers().set("Location", redirectUrl + req.path());                        
961                        req.response()
962                            .setStatusCode(status)
963                            .end();
964                    })
965                    .listen(redirectFromPort);
966            }
967
968        } else {
969        
970            int port = WebViewConfiguration.getHttpServerPort();
971            
972            System.out.println("Starting web view server on port " + port);
973
974            _server = vertx.createHttpServer()
975                .websocketHandler(new WorkflowWebSocketHandler(this))
976                .requestHandler(router::accept)
977                .listen(port);
978        }
979    }
980        
981    public static void removeModel(NamedObj model) {
982
983        // notify any clients that the workflow was closed.            
984        try {
985            WebViewableUtilities.sendEvent(
986                WebViewableUtilities.Event.WorkflowClosed, model);
987        } catch (IllegalActionException e) {
988            MessageHandler.error("Error notifying clients that workflow closed.", e);
989        }
990        
991        WebViewId.removeWorkflow(model);
992        
993        //System.gc();
994        _clientBuffers.remove(model);
995        
996        //System.out.println("removing " + model + " " + model.hashCode());
997        /*
998        for(CompositeActor master : _models.values()) {
999            System.out.println("master " + master + " " + master.hashCode());
1000        }
1001        */
1002        
1003        //System.out.println("remove " + model.hashCode());
1004
1005        //_models.remove(model);
1006        
1007        //System.out.println("removed workflow " + model.getName());
1008    }
1009    
1010    /** Called during shutdown. */
1011    public static void shutdown() {
1012                
1013        KeplerGraphFrame.removeUpdater(_updater);
1014            
1015        for(CompositeActor model : _models.values()) {
1016            removeModel(model);
1017        }
1018        
1019        _models.clear();
1020        
1021        for(App app : _apps.values()) {
1022            app.close();
1023        }
1024        
1025        _apps.clear();
1026        
1027        // stop the logging thread
1028        _loggingQueue.add(STOP_LOGGING_THREAD_MESSAGE);
1029        try {
1030            _loggingThread.join(5000);
1031        } catch (InterruptedException e) {
1032            MessageHandler.error("Interrupted while waiting for logging thread to stop.", e);
1033        }
1034        _loggingThread = null;
1035    }
1036    
1037    /** Stop the server. */
1038    @Override
1039    public void stop() throws Exception {
1040        super.stop();
1041        
1042        if(_server != null) {
1043            //System.out.println("Stopping vertx web server.");
1044            _server.close();
1045            _server = null;
1046        }
1047        
1048        
1049        if(_secureServer != null) {
1050            _secureServer.close();
1051            _secureServer = null;
1052        }
1053    }
1054        
1055    public static Vertx vertx() {
1056        return _vertx;
1057    }
1058
1059    private static class ModelUpdater implements KeplerGraphFrameUpdater {
1060
1061        @Override
1062        public int compareTo(KeplerGraphFrameUpdater o) {
1063            return 1;
1064        }
1065
1066        @Override
1067        public void updateFrameComponents(Components components) {
1068            try {
1069                CompositeActor model = (CompositeActor) components.getFrame().getModel();
1070                _addModel(model);
1071                _models.put(model.getName(), model);
1072            } catch (Exception e) {
1073                MessageHandler.error("Error adding model from UI.", e);
1074            }
1075
1076        }
1077
1078        /** Remove the model associated with the frame from the web server. */
1079        @Override
1080        public void dispose(KeplerGraphFrame frame) {
1081            CompositeActor model = (CompositeActor) frame.getModel();
1082            removeModel(model);
1083            _models.remove(model.getName());
1084        }
1085        
1086    }
1087            
1088    ///////////////////////////////////////////////////////////////////
1089    ////                         public variables                  ////
1090    
1091    public final static String WS_PATH = "/ws/";
1092    public final static String WS_RUNWF_PATH = "/ws-runwf";
1093
1094    ///////////////////////////////////////////////////////////////////
1095    ////                         package-protected variables       ////
1096
1097
1098    ///////////////////////////////////////////////////////////////////
1099    ////                         private methods                   ////
1100
1101    /** Add a model to the set managed by the server. */
1102    public static void _addModel(CompositeActor model) throws Exception {
1103        
1104        //System.out.println("adding model " + model.getFullName());
1105
1106        //_models.add(model);
1107
1108        //System.out.println("put " + model.hashCode());
1109        _clientBuffers.put(model,  new LinkedList<JsonObject>());
1110        
1111        /*
1112        ProvenanceRecorder recorder = ProvenanceRecorder.getDefaultProvenanceRecorder(model);
1113        if(recorder != null) {
1114            recorder.addPiggyback(new WebViewRecording());
1115        }
1116        */
1117
1118        if(model.attributeList(ControlAttribute.class).isEmpty()) {
1119            ControlAttribute control = new ControlAttribute(model, "_wvControl");
1120            control.setPersistent(false);
1121        }
1122        
1123        if(model.attributeList(ParametersAttribute.class).isEmpty()) {
1124            ParametersAttribute parameters = new ParametersAttribute(model, "_wvParameters");
1125            parameters.setPersistent(false);
1126        }
1127        
1128        /*
1129        WebViewAttribute timeline = (WebViewAttribute) model.getAttribute("_wfTimeline");
1130        if(timeline == null) {
1131            timeline = new WebViewAttribute(model, "_wfTimeline");
1132            timeline.htmlFile.setExpression("web-view/visjs/wf-timeline.html");
1133            timeline.title.setExpression("Workflow Execution Timeline");
1134        }
1135        timeline.setPersistent(false);
1136        */
1137    }
1138    
1139    /** Get a clone of a model. */
1140    private static CompositeActor _cloneModel(CompositeActor masterModel) throws CloneNotSupportedException {
1141        CompositeActor model = (CompositeActor)masterModel.clone(new Workspace());
1142        _clientBuffers.put(model,  new LinkedList<JsonObject>());
1143        //System.out.println("put " + model.hashCode());
1144        return model;
1145    }
1146
1147    /** Convert from a JSON object value to a string representation of a Ptolemy token. */
1148    private static String _convertJSONToTokenString(Object value) {
1149        if(value instanceof JsonArray) {
1150            StringBuilder buf = new StringBuilder("{");
1151            Iterator<Object> iter = ((JsonArray)value).iterator();
1152            while(iter.hasNext()) {
1153                Object item = iter.next();
1154                // strings need to be surrounded by quotes in arrays
1155                if(item instanceof String) {
1156                    buf.append("\"")
1157                        .append(item)
1158                        .append("\"");
1159                } else {
1160                    // item may be a complex type (e.g., record token)
1161                    buf.append(_convertJSONToTokenString(item));
1162                }
1163                if(iter.hasNext()) {
1164                    buf.append(",");
1165                }
1166            }
1167            return buf.append("}").toString();
1168        } else if(value instanceof JsonObject) {
1169            // convert json objects to ptolemy record strings.
1170            StringBuilder buf = new StringBuilder("{");
1171            Iterator<Entry<String, Object>> iter = ((JsonObject)value).iterator();
1172            while(iter.hasNext()) {
1173                Entry<String,Object> entry = iter.next();
1174                buf.append(entry.getKey())
1175                    .append("=")
1176                    .append(_convertJSONToTokenString(entry.getValue()));
1177                if(iter.hasNext()) {
1178                    buf.append(",");
1179                }
1180            };            
1181            return buf.append("}").toString();
1182        } else {
1183            // FIXME if value is a string and parameter is not in string mode,
1184            // will this work?
1185            return value.toString();
1186        }
1187    }
1188    
1189    @SuppressWarnings("resource")
1190    private App _getApp(String name) throws Exception {
1191        
1192        App masterApp = null;
1193        synchronized(_apps) {            
1194            // see if we've already instantiated this app
1195            masterApp = _apps.get(name);
1196            if(masterApp == null) {
1197            
1198                String className = WebViewConfiguration.getAppClassName(name);
1199                
1200                if(className == null || className.trim().isEmpty()) {
1201                    throw new Exception("Missing class name for app " + name);
1202                }
1203                
1204                try {
1205                    Class<?> appClass = Class.forName(className);
1206                    masterApp = (App) appClass.newInstance();
1207                    _apps.put(name, masterApp);
1208                } catch(ClassNotFoundException | InstantiationException
1209                    | IllegalAccessException e) {
1210                    throw new Exception("Error instantiating app " + name + ": " + e.getMessage());
1211                }    
1212            }
1213        }        
1214        return (App) masterApp.clone();
1215    }
1216
1217    /** Get the model for a specific name. If the model is not already
1218     *  loaded, then use findFile() to find the model file with the
1219     *  given name, name.kar and name.xml. Otherwise, return a copy
1220     *  of the already-loaded model.
1221     *  @return If a model with the specified name is loaded or can be
1222     *  found and parsed, returns the model. Otherwise, null.
1223     */
1224    private static CompositeActor _getModel(String name) throws Exception {
1225                
1226        //System.out.println("size = " + _models.size());
1227        
1228        synchronized(_models) {
1229            
1230            // see if the model is already loaded.
1231            CompositeActor masterModel = _models.get(name);
1232            if(masterModel != null) {
1233                //System.out.println("found master model.");
1234                return _cloneModel(masterModel);
1235            }
1236                    
1237            // if not, find workflow file and load
1238            String wfFileStr = findFile(name);
1239            if(wfFileStr == null) {
1240                wfFileStr = findFile(name + ".kar");
1241                if(wfFileStr == null) {
1242                    wfFileStr = findFile(name + ".xml");
1243                    if(wfFileStr == null) {
1244                        return null;
1245                    }
1246                }
1247            }
1248            
1249            masterModel = (CompositeActor) ParseWorkflow.parseKAR(new File(wfFileStr), true);
1250            _addModel(masterModel);
1251            _models.put(name, masterModel);
1252            //System.out.println("adding master model " + name);
1253            
1254            // see if there is a configuration file for parameters
1255            File wfFile = new File(wfFileStr);
1256            File wfDir = wfFile.getParentFile();
1257            File wfConfFile = new File(wfDir, 
1258                FilenameUtils.getBaseName(wfFileStr) + ".conf");
1259            if(wfConfFile.exists()) {
1260                ParseWorkflowUtil.setParametersFromFile(masterModel, wfConfFile.getAbsolutePath());
1261            }
1262            
1263            return _cloneModel(masterModel);
1264        }
1265    }
1266        
1267    /** Set any parameters for a model from a JSON object. */
1268    private static void _setModelParameters(NamedObj model, JsonObject json,
1269        User user) throws IllegalActionException {
1270        
1271        // get any parameters
1272        JsonObject params = null;
1273        if(json.containsKey("wf_param")) {
1274            try {
1275                params = json.getJsonObject("wf_param");
1276            } catch(ClassCastException e) {
1277                throw new IllegalActionException("wf_param must be a json object.");
1278            }
1279        
1280            for(Map.Entry<String, Object> entry: params) {
1281                final String paramName = entry.getKey();
1282                //System.out.println("attempting to set parameter " + paramName);
1283                Attribute attribute = model.getAttribute(paramName);
1284                if(attribute == null || !(attribute instanceof Settable)) {
1285                    throw new IllegalActionException("Workflow does not have settable parameter " + paramName);
1286                }
1287    
1288                try {
1289                    String valueStr = _convertJSONToTokenString(entry.getValue());
1290                    ((Settable)attribute).setExpression(valueStr);
1291                } catch (IllegalActionException e) {
1292                    throw new IllegalActionException("Error settings parameter " +
1293                                paramName + ": " + e.getMessage());
1294                }
1295            }
1296        }
1297                
1298
1299        // set user full name if present
1300        Attribute attribute = model.getAttribute("WebView_FullName");
1301        if(attribute != null) {
1302            if(!(attribute instanceof Settable)) {
1303                System.err.println("WARNING: WebView_FullName parameter is not settable.");
1304            } else {
1305                ((Settable)attribute).setExpression(user.principal().getString("fullname"));
1306            }
1307            attribute = null;
1308        }
1309        
1310        // set the authorization groups parameter if present        
1311        attribute = model.getAttribute("WebView_groups");
1312        if(attribute != null) {
1313            if(!(attribute instanceof Settable)) {
1314                System.err.println("WARNING: WebView_group parameter is not settable.");
1315            } else {
1316                // add quotes around each string since we use this to set the
1317                // WebView_groups parameter in the workflow.
1318                String authGroupsStr = AuthUtilities.getGroups(user).stream()
1319                    .map(s -> "\"" + s + "\"").collect(Collectors.joining(","));
1320                //System.out.println(authGroupsStr);
1321                ((Settable)attribute).setExpression("{" + authGroupsStr + "}");
1322            }
1323        }
1324
1325    }
1326        
1327    ///////////////////////////////////////////////////////////////////
1328    ////                         private variables                 ////
1329
1330    private Queryable _queryable;
1331    
1332    /** Authentication provider. */
1333    private AuthProvider _auth;
1334    
1335    /** HTTP server listening for HTTP and WS requests. */
1336    private HttpServer _server;
1337    
1338    /** HTTPS server listening for HTTPS and WSS requests. */
1339    private HttpServer _secureServer;
1340        
1341    /** ModelUpdater used to add/remove workflows when KeplerGraphFrames
1342     *  are opened/closed.
1343     */
1344    private final static ModelUpdater _updater = new ModelUpdater();
1345    
1346    /** The module tree. */
1347    private final static ModuleTree _moduleTree = ModuleTree.instance();
1348    
1349    /** The root directory to search for files. If not set, the value is null. */
1350    private static String _rootDir;
1351
1352    // TODO need better name
1353    private final static Map<NamedObj,List<JsonObject>> _clientBuffers =
1354            Collections.synchronizedMap(new HashMap<NamedObj,List<JsonObject>>());
1355
1356    public final static Map<String,CompositeActor> _models
1357        = Collections.synchronizedMap(new HashMap<String,CompositeActor>());
1358
1359    public final static Map<String,App> _apps
1360        = Collections.synchronizedMap(new HashMap<String,App>());
1361    
1362    /* Timestamp format for logging. */
1363    private final DateFormat _logTimeFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z");
1364    
1365    private static Thread _loggingThread;
1366    
1367    private final static String STOP_LOGGING_THREAD_MESSAGE = "STOP";
1368    
1369    private final static LinkedBlockingQueue<String> _loggingQueue = new LinkedBlockingQueue<String>();
1370    
1371    /** If true, append index.html to all (unmatched) GETs ending with "/". */
1372    private static boolean _appendIndexHtml;
1373        
1374    private static Vertx _vertx;
1375    
1376    private static long _workflowTimeout;
1377}