001/*
002 * Copyright (c) 2016-2017 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2018-02-07 18:53:35 +0000 (Wed, 07 Feb 2018) $'
007 * '$Revision: 34661 $'
008 *
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.spark.mllib;
031
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Queue;
038import java.util.concurrent.ConcurrentLinkedQueue;
039import java.util.regex.Matcher;
040import java.util.regex.Pattern;
041
042import org.apache.spark.sql.Dataset;
043import org.apache.spark.sql.Row;
044import org.apache.spark.sql.RowFactory;
045import org.apache.spark.sql.types.DataTypes;
046import org.apache.spark.sql.types.StructField;
047import org.apache.spark.sql.types.StructType;
048import org.kepler.spark.actor.SparkSQLActor;
049import org.kepler.webview.server.WebViewServer; // use webview's vertx instance
050
051import io.vertx.core.Vertx;
052import io.vertx.core.http.HttpClient;
053import io.vertx.core.http.HttpClientOptions;
054import io.vertx.core.http.WebSocket;
055import io.vertx.core.json.JsonObject;
056import ptolemy.actor.Actor;
057import ptolemy.actor.TypedIOPort;
058import ptolemy.actor.parameters.PortParameter;
059import ptolemy.data.IntToken;
060import ptolemy.data.LongToken;
061import ptolemy.data.ObjectToken;
062import ptolemy.data.StringToken;
063import ptolemy.data.Token;
064import ptolemy.data.expr.Parameter;
065import ptolemy.data.expr.StringParameter;
066import ptolemy.data.type.BaseType;
067import ptolemy.data.type.ObjectType;
068import ptolemy.kernel.CompositeEntity;
069import ptolemy.kernel.util.Attribute;
070import ptolemy.kernel.util.ChangeRequest;
071import ptolemy.kernel.util.IllegalActionException;
072import ptolemy.kernel.util.NameDuplicationException;
073import ptolemy.kernel.util.NamedObj;
074import ptolemy.kernel.util.SingletonAttribute;
075
076/**
077 * Actor for retrieving weather station data from the Firemap web socket server.
078 */
079public class GetLiveMeasurements extends SparkSQLActor
080{
081    protected Vertx vertx;
082
083    public StringParameter websocketHost;    // firemap.sdsc.edu
084    public StringParameter websocketPort;    // 8443;
085    public StringParameter websocketUri;     // /livewx/websocket
086    public StringParameter messageType;
087    public StringParameter dropCorruptedData;
088    public Parameter minFiringInterval;
089    public Parameter firingQueueThreshold;
090    private static String WEBSOCKET_HOST;
091    private static int WEBSOCKET_PORT;
092    private static String WEBSOCKET_URI;
093
094    private final long DEFAULT_FIRING_INTERVAL = 60000;
095    private final int DEFAULT_QUEUE_THRS = 1;
096
097    public PortParameter observables;
098    public TypedIOPort output;
099    public TypedIOPort count;
100
101    private WebSocket _websocket = null;
102    public String[] _observables = null;
103
104    private String _messageTypeValue;
105    private StructType _schema;
106
107    // A queue that holds data samples yet to be output by the actor.
108    // _handleMessage() enqueues incoming websocket data,
109    // fire() dequeues and outputs a dataset
110    private Queue<Message> _queue;
111
112    // long for storing timestamp of last actor firing
113    // Used for timing GetLiveMeasurement's firing.
114    private long timeOfLastFire;
115
116    // if true, sets any incoming measurement value containing "#" to null.
117    private boolean dropCorrupted;
118
119    /**
120     * Initializes the actor's parameters and Vertex property.
121     *
122     * @param container
123     * @param name
124     * @throws IllegalActionException
125     * @throws NameDuplicationException
126     */
127    public GetLiveMeasurements(CompositeEntity container, String name)
128        throws IllegalActionException, NameDuplicationException
129    {
130        super(container, name);
131
132        messageType = new StringParameter(this, "messageType");
133        messageType.setExpression("R0");
134        messageType.addChoice("R0");
135        messageType.addChoice("R1");
136        messageType.addChoice("R2");
137        messageType.addChoice("R3");
138        messageType.addChoice("R5");
139
140        observables = new PortParameter(this, "observables");
141        observables.setStringMode(true);
142        observables.setTypeEquals(BaseType.STRING);
143        observables.getPort().setTypeEquals(BaseType.STRING);
144        new Attribute(observables.getPort(), "_showName");
145
146        websocketHost = new StringParameter(this, "websocketHost");
147        websocketHost.setExpression("firemap.sdsc.edu");
148
149        websocketPort = new StringParameter(this, "websocketPort");
150        websocketPort.setExpression("5446");
151
152        websocketUri = new StringParameter(this, "websocketUri");
153        websocketUri.setExpression("");
154
155        dropCorruptedData = new StringParameter(this, "dropCorruptedData");
156        dropCorruptedData.setExpression("true");
157
158        minFiringInterval = new Parameter(this, "minFiringInterval");
159        minFiringInterval.setTypeEquals(BaseType.LONG);
160        minFiringInterval.setToken(new LongToken(DEFAULT_FIRING_INTERVAL));
161
162        firingQueueThreshold = new Parameter(this, "firingQueueThreshold");
163        firingQueueThreshold.setTypeEquals(BaseType.INT);
164        firingQueueThreshold.setToken(new IntToken(DEFAULT_QUEUE_THRS));
165
166        output = new TypedIOPort(this, "output", false, true);
167        output.setTypeEquals(new ObjectType(Dataset.class));
168        new SingletonAttribute(output, "_showName");
169
170        count = new TypedIOPort(this, "count", false, true);
171        count.setTypeEquals(BaseType.LONG);
172        new SingletonAttribute(count, "_showName");
173    }
174
175    /**
176     * Handles changing the message type parameter, and validates it.
177     *
178     * @param attribute The attribute that changed.
179     * @throws IllegalActionException
180     */
181    @Override
182    public void attributeChanged(Attribute attribute)
183        throws IllegalActionException
184    {
185        if(attribute == messageType) {
186            Token token = messageType.getToken();
187
188            if(token != null) {
189                String value = ((StringToken) token).stringValue();
190
191                if(value != null && !value.trim().isEmpty()) {
192                    String[] choices = messageType.getChoices();
193
194                    if(!Arrays.asList(choices).contains(value))  {
195                        throw new IllegalActionException(this,
196                                         "Unsupported messageType: " + value);
197                    }
198
199                    _messageTypeValue = value;
200                }
201            }
202        }
203        else {
204            super.attributeChanged(attribute);
205        }
206    }
207
208
209    /**
210     * Call this to tell DE director to fire the actor.
211     *
212     * @throws IllegalActionException
213     */
214    private void _fireActor() throws IllegalActionException
215    {
216        final Actor actor = this;
217        final NamedObj namedObj = output;
218        ChangeRequest request = new ChangeRequest(this, "fire " + getName())
219            {
220                protected void _execute() throws Exception {
221                    getDirector().fireAtCurrentTime(actor);
222                }
223
224                @Override
225                public NamedObj getLocality() {
226                    return namedObj;
227                }
228
229            };
230        request.setPersistent(false);
231        getContainer().requestChange(request);
232    }
233
234
235    /**
236     * Sets up the websocket and other member variables once per workflow
237     * execution.
238     *
239     * @throws IllegalActionException
240     */
241    @Override
242    public void initialize()
243        throws IllegalActionException
244    {
245        super.initialize();
246
247        // Get the websocket parameters, store them in member variables
248        WEBSOCKET_HOST = websocketHost.getValueAsString();
249        WEBSOCKET_PORT = Integer.parseInt(websocketPort.getValueAsString());
250        WEBSOCKET_URI = websocketUri.getValueAsString();
251
252        // Set options for websocket
253        //vertx = Vertx.vertx();
254        vertx = WebViewServer.vertx();
255        HttpClientOptions options = new HttpClientOptions();
256        options.setDefaultHost(WEBSOCKET_HOST);
257        options.setDefaultPort(WEBSOCKET_PORT);
258        options.setSsl(true);
259        options.setTrustAll(true);
260
261        // Create the websocket, define its message handler
262        HttpClient client = vertx.createHttpClient(options);
263        client.websocket(WEBSOCKET_URI, websocket -> {
264
265                _websocket = websocket;
266
267                System.out.println("Web socket connected");
268
269                websocket.frameHandler(frame -> {
270                            String data = frame.textData();
271                            JsonObject json = new JsonObject(data);
272                            //JsonObject message = json.getJsonObject("message");
273                            _handleMessage(json);  // websocket updated 2017-07-19
274                        });
275
276                websocket.closeHandler(x -> {
277                        System.out.println("Web socket closed");
278                    });
279
280                websocket.writeFinalTextFrame("start");
281                });
282
283
284        // Get the set observables and format them into a string array
285        observables.update();
286
287        StringToken observablesToken = (StringToken) observables.getToken();
288        String observablesValue = (observablesToken == null ?
289                                   "" : observablesToken.stringValue().trim());
290
291        if(observablesValue.isEmpty()) {
292            throw new IllegalActionException(this, "Must specify observables.");
293        }
294
295        _observables = observablesValue.split("\\s*,\\s*");
296
297        List<StructField> fields = new ArrayList<>();
298        StructField field;
299        // add 7 observables as first 7 columns
300        for(String observable : _observables) {
301            field = DataTypes.createStructField(observable,
302                                                DataTypes.DoubleType, true);
303            fields.add(field);
304        }
305
306        // add name and timestampMillis columns
307        field = DataTypes.createStructField("name",DataTypes.StringType, true);
308        fields.add(field);
309        field = DataTypes.createStructField("timestampMillis",
310                                            DataTypes.LongType, true);
311        fields.add(field);
312
313        // create schema
314        _schema = DataTypes.createStructType(fields);
315
316        // create message queue
317        _queue = new ConcurrentLinkedQueue<Message>();
318
319        // initialize time counter and dropCorrupted
320        timeOfLastFire = System.currentTimeMillis();
321
322        // get dropCorrupted
323        dropCorrupted = Boolean.parseBoolean(dropCorruptedData.getValueAsString());
324
325    }
326
327    /**
328     * Waits for weather station data then packages it into a Spark data frame.
329     *
330     * @throws IllegalActionException
331     */
332    @Override
333    public void fire()
334        throws IllegalActionException
335    {
336        super.fire();
337
338        synchronized(_queue) {
339            List<Row> rows = new ArrayList<>();
340
341            while(_queue.size() > 0) {
342                // get message and extract name
343                Message msg  = _queue.poll();
344                String name = msg.name;
345                name = name.replaceAll("\\s+","");  // clean up name string
346                Double[] vector = msg.getVector(_observables);
347
348                // TODO: find a more dynamic way of creating new row with
349                // different data types as its values(double,...,double,String)
350                //Row row = new GenericRow(vector);
351                Row row = RowFactory.create(vector[0], vector[1], vector[2],
352                                            vector[3], vector[4], vector[5],
353                                            vector[6], name,
354                                            msg.timestampMillis);
355                rows.add(row);
356            }
357
358            Dataset<Row> df = _sqlContext.createDataFrame(rows, _schema);
359
360            output.broadcast(new ObjectToken(df, Dataset.class));
361
362            if (count.numberOfSinks() > 0)
363                count.broadcast(new LongToken(df.count()));
364        }
365
366    }
367
368    /**
369     * Close the websocket connection when the workflow ends.
370     *
371     * @throws IllegalActionException
372     */
373    @Override
374    public void wrapup()
375        throws IllegalActionException
376    {
377        super.wrapup();
378
379        if(_websocket != null)  {
380            _websocket.close();
381        }
382    }
383
384
385    /**
386     * Converts the websocket message JSON into a message instance, if it's the
387     * correct message type.
388     * @param message
389     */
390    private void _handleMessage(JsonObject message)
391    {
392        String mType = "0" + _messageTypeValue; // TODO  What is this "0"?
393
394        if(mType.equals(message.getString("mtype")))  {
395            //System.out.println("Accepted message: " + message.toString());
396
397            Message currentMessage = new Message(message);
398
399            try {
400                // attempt to trigger an exception
401                Double[] vector = currentMessage.getVector(_observables);
402
403                _queue.add(currentMessage);
404
405                // After waiting for the specified interval,
406                //   if msg queue not empty, notify DE director to fire actor
407                if (System.currentTimeMillis() - timeOfLastFire >
408                    ((LongToken)minFiringInterval.getToken()).longValue()
409                    && _queue.size() >=
410                    ((IntToken)firingQueueThreshold.getToken()).intValue() ){
411
412                    // reset timestamp variable
413                    timeOfLastFire = System.currentTimeMillis();
414
415                    // attempt to signal DE director to call fire()
416                    try {
417                        //System.out.println("Firing actor");
418                        _fireActor();
419                    } catch (IllegalActionException e) {
420                        System.out.println("Error firing GetLiveMeasurements." + e);
421                    }
422                }
423
424            } catch (Exception e) {
425                System.out.println("Invalid observable. " + e);
426            }
427        }
428    }
429
430
431    /**
432     * Class for storing web socket responses
433     */
434    private class Message
435    {
436        public String name = "";
437        public Long timestampMillis = null;
438        public Double windSpeedMin = null;
439        public Double windSpeedAvg = null;
440        public Double windSpeedMax = null;
441        public Double windDirectionMin = null;
442        public Double windDirectionAvg = null;
443        public Double windDirectionMax = null;
444        public Double airPressure = null;
445        public Double airTemperature = null;
446        public Double internalTemperature = null;
447        public Double relativeHumidity = null;
448        public Double rainAccumulation = null;
449        public Double rainDuration = null;
450        public Double rainIntensity = null;
451        public Double rainPeakIntentsity = null;
452        public Double hailAccumulation = null;
453        public Double hailDuration = null;
454        public Double hailIntensity = null;
455        public Double hailPeakIntentsity = null;
456        public Double heatingTemperature = null;
457        public Double heatingVoltage = null;
458        public Double supplyVoltage = null;
459        public Double ref35Voltage = null;
460
461        /**
462         * Empty constructor for simple initialization.
463         */
464        public Message() {
465            timestampMillis = System.currentTimeMillis();
466        }
467
468        /**
469         * Initialize with a message response JSON object.
470         *
471         * @param json
472         */
473        public Message(JsonObject json)
474        {
475            setValuesFromJson(json);
476            timestampMillis = System.currentTimeMillis();
477        }
478
479        /**
480         * Sets the values from a message response JSON object.
481         *
482         * @param json
483         */
484        public void setValuesFromJson(JsonObject json)
485        {
486            Iterator it = json.iterator();
487
488            while(it.hasNext()) {
489                Map.Entry pair = (Map.Entry) it.next();
490                String key = (String) pair.getKey();
491                Object value = pair.getValue();
492
493                if(key.equals("Name")) {
494                    name = (String) value;
495
496                } else if(!key.equals("mtype")) {
497                    try {
498                        setValue(key, value);
499                    } catch(Exception e) {
500                        System.out.println("WARNING: " + e.toString());
501                    }
502                }
503            }
504        }
505
506        /**
507         * Gets a measurement value.
508         *
509         * @param key
510         * @return
511         * @throws Exception
512         */
513        public Double getValue(String key)
514            throws IllegalActionException
515        {
516            switch(key) {
517            case "Sn": return windSpeedMin;
518            case "Sm": return windSpeedAvg;
519            case "Sx": return windSpeedMax;
520            case "Dn": return windDirectionMin;
521            case "Dm": return windDirectionAvg;
522            case "Dx": return windDirectionMax;
523            case "Pa": return airPressure;
524            case "Ta": return airTemperature;
525            case "Tp": return internalTemperature;
526            case "Ua": return relativeHumidity;
527            case "Rc": return rainAccumulation;
528            case "Rd": return rainDuration;
529            case "Ri": return rainIntensity;
530            case "Rp": return rainPeakIntentsity;
531            case "Hc": return hailAccumulation;
532            case "Hd": return hailDuration;
533            case "Hi": return hailIntensity;
534            case "Hp": return hailPeakIntentsity;
535            case "Th": return heatingTemperature;
536            case "Vh": return heatingVoltage;
537            case "Vs": return supplyVoltage;
538            case "Vr": return ref35Voltage;
539            }
540
541            throw new IllegalActionException("\"" + key +
542                                             "\" is not a valid property.");
543        }
544
545        /**
546         * Sets a measurement value.
547         * Value can be a number, or a string. If it's a string, it must match the
548         * type of a number followed by a unit.
549         * @param key
550         * @param value
551         * @throws Exception
552         */
553        public void setValue(String key, Object value)
554            throws Exception
555        {
556            Double doubleValue = null;
557
558            if(value instanceof Number) {
559                doubleValue = (Double) value;
560
561            } else if(value instanceof String) {
562                String stringValue = (String) value;
563                Pattern pattern = Pattern.compile("([0-9\\-.]+)(.+)");
564                Matcher matcher = pattern.matcher(stringValue);
565
566                // catch numeric values containing "#" (corrupted data)
567                if (dropCorrupted && stringValue.contains("#"))
568                    throw new Exception("Corrupted data for " + name
569                                        + " - " + key + ":" + stringValue);
570
571                if(!matcher.matches()) {
572                    throw new Exception("\"" + stringValue + "\" is not a valid value.");
573                }
574
575                String number = matcher.group(1);
576                String unit = matcher.group(2);
577
578                //doubleValue = _normalizeUnit(key, Double.parseDouble(number), unit);
579                doubleValue = Double.parseDouble(number);
580
581            } else if(value != null) {
582                throw new Exception("Value passed was of an invalid type.");
583            }
584
585            switch(key) {
586            case "Sn": windSpeedMin = doubleValue; break;
587            case "Sm": windSpeedAvg = doubleValue; break;
588            case "Sx": windSpeedMax = doubleValue; break;
589            case "Dn": windDirectionAvg = doubleValue; break;
590            case "Dm": windDirectionAvg = doubleValue; break;
591            case "Dx": windDirectionMax = doubleValue; break;
592            case "Pa": airPressure = doubleValue; break;
593            case "Ta": airTemperature = doubleValue; break;
594            case "Tp": internalTemperature = doubleValue; break;
595            case "Ua": relativeHumidity = doubleValue; break;
596            case "Rc": rainAccumulation = doubleValue; break;
597            case "Rd": rainDuration = doubleValue; break;
598            case "Ri": rainIntensity = doubleValue; break;
599            case "Rp": rainPeakIntentsity = doubleValue; break;
600            case "Hc": hailAccumulation = doubleValue; break;
601            case "Hd": hailDuration = doubleValue; break;
602            case "Hi": hailIntensity = doubleValue; break;
603            case "Hp": hailPeakIntentsity = doubleValue; break;
604            case "Th": heatingTemperature = doubleValue; break;
605            case "Vh": heatingVoltage = doubleValue; break;
606            case "Vs": supplyVoltage = doubleValue; break;
607            case "Vr": ref35Voltage = doubleValue; break;
608            default:
609                throw new Exception("\"" + key + "\" is not a valid property.");
610
611            }
612        }
613
614        /**
615         * Gets an array of measurement values from an array of measurement names.
616         *
617         * @param observables
618         * @return
619         * @throws Exception
620         */
621        public Double[] getVector(String[] observables)
622            throws IllegalActionException
623        {
624            Double[] vector = new Double[observables.length];
625
626            for(int i = 0; i < observables.length; i++) {
627                vector[i] = getValue(observables[i]);
628            }
629
630            return vector;
631        }
632
633    }
634}