/*
 *
 * Copyright (c) 2015 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2017-08-23 22:44:42 -0700 (Wed, 23 Aug 2017) $'
 * '$Revision: 1376 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.webview.server.handler;

import java.util.List;

import org.kepler.webview.server.WebViewId;
import org.kepler.webview.server.WebViewServer;

import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.LocalMap;
import ptolemy.actor.TypedCompositeActor;

/**
 * Websocket handler for workflow traffic.
 *
 * <p>Two kinds of websocket paths are accepted:
 * <ul>
 *   <li>{@code WebViewServer.WS_RUNWF_PATH}: every frame received is parsed
 *       as a JSON request, passed to {@code WebViewServer.executeWorkflow},
 *       and the JSON result (or an {@code "error"} object) is written back
 *       on the same socket.</li>
 *   <li>paths starting with {@code WebViewServer.WS_PATH}: the remainder of
 *       the path is looked up through {@code WebViewId.getNamedObj}. When it
 *       resolves to a {@code TypedCompositeActor}, the socket is registered
 *       in the Vert.x local map named after the path and every JSON frame
 *       received is published on the event bus address equal to the
 *       path.</li>
 * </ul>
 * Any other request is rejected.
 */
public class WorkflowWebSocketHandler extends WebSocketServerBaseHandler {

    /**
     * Create a handler bound to the given server.
     *
     * @param server the server passed on to the base handler.
     */
    public WorkflowWebSocketHandler(WebViewServer server) {
        super(server);
    }

    /**
     * Dispatch a newly opened websocket according to its path and reject it
     * when no branch accepts it.
     *
     * @param ws the websocket being opened.
     */
    @Override
    public void handle(final ServerWebSocket ws) {
        final String path = ws.path();
        final boolean handled;

        if (path.equals(WebViewServer.WS_RUNWF_PATH)) {
            _openRunWorkflowSocket(ws);
            handled = true;
        } else if (path.startsWith(WebViewServer.WS_PATH)) {
            handled = _openWorkflowSocket(ws, path);
        } else {
            handled = false;
        }

        if (!handled) {
            ws.reject();
        }
    }

    /** Install the handlers for a socket opened on the run-workflow path. */
    private void _openRunWorkflowSocket(final ServerWebSocket ws) {
        _server.log(ws, "OPEN", WebViewServer.WS_RUNWF_PATH);

        ws.closeHandler(closed -> {
            // TODO stop and close running workflow.
            _server.log(ws, "CLOSE", WebViewServer.WS_RUNWF_PATH);
        });

        ws.handler(buffer -> {
            _server.log(ws, "READ", WebViewServer.WS_RUNWF_PATH, buffer);

            final JsonObject requestJson;
            try {
                requestJson = new JsonObject(buffer.toString());
            } catch (DecodeException e) {
                // The frame comes straight from the client; answer with an
                // error instead of letting the exception escape the handler
                // and leaving the client without any reply. No "reqid" can
                // be echoed because the request could not be parsed.
                _writeRunWorkflowResponse(ws, new JsonObject()
                        .put("error", "Invalid JSON request: " + e.getMessage()));
                return;
            }

            // FIXME need to authenticate and get user object.
            _server.executeWorkflow(requestJson, null, res -> {
                final JsonObject responseJson;

                if (res.succeeded()) {
                    responseJson = res.result();
                } else {
                    final Throwable cause = res.cause();
                    // Some exceptions carry no message; fall back to
                    // toString() so the client never sees "error": null.
                    final String message = cause.getMessage() != null
                            ? cause.getMessage()
                            : cause.toString();
                    responseJson = new JsonObject().put("error", message);
                }

                // Echo the request id, when present, so the client can
                // match the response to its request.
                final Object requestId = requestJson.getValue("reqid");
                if (requestId != null) {
                    responseJson.put("reqid", requestId);
                }

                _writeRunWorkflowResponse(ws, responseJson);
            });
        });
    }

    /** Write a response on a run-workflow socket and log the write. */
    private void _writeRunWorkflowResponse(final ServerWebSocket ws,
            final JsonObject responseJson) {
        ws.writeFinalTextFrame(responseJson.encode());
        _server.log(ws, "WRITE", WebViewServer.WS_RUNWF_PATH);
    }

    /**
     * Install the handlers for a socket opened on a per-workflow path.
     *
     * @return true if the path resolved to a workflow and the socket was
     *   accepted, false if the caller should reject the socket.
     */
    private boolean _openWorkflowSocket(final ServerWebSocket ws,
            final String path) {

        System.out.println("ws path is: " + path);

        final String id = path.substring(WebViewServer.WS_PATH.length());

        // The id comes from the request path, so check the type instead of
        // casting blindly: anything that is not a composite actor is treated
        // the same as a missing workflow.
        final Object namedObj = WebViewId.getNamedObj(id);
        if (!(namedObj instanceof TypedCompositeActor)) {
            System.err.println("Unhandled websocket request " +
                    "(workflow not found) for: " + path);
            return false;
        }
        final TypedCompositeActor model = (TypedCompositeActor) namedObj;

        final List<JsonObject> buffer = WebViewServer.getClientBuffer(model);

        // synchronize on the buffer so we can send any buffered
        // data to the client before new data arrives.
        synchronized (buffer) {
            final LocalMap<String, Integer> map =
                    _server.getVertx().sharedData().getLocalMap(path);

            System.out.println("new connection for " + path + " id = " + ws.textHandlerID());
            for (String existingId : map.keySet()) {
                System.out.println("existing id: " + existingId);
            }

            map.put(ws.textHandlerID(), Integer.valueOf(1));

            ws.closeHandler(closed -> {
                System.out.println("closing connection for " + path + " id = " + ws.textHandlerID());
                map.remove(ws.textHandlerID());

                // notify webviewables about closed connection
                _server.getVertx().eventBus().publish(path,
                        _connectionEvent("conclose", ws.textHandlerID()));
            });

            ws.handler(b -> {
                final JsonObject json;
                try {
                    json = new JsonObject(b.toString());
                } catch (DecodeException e) {
                    // Drop frames that are not valid JSON objects rather
                    // than throwing out of the handler.
                    System.err.println("Ignoring invalid JSON received for "
                            + path + ": " + e.getMessage());
                    return;
                }
                System.out.println("recvd for " + path + ": " + json);
                _server.getVertx().eventBus().publish(path, json);
            });

            // notify webviewables about new connection
            _server.getVertx().eventBus().publish(path,
                    _connectionEvent("conopen", ws.textHandlerID()));

            // FIXME buffered json can contain incorrect actor
            // paths, e.g., after a rename occurs.
            // TODO re-enable sending the buffered data to the client;
            // replay of the buffer is currently disabled.
        }

        return true;
    }

    /** Build the event object published when a connection opens or closes. */
    private static JsonObject _connectionEvent(final String type,
            final String connectionId) {
        return new JsonObject().put("event",
                new JsonObject()
                    .put("type", type)
                    .put("id", connectionId));
    }
}