001/* 
002 * Copyright (c) 2016-2017 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2018-02-07 01:00:17 +0000 (Wed, 07 Feb 2018) $' 
007 * '$Revision: 34658 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.spark.mllib;
031
032import java.io.BufferedReader;
033import java.io.FileInputStream;
034import java.io.InputStreamReader;
035import java.io.PrintWriter;
036import java.util.Set;
037
038import org.kepler.spark.actor.SparkBaseActor;
039
040import io.vertx.core.json.JsonArray;
041import io.vertx.core.json.JsonObject;
042import ptolemy.actor.TypedIOPort;
043import ptolemy.actor.parameters.PortParameter;
044import ptolemy.data.DoubleToken;
045import ptolemy.data.StringToken;
046import ptolemy.data.expr.StringParameter;
047import ptolemy.data.type.BaseType;
048import ptolemy.kernel.CompositeEntity;
049import ptolemy.kernel.util.IllegalActionException;
050import ptolemy.kernel.util.NameDuplicationException;
051import ptolemy.kernel.util.SingletonAttribute;
052
053/**
054 * @author Jiaxin Li
055 *
056 * Identifies the Santa Ana cluster from the input array of cluster centers. 
057 * Requires a JSON array string as input. Writes results to a user-specified 
058 * JSON file.
059 */
060public class SantaAnaIdentify extends SparkBaseActor {
061
062    public SantaAnaIdentify(CompositeEntity container, String name)
063        throws IllegalActionException, NameDuplicationException {
064        super(container, name);
065
066        inData = new TypedIOPort(this, "inData", true, false);
067        inData.setTypeEquals(BaseType.STRING);
068        new SingletonAttribute(inData, "_showName");
069
070        outFilepath = new StringParameter(this, "outFilepath");
071        outFilepath.setToken("saClusters.json");
072
073        thrsWindSpeed = new PortParameter(this, "thrsWindSpeed");
074        thrsWindSpeed.setTypeEquals(BaseType.DOUBLE);
075        thrsWindSpeed.getPort().setTypeEquals(BaseType.DOUBLE);
076        new SingletonAttribute(thrsWindSpeed.getPort(), "_showName");
077
078        thrsWindDirection=new PortParameter(this, "thrsWindDirection");
079        thrsWindDirection.setTypeEquals(BaseType.DOUBLE);
080        thrsWindDirection.getPort().setTypeEquals(BaseType.DOUBLE);
081        new SingletonAttribute(thrsWindDirection.getPort(), "_showName");
082
083        thrsRelHum = new PortParameter(this, "thrsRelHum");
084        thrsRelHum.setTypeEquals(BaseType.DOUBLE);
085        thrsRelHum.getPort().setTypeEquals(BaseType.DOUBLE);
086        new SingletonAttribute(thrsRelHum.getPort(), "_showName");
087
088    }
089
090    /* 
091     * Initialize:
092     * sets up the top-level JsonObject
093     */
094    @Override
095    public void initialize() throws IllegalActionException {
096
097        super.initialize();
098
099        // initialize JsonObject to exist throughout actor's lifetime
100        _fileObj = new JsonObject();
101    }
102
103    /*
104     * fire:
105     * process incoming dataframe
106     */
107    @Override
108    public void fire() throws IllegalActionException {
109
110        super.fire();
111
112        /* Get entryId; read cluster center data as Double[][] */
113        String inString =
114            (String)((StringToken)inData.get(0)).stringValue();
115        JsonObject inObj = new JsonObject(inString);
116        String entryId = inObj.getString("name");
117        JsonArray inputArray = inObj.getJsonArray("clusterCenters");
118
119        int numClusters = inputArray.size();
120        int clusterDim;
121        try {
122            clusterDim = inputArray.getJsonArray(0).size();
123        } catch(NullPointerException e) {
124            System.err.println("ERROR: empty cluster center input!");
125            throw e;  // passing on NPE, only adding an error message  
126        }
127
128        _clusterCenters = new Double[numClusters][clusterDim];
129        for(int i = 0; i < numClusters; i++)
130            for(int j = 0; j < clusterDim; j++)
131                _clusterCenters[i][j]=inputArray.getJsonArray(i).getDouble(j);
132
133        // DEBUG output
134        if(_debugging) {
135            System.out.println("SantaAnaIdentify: " + entryId);
136            //System.out.println("SantaAnaIdentify: " +
137            //                   inputArray.encodePrettily());
138        }
139
140        
141        // init cluster center index to -1 (no SA cluster)
142        int index = -1;
143        /* Find SA cluster center */
144        double tWindSpeed =
145            (double)((DoubleToken)thrsWindSpeed.getToken()).doubleValue();
146        double tWindDir =
147            (double)((DoubleToken)thrsWindDirection.getToken()).doubleValue();
148        double tRelHum =
149            (double)((DoubleToken)thrsRelHum.getToken()).doubleValue();
150        for(int i = 0; i < numClusters; i++) {
151            // using thresholds on three values
152            // NOTE: wind direction should be in a range (from east to west)?
153            if(_clusterCenters[i][IDX_AVG_WIND_SPEED] > tWindSpeed &&
154               _clusterCenters[i][IDX_AVG_WIND_DIRECTION] < tWindDir &&
155               _clusterCenters[i][IDX_RELATIVE_HUMIDITY] < tRelHum) {
156
157                //DEBUG
158                if(_debugging)
159                    System.out.printf("SantaAnaIdentify.fire: %d, %f, %f, %f\n", i,
160                                      _clusterCenters[i][IDX_AVG_WIND_SPEED],
161                                      _clusterCenters[i][IDX_AVG_WIND_DIRECTION],
162                                      _clusterCenters[i][IDX_RELATIVE_HUMIDITY]);
163                
164                // catch case where more than one center fits criteria
165                // compararison: using a linear-scale scoring system.
166                //     If the current cluster has more extreme values than the
167                //     previouscluster, use the current as SA cluster
168                if(index != -1) {
169                    double dWindSpeed =
170                        _clusterCenters[i][IDX_AVG_WIND_SPEED]
171                        - _clusterCenters[index][IDX_AVG_WIND_SPEED];
172                    double dWindDirection =
173                        _clusterCenters[i][IDX_AVG_WIND_DIRECTION]
174                        - _clusterCenters[index][IDX_AVG_WIND_DIRECTION];
175                    double dRelativeHumidity =
176                        _clusterCenters[i][IDX_RELATIVE_HUMIDITY]
177                        - _clusterCenters[index][IDX_RELATIVE_HUMIDITY];
178                    
179                    if(dWindSpeed+(-dWindDirection)+(-dRelativeHumidity) > 0)
180                        index = i; 
181                }
182                else 
183                    index = i;
184            }
185        }
186
187        
188        /* After identifying the SA cluster, write to _fileObj */
189        JsonObject entryObj = new JsonObject();  // create station object
190        entryObj.put("saCluster", index);
191        _fileObj.put(entryId, entryObj);  // add station object to file object
192
193        //DEBUG
194        if(_debugging)
195            System.out.println("SantaAnaIdentify.fire ends!");
196
197    }
198
199
200    /* 
201     * wrap-up:
202     * writes _fileObj to buffer, then flush and close the file.
203     * NOTE: opens file only after the workflow is complete
204     */
205    @Override
206    public void wrapup() throws IllegalActionException {
207
208        super.wrapup();
209
210        PrintWriter writer;
211
212        // try to read in existing file and overwrite it with new data. If no
213        // original files exist, create a new file.
214        try {
215
216            // read in existing file, is possible
217            FileInputStream is =
218                new FileInputStream(outFilepath.getValueAsString());
219            BufferedReader reader =
220                new BufferedReader(new InputStreamReader(is));
221
222            // read in the entire file as a single string, for JsonObject
223            StringBuilder sb = new StringBuilder();
224            String line = reader.readLine();
225            while (line != null) {
226                sb.append(line);
227                line = reader.readLine();
228            }
229            reader.close(); // close file for writing
230
231
232            // file read in, construct JSON object for contents in orig file
233            JsonObject origFileObj = new JsonObject(sb.toString());
234            // get list of all station names from the new JSON object
235            Set<String> stationList = _fileObj.fieldNames();
236
237            // replace old station data with new data
238            for (String station : stationList) {
239                origFileObj.remove(station);  // ignored if station DNE
240                origFileObj.put(station, _fileObj.getJsonObject(station));
241            }
242
243
244            // write updated origFileObj to file.
245            try {
246                // open file
247                writer = new PrintWriter(outFilepath.getValueAsString());
248                // write _fileObj
249                writer.println(origFileObj.encodePrettily()); // updated data
250                // flush and close the writer's file
251                writer.flush();
252                writer.close();
253            }catch(Exception ei1){System.err.println("Failed to open file!");}
254
255        } catch (Exception fe) {  // if error, create/write to new file
256
257            System.out.println("No original data file found.");
258
259            try {
260                // open file
261                writer = new PrintWriter(outFilepath.getValueAsString());
262                // write _fileObj
263                writer.println(_fileObj.encodePrettily());
264                // flush and close the writer's file
265                writer.flush();
266                writer.close();
267            }catch(Exception ei2){System.err.println("Failed to open file!");}
268        }
269
270    }
271
272
273    /** Input data as JSON Array (string format). */
274    public TypedIOPort inData;
275
276    /** Parameter to store string for output JSON file **/
277    public StringParameter outFilepath;
278
279    /** Wind speed threshold */
280    public PortParameter thrsWindSpeed;
281
282    /** Wind direction threshold */
283    public PortParameter thrsWindDirection;
284
285    /** Relative humidity threshold */
286    public PortParameter thrsRelHum;
287
288    /* private fields */
289    private JsonObject _fileObj;
290    private Double[][] _clusterCenters;
291
292    /* constants for values indices in measurement vectors */
293    private final int IDX_AIR_PRESSURE = 0;
294    private final int IDX_AIR_TEMP = 1;
295    private final int IDX_AVG_WIND_DIRECTION = 2;
296    private final int IDX_AVG_WIND_SPEED = 3;
297    private final int IDX_MAX_WIND_DIRECTION = 4;
298    private final int IDX_MAX_WIND_SPEED = 5;
299    private final int IDX_RELATIVE_HUMIDITY = 6;
300
301}