/*
 * Copyright (c) 2016-2017 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2018-02-07 18:53:35 +0000 (Wed, 07 Feb 2018) $'
 * '$Revision: 34661 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.spark.mllib;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.kepler.spark.actor.SparkSQLActor;
import org.kepler.webview.server.WebViewServer; // use webview's vertx instance

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.json.JsonObject;
import ptolemy.actor.Actor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.IntToken;
import ptolemy.data.LongToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.ObjectType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.ChangeRequest;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.kernel.util.SingletonAttribute;

/**
 * Actor for retrieving weather station data from the Firemap web socket server.
 *
 * <p>Incoming websocket frames are parsed into {@link Message} instances and
 * queued by {@link #_handleMessage(JsonObject)}; {@link #fire()} drains the
 * queue into a Spark data frame with one nullable double column per configured
 * observable followed by a {@code name} and a {@code timestampMillis}
 * column.</p>
 */
public class GetLiveMeasurements extends SparkSQLActor
{
    /** Default minimum time between firing requests, in milliseconds. */
    private static final long DEFAULT_FIRING_INTERVAL = 60000;

    /** Default number of queued messages required before a firing request. */
    private static final int DEFAULT_QUEUE_THRS = 1;

    /**
     * Matches a numeric prefix followed by an optional unit suffix.
     * Compiled once because it is applied to every string measurement.
     */
    private static final Pattern VALUE_PATTERN =
        Pattern.compile("([0-9\\-.]+)(.*)");

    protected Vertx vertx;

    public StringParameter websocketHost;   // e.g. firemap.sdsc.edu
    public StringParameter websocketPort;   // e.g. 8443
    public StringParameter websocketUri;    // e.g. /livewx/websocket
    public StringParameter messageType;
    public StringParameter dropCorruptedData;
    public Parameter minFiringInterval;
    public Parameter firingQueueThreshold;

    public PortParameter observables;
    public TypedIOPort output;
    public TypedIOPort count;

    // HTTP client backing the websocket; kept so wrapup() can release it.
    private HttpClient _client = null;

    // Assigned from the websocket connect callback and read in wrapup(),
    // which may run on different threads, hence volatile.
    private volatile WebSocket _websocket = null;

    public String[] _observables = null;

    private String _messageTypeValue;
    private StructType _schema;

    // A queue that holds data samples yet to be output by the actor.
    // _handleMessage() enqueues incoming websocket data,
    // fire() dequeues and outputs a dataset
    private Queue<Message> _queue;

    // Timestamp (System.currentTimeMillis()) of the last firing request.
    // Used for rate-limiting the firing requests of this actor.
    private long timeOfLastFire;

    // if true, any incoming string measurement containing "#" is rejected,
    // which leaves the corresponding value null.
    private boolean dropCorrupted;

    /**
     * Creates the actor's parameters and ports and sets their defaults.
     *
     * @param container The container of this actor.
     * @param name The name of this actor.
     * @throws IllegalActionException If the superclass constructor or the
     *   creation of a parameter or port throws it.
     * @throws NameDuplicationException If the superclass constructor or the
     *   creation of a parameter or port throws it.
     */
    public GetLiveMeasurements(CompositeEntity container, String name)
        throws IllegalActionException, NameDuplicationException
    {
        super(container, name);

        messageType = new StringParameter(this, "messageType");
        messageType.setExpression("R0");
        messageType.addChoice("R0");
        messageType.addChoice("R1");
        messageType.addChoice("R2");
        messageType.addChoice("R3");
        messageType.addChoice("R5");

        observables = new PortParameter(this, "observables");
        observables.setStringMode(true);
        observables.setTypeEquals(BaseType.STRING);
        observables.getPort().setTypeEquals(BaseType.STRING);
        new Attribute(observables.getPort(), "_showName");

        websocketHost = new StringParameter(this, "websocketHost");
        websocketHost.setExpression("firemap.sdsc.edu");

        websocketPort = new StringParameter(this, "websocketPort");
        websocketPort.setExpression("5446");

        websocketUri = new StringParameter(this, "websocketUri");
        websocketUri.setExpression("");

        dropCorruptedData = new StringParameter(this, "dropCorruptedData");
        dropCorruptedData.setExpression("true");

        minFiringInterval = new Parameter(this, "minFiringInterval");
        minFiringInterval.setTypeEquals(BaseType.LONG);
        minFiringInterval.setToken(new LongToken(DEFAULT_FIRING_INTERVAL));

        firingQueueThreshold = new Parameter(this, "firingQueueThreshold");
        firingQueueThreshold.setTypeEquals(BaseType.INT);
        firingQueueThreshold.setToken(new IntToken(DEFAULT_QUEUE_THRS));

        output = new TypedIOPort(this, "output", false, true);
        output.setTypeEquals(new ObjectType(Dataset.class));
        new SingletonAttribute(output, "_showName");

        count = new TypedIOPort(this, "count", false, true);
        count.setTypeEquals(BaseType.LONG);
        new SingletonAttribute(count, "_showName");
    }


    /**
     * Handles changing the message type parameter, and validates it.
     * A null or blank value leaves the current message type untouched;
     * changes to any other attribute are delegated to the superclass.
     *
     * @param attribute The attribute that changed.
     * @throws IllegalActionException If the new message type is not one of
     *   the parameter's choices, or if the superclass throws it.
     */
    @Override
    public void attributeChanged(Attribute attribute)
        throws IllegalActionException
    {
        if(attribute == messageType) {
            Token token = messageType.getToken();

            if(token != null) {
                String value = ((StringToken) token).stringValue();

                if(value != null && !value.trim().isEmpty()) {
                    String[] choices = messageType.getChoices();

                    if(!Arrays.asList(choices).contains(value)) {
                        throw new IllegalActionException(this,
                            "Unsupported messageType: " + value);
                    }

                    _messageTypeValue = value;
                }
            }
        }
        else {
            super.attributeChanged(attribute);
        }
    }


    /**
     * Call this to tell the director to fire the actor. The request is
     * submitted as a non-persistent change request to the container.
     *
     * @throws IllegalActionException Declared for callers; thrown if
     *   obtaining the container or queuing the request throws it.
     */
    private void _fireActor() throws IllegalActionException
    {
        final Actor actor = this;
        final NamedObj namedObj = output;
        ChangeRequest request = new ChangeRequest(this, "fire " + getName())
        {
            @Override
            protected void _execute() throws Exception {
                getDirector().fireAtCurrentTime(actor);
            }

            @Override
            public NamedObj getLocality() {
                return namedObj;
            }

        };
        request.setPersistent(false);
        getContainer().requestChange(request);
    }


    /**
     * Sets up the member variables and the websocket once per workflow
     * execution.
     *
     * <p>All state read by the websocket message handler (observables,
     * schema, queue, flags) is prepared before the connection is opened, so
     * the handler never sees partially initialized state and no connection is
     * opened when the configuration is invalid.</p>
     *
     * @throws IllegalActionException If no observables are specified, an
     *   observable is not a known measurement code, the websocket port is not
     *   an integer, or the superclass throws it.
     */
    @Override
    public void initialize()
        throws IllegalActionException
    {
        super.initialize();

        // Get the set observables and format them into a string array
        observables.update();

        StringToken observablesToken = (StringToken) observables.getToken();
        String observablesValue = (observablesToken == null ?
            "" : observablesToken.stringValue().trim());

        if(observablesValue.isEmpty()) {
            throw new IllegalActionException(this, "Must specify observables.");
        }

        String[] observableNames = observablesValue.split("\\s*,\\s*");

        // Fail fast on unknown measurement codes; otherwise every incoming
        // message would be rejected at runtime and nothing would be output.
        Message probe = new Message();
        for(String observable : observableNames) {
            try {
                probe.getValue(observable);
            } catch(IllegalActionException e) {
                throw new IllegalActionException(this, e,
                    "Invalid observable: \"" + observable + "\"");
            }
        }
        _observables = observableNames;

        // one nullable double column per observable ...
        List<StructField> fields = new ArrayList<>();
        for(String observable : _observables) {
            fields.add(DataTypes.createStructField(observable,
                DataTypes.DoubleType, true));
        }

        // ... followed by the name and timestampMillis columns
        fields.add(DataTypes.createStructField("name",
            DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("timestampMillis",
            DataTypes.LongType, true));

        // create schema
        _schema = DataTypes.createStructType(fields);

        // create message queue
        _queue = new ConcurrentLinkedQueue<Message>();

        // initialize time counter
        timeOfLastFire = System.currentTimeMillis();

        // get dropCorrupted
        dropCorrupted =
            Boolean.parseBoolean(dropCorruptedData.getValueAsString());

        // Get the websocket parameters. These are locals rather than static
        // fields so that several actor instances do not overwrite each other.
        String host = websocketHost.getValueAsString();
        String uri = websocketUri.getValueAsString();
        String portValue = websocketPort.getValueAsString();
        int port;
        try {
            port = Integer.parseInt(portValue.trim());
        } catch(NumberFormatException e) {
            throw new IllegalActionException(this, e,
                "Invalid websocketPort: " + portValue);
        }

        // Set options for websocket
        vertx = WebViewServer.vertx();
        HttpClientOptions options = new HttpClientOptions();
        options.setDefaultHost(host);
        options.setDefaultPort(port);
        options.setSsl(true);
        // NOTE(review): trustAll disables server certificate verification;
        // kept as in the original configuration.
        options.setTrustAll(true);

        // Create the websocket, define its message handler
        _client = vertx.createHttpClient(options);
        _client.websocket(uri, websocket -> {

            _websocket = websocket;

            System.out.println("Web socket connected");

            websocket.frameHandler(frame -> {
                String data = frame.textData();
                JsonObject json = new JsonObject(data);
                _handleMessage(json); // websocket updated 2017-07-19
            });

            websocket.closeHandler(x -> {
                System.out.println("Web socket closed");
            });

            websocket.writeFinalTextFrame("start");
        });
    }

    /**
     * Drains the queued weather station messages into a Spark data frame and
     * broadcasts it on {@code output}; if {@code count} has sinks, the number
     * of rows is broadcast on it as well.
     *
     * @throws IllegalActionException If a queued message cannot provide one
     *   of the observables, or if the superclass or a broadcast throws it.
     */
    @Override
    public void fire()
        throws IllegalActionException
    {
        super.fire();

        List<Row> rows = new ArrayList<>();

        // The queue is a ConcurrentLinkedQueue, so polling until it returns
        // null is safe without extra locking and avoids its O(n) size().
        Message msg;
        while((msg = _queue.poll()) != null) {
            Double[] vector = msg.getVector(_observables);

            // Row layout follows _schema: observables, name, timestampMillis.
            Object[] values = new Object[vector.length + 2];
            System.arraycopy(vector, 0, values, 0, vector.length);

            // clean up name string
            values[vector.length] = (msg.name == null ?
                null : msg.name.replaceAll("\\s+", ""));
            values[vector.length + 1] = msg.timestampMillis;

            rows.add(RowFactory.create(values));
        }

        Dataset<Row> df = _sqlContext.createDataFrame(rows, _schema);

        output.broadcast(new ObjectToken(df, Dataset.class));

        if(count.numberOfSinks() > 0) {
            count.broadcast(new LongToken(df.count()));
        }
    }

    /**
     * Closes the websocket connection and the HTTP client created in
     * {@link #initialize()} when the workflow ends.
     *
     * @throws IllegalActionException If the superclass throws it.
     */
    @Override
    public void wrapup()
        throws IllegalActionException
    {
        super.wrapup();

        WebSocket socket = _websocket;
        _websocket = null;
        HttpClient client = _client;
        _client = null;

        try {
            if(socket != null) {
                socket.close();
            }
        } finally {
            // Release the client even if closing the socket failed.
            if(client != null) {
                client.close();
            }
        }
    }


    /**
     * Converts the websocket message JSON into a message instance, if it's the
     * correct message type, queues it, and asks the director to fire the
     * actor once the minimum firing interval has elapsed and the queue holds
     * at least {@code firingQueueThreshold} messages.
     *
     * @param message The JSON object received from the websocket.
     */
    private void _handleMessage(JsonObject message)
    {
        String mType = "0" + _messageTypeValue;  // TODO What is this "0"?

        if(!mType.equals(message.getString("mtype"))) {
            return;
        }

        Message currentMessage = new Message(message);

        // Reject the message now if it cannot provide every observable, so
        // that fire() never encounters an unusable message.
        try {
            currentMessage.getVector(_observables);
        } catch(IllegalActionException e) {
            System.out.println("Invalid observable. " + e);
            return;
        }

        _queue.add(currentMessage);

        try {
            long now = System.currentTimeMillis();
            long minInterval =
                ((LongToken) minFiringInterval.getToken()).longValue();
            int threshold =
                ((IntToken) firingQueueThreshold.getToken()).intValue();

            if(now - timeOfLastFire > minInterval
               && _queue.size() >= threshold) {

                // reset timestamp variable
                timeOfLastFire = now;

                // signal the director to call fire()
                _fireActor();
            }
        } catch(IllegalActionException e) {
            System.out.println("Error firing GetLiveMeasurements." + e);
        }
    }


    /**
     * Class for storing web socket responses. Each measurement field stays
     * null until a value for its code has been set successfully.
     */
    private class Message
    {
        public String name = "";
        public Long timestampMillis = null;
        public Double windSpeedMin = null;
        public Double windSpeedAvg = null;
        public Double windSpeedMax = null;
        public Double windDirectionMin = null;
        public Double windDirectionAvg = null;
        public Double windDirectionMax = null;
        public Double airPressure = null;
        public Double airTemperature = null;
        public Double internalTemperature = null;
        public Double relativeHumidity = null;
        public Double rainAccumulation = null;
        public Double rainDuration = null;
        public Double rainIntensity = null;
        public Double rainPeakIntentsity = null;
        public Double hailAccumulation = null;
        public Double hailDuration = null;
        public Double hailIntensity = null;
        public Double hailPeakIntentsity = null;
        public Double heatingTemperature = null;
        public Double heatingVoltage = null;
        public Double supplyVoltage = null;
        public Double ref35Voltage = null;

        /**
         * Empty constructor for simple initialization; only the timestamp
         * is set.
         */
        public Message() {
            timestampMillis = System.currentTimeMillis();
        }

        /**
         * Initialize with a message response JSON object and stamp the
         * message with the current time.
         *
         * @param json The message response.
         */
        public Message(JsonObject json)
        {
            setValuesFromJson(json);
            timestampMillis = System.currentTimeMillis();
        }

        /**
         * Sets the values from a message response JSON object. The "Name"
         * entry becomes the message name, "mtype" is skipped, and every other
         * entry is passed to {@link #setValue(String, Object)}; entries that
         * cannot be set are reported as warnings and skipped.
         *
         * @param json The message response.
         */
        public void setValuesFromJson(JsonObject json)
        {
            for(Map.Entry<String, Object> pair : json) {
                String key = pair.getKey();
                Object value = pair.getValue();

                if(key.equals("Name")) {
                    // Tolerate a missing or non-string name instead of
                    // failing with a cast error or a later null dereference.
                    name = (value == null ? "" : value.toString());

                } else if(!key.equals("mtype")) {
                    try {
                        setValue(key, value);
                    } catch(Exception e) {
                        System.out.println("WARNING: " + e.toString());
                    }
                }
            }
        }

        /**
         * Gets a measurement value.
         *
         * @param key The two-letter measurement code, e.g. "Ta".
         * @return The stored value, or null if none has been set.
         * @throws IllegalActionException If the key is not a known code.
         */
        public Double getValue(String key)
            throws IllegalActionException
        {
            switch(key) {
                case "Sn": return windSpeedMin;
                case "Sm": return windSpeedAvg;
                case "Sx": return windSpeedMax;
                case "Dn": return windDirectionMin;
                case "Dm": return windDirectionAvg;
                case "Dx": return windDirectionMax;
                case "Pa": return airPressure;
                case "Ta": return airTemperature;
                case "Tp": return internalTemperature;
                case "Ua": return relativeHumidity;
                case "Rc": return rainAccumulation;
                case "Rd": return rainDuration;
                case "Ri": return rainIntensity;
                case "Rp": return rainPeakIntentsity;
                case "Hc": return hailAccumulation;
                case "Hd": return hailDuration;
                case "Hi": return hailIntensity;
                case "Hp": return hailPeakIntentsity;
                case "Th": return heatingTemperature;
                case "Vh": return heatingVoltage;
                case "Vs": return supplyVoltage;
                case "Vr": return ref35Voltage;
            }

            throw new IllegalActionException("\"" + key +
                "\" is not a valid property.");
        }

        /**
         * Sets a measurement value.
         * Value can be a number, a string, or null. A string must start with
         * a number, optionally followed by a unit; only the number is kept.
         *
         * @param key The two-letter measurement code, e.g. "Ta".
         * @param value The value to store.
         * @throws Exception If the value is corrupted (and corrupted data is
         *   dropped), is not parseable, has an unsupported type, or the key
         *   is not a known code.
         */
        public void setValue(String key, Object value)
            throws Exception
        {
            Double doubleValue = null;

            if(value instanceof Number) {
                // JSON numbers may arrive as Integer or Long as well as
                // Double, so convert rather than cast.
                doubleValue = ((Number) value).doubleValue();

            } else if(value instanceof String) {
                String stringValue = (String) value;

                // catch numeric values containing "#" (corrupted data)
                if(dropCorrupted && stringValue.contains("#")) {
                    throw new Exception("Corrupted data for " + name
                        + " - " + key + ":" + stringValue);
                }

                Matcher matcher = VALUE_PATTERN.matcher(stringValue);

                if(!matcher.matches()) {
                    throw new Exception("\"" + stringValue
                        + "\" is not a valid value.");
                }

                // group(2) holds the unit, which is currently ignored.
                doubleValue = Double.parseDouble(matcher.group(1));

            } else if(value != null) {
                throw new Exception("Value passed was of an invalid type.");
            }

            switch(key) {
                case "Sn": windSpeedMin = doubleValue; break;
                case "Sm": windSpeedAvg = doubleValue; break;
                case "Sx": windSpeedMax = doubleValue; break;
                case "Dn": windDirectionMin = doubleValue; break;
                case "Dm": windDirectionAvg = doubleValue; break;
                case "Dx": windDirectionMax = doubleValue; break;
                case "Pa": airPressure = doubleValue; break;
                case "Ta": airTemperature = doubleValue; break;
                case "Tp": internalTemperature = doubleValue; break;
                case "Ua": relativeHumidity = doubleValue; break;
                case "Rc": rainAccumulation = doubleValue; break;
                case "Rd": rainDuration = doubleValue; break;
                case "Ri": rainIntensity = doubleValue; break;
                case "Rp": rainPeakIntentsity = doubleValue; break;
                case "Hc": hailAccumulation = doubleValue; break;
                case "Hd": hailDuration = doubleValue; break;
                case "Hi": hailIntensity = doubleValue; break;
                case "Hp": hailPeakIntentsity = doubleValue; break;
                case "Th": heatingTemperature = doubleValue; break;
                case "Vh": heatingVoltage = doubleValue; break;
                case "Vs": supplyVoltage = doubleValue; break;
                case "Vr": ref35Voltage = doubleValue; break;
                default:
                    throw new Exception("\"" + key
                        + "\" is not a valid property.");
            }
        }

        /**
         * Gets an array of measurement values from an array of measurement
         * names.
         *
         * @param observables The measurement codes to look up, in order.
         * @return The values in the same order; entries may be null.
         * @throws IllegalActionException If a name is not a known code.
         */
        public Double[] getVector(String[] observables)
            throws IllegalActionException
        {
            Double[] vector = new Double[observables.length];

            for(int i = 0; i < observables.length; i++) {
                vector[i] = getValue(observables[i]);
            }

            return vector;
        }

    }
}