001/* 002 * Copyright (c) 2015-2017 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2017-08-29 15:27:08 -0700 (Tue, 29 Aug 2017) $' 007 * '$Revision: 1392 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.webview.actor; 031 032import java.util.HashMap; 033import java.util.Map; 034import java.util.concurrent.BlockingQueue; 035import java.util.concurrent.LinkedBlockingQueue; 036 037import org.kepler.webview.data.TokenConverter; 038 039import ptolemy.actor.TypedAtomicActor; 040import ptolemy.actor.TypedIOPort; 041import ptolemy.data.Token; 042import ptolemy.data.type.BaseType; 043import ptolemy.data.type.Type; 044import ptolemy.kernel.CompositeEntity; 045import ptolemy.kernel.util.IllegalActionException; 046import ptolemy.kernel.util.NameDuplicationException; 047import ptolemy.kernel.util.Workspace; 048 049public class WebView extends TypedAtomicActor { 050 051 /** Create a new WebView in a container with the specified name. */ 052 public WebView(CompositeEntity container, String name) 053 throws IllegalActionException, NameDuplicationException { 054 super(container, name); 055 056 webView = new WebViewAttribute(this, "webView"); 057 webView.title.setToken(name); 058 } 059 060 /** Clone the actor into the specified workspace. 061 * @param workspace The workspace for the new object. 062 * @return A new actor. 063 * @exception CloneNotSupportedException If a derived class contains 064 * an attribute that cannot be cloned. 065 */ 066 @Override 067 public Object clone(Workspace workspace) throws CloneNotSupportedException { 068 WebView newObject = (WebView) super.clone(workspace); 069 newObject._outputQueues = new HashMap<TypedIOPort,BlockingQueue<Data>>(); 070 newObject._tokenConverter = new TokenConverter(); 071 return newObject; 072 } 073 074 public void dataReceived(String name, Object value) { 075 TypedIOPort port = (TypedIOPort) getPort(name); 076 if(port != null) { 077 BlockingQueue<Data> queue = _outputQueues.get(port); 078 if(queue == null) { 079 System.err.println("WARNING: dropping received data for " + 080 port.getFullName() + 081 "; is workflow running?"); 082 } else { 083 queue.add(new Data(value.toString())); 084 } 085 } else { 086 System.err.println("WARNING: dropping received data since no output port called " + name); 087 } 088 } 089 090 @Override 091 public void fire() throws IllegalActionException { 092 093 super.fire(); 094 095 // read all connected input ports 096 for(TypedIOPort port: inputPortList()) { 097 if(port.numberOfSources() > 0) { 098 /*Token token =*/ port.get(0); 099 } 100 } 101 102 // read from client for each output port 103 for(TypedIOPort port: outputPortList()) { 104 BlockingQueue<Data> queue = _outputQueues.get(port); 105 Data data; 106 try { 107 data = queue.take(); 108 } catch (InterruptedException e) { 109 throw new IllegalActionException(this, e, "Error waiting for output."); 110 } 111 if(data != DATA_STOP) { 112 Token token = _convertOutputToToken(data.value, port.getType()); 113 port.broadcast(token); 114 } 115 } 116 } 117 118 /** Perform initializations during workflow start up. */ 119 @Override 120 public void preinitialize() throws IllegalActionException { 121 122 super.preinitialize(); 123 124 _outputQueues.clear(); 125 for(TypedIOPort port: outputPortList()) { 126 _outputQueues.put(port, new LinkedBlockingQueue<Data>()); 127 128 // set default output type to be string; can be overridden 129 // in derived classes. 130 if(port.getType().equals(BaseType.UNKNOWN)) { 131 port.setTypeEquals(BaseType.STRING); 132 } 133 } 134 } 135 136 @Override 137 public void stop() { 138 super.stop(); 139 //System.out.println("stop"); 140 for(TypedIOPort port: outputPortList()) { 141 BlockingQueue<Data> queue = _outputQueues.get(port); 142 queue.add(DATA_STOP); 143 } 144 } 145 146 /////////////////////////////////////////////////////////////////// 147 //// public fields //// 148 149 public WebViewAttribute webView; 150 151 /////////////////////////////////////////////////////////////////// 152 //// protected methods //// 153 154 protected Token _convertOutputToToken(String data, Type type) throws IllegalActionException { 155 return _tokenConverter.convertToToken(data, type); 156 } 157 158 /////////////////////////////////////////////////////////////////// 159 //// private methods //// 160 161 /** A class to encapsulate data received from the web clients. */ 162 private static class Data { 163 public Data(String v) { 164 this.value = v; 165 } 166 public String value; 167 } 168 169 private Map<TypedIOPort,BlockingQueue<Data>> _outputQueues = 170 new HashMap<TypedIOPort,BlockingQueue<Data>>(); 171 172 /** A specific Data instance used when the workflow is stopped. */ 173 private final static Data DATA_STOP = new Data(""); 174 175 private TokenConverter _tokenConverter = new TokenConverter(); 176}