001/* 002 * Copyright (c) 2016-2017 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2018-02-07 01:00:17 +0000 (Wed, 07 Feb 2018) $' 007 * '$Revision: 34658 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.spark.mllib; 031 032import java.io.BufferedReader; 033import java.io.FileInputStream; 034import java.io.InputStreamReader; 035import java.io.PrintWriter; 036import java.util.Set; 037 038import org.kepler.spark.actor.SparkBaseActor; 039 040import io.vertx.core.json.JsonArray; 041import io.vertx.core.json.JsonObject; 042import ptolemy.actor.TypedIOPort; 043import ptolemy.actor.parameters.PortParameter; 044import ptolemy.data.DoubleToken; 045import ptolemy.data.StringToken; 046import ptolemy.data.expr.StringParameter; 047import ptolemy.data.type.BaseType; 048import ptolemy.kernel.CompositeEntity; 049import ptolemy.kernel.util.IllegalActionException; 050import ptolemy.kernel.util.NameDuplicationException; 051import ptolemy.kernel.util.SingletonAttribute; 052 053/** 054 * @author Jiaxin Li 055 * 056 * Identifies the Santa Ana cluster from the input array of cluster centers. 057 * Requires a JSON array string as input. Writes results to a user-specified 058 * JSON file. 059 */ 060public class SantaAnaIdentify extends SparkBaseActor { 061 062 public SantaAnaIdentify(CompositeEntity container, String name) 063 throws IllegalActionException, NameDuplicationException { 064 super(container, name); 065 066 inData = new TypedIOPort(this, "inData", true, false); 067 inData.setTypeEquals(BaseType.STRING); 068 new SingletonAttribute(inData, "_showName"); 069 070 outFilepath = new StringParameter(this, "outFilepath"); 071 outFilepath.setToken("saClusters.json"); 072 073 thrsWindSpeed = new PortParameter(this, "thrsWindSpeed"); 074 thrsWindSpeed.setTypeEquals(BaseType.DOUBLE); 075 thrsWindSpeed.getPort().setTypeEquals(BaseType.DOUBLE); 076 new SingletonAttribute(thrsWindSpeed.getPort(), "_showName"); 077 078 thrsWindDirection=new PortParameter(this, "thrsWindDirection"); 079 thrsWindDirection.setTypeEquals(BaseType.DOUBLE); 080 thrsWindDirection.getPort().setTypeEquals(BaseType.DOUBLE); 081 new SingletonAttribute(thrsWindDirection.getPort(), "_showName"); 082 083 thrsRelHum = new PortParameter(this, "thrsRelHum"); 084 thrsRelHum.setTypeEquals(BaseType.DOUBLE); 085 thrsRelHum.getPort().setTypeEquals(BaseType.DOUBLE); 086 new SingletonAttribute(thrsRelHum.getPort(), "_showName"); 087 088 } 089 090 /* 091 * Initialize: 092 * sets up the top-level JsonObject 093 */ 094 @Override 095 public void initialize() throws IllegalActionException { 096 097 super.initialize(); 098 099 // initialize JsonObject to exist throughout actor's lifetime 100 _fileObj = new JsonObject(); 101 } 102 103 /* 104 * fire: 105 * process incoming dataframe 106 */ 107 @Override 108 public void fire() throws IllegalActionException { 109 110 super.fire(); 111 112 /* Get entryId; read cluster center data as Double[][] */ 113 String inString = 114 (String)((StringToken)inData.get(0)).stringValue(); 115 JsonObject inObj = new JsonObject(inString); 116 String entryId = inObj.getString("name"); 117 JsonArray inputArray = inObj.getJsonArray("clusterCenters"); 118 119 int numClusters = inputArray.size(); 120 int clusterDim; 121 try { 122 clusterDim = inputArray.getJsonArray(0).size(); 123 } catch(NullPointerException e) { 124 System.err.println("ERROR: empty cluster center input!"); 125 throw e; // passing on NPE, only adding an error message 126 } 127 128 _clusterCenters = new Double[numClusters][clusterDim]; 129 for(int i = 0; i < numClusters; i++) 130 for(int j = 0; j < clusterDim; j++) 131 _clusterCenters[i][j]=inputArray.getJsonArray(i).getDouble(j); 132 133 // DEBUG output 134 if(_debugging) { 135 System.out.println("SantaAnaIdentify: " + entryId); 136 //System.out.println("SantaAnaIdentify: " + 137 // inputArray.encodePrettily()); 138 } 139 140 141 // init cluster center index to -1 (no SA cluster) 142 int index = -1; 143 /* Find SA cluster center */ 144 double tWindSpeed = 145 (double)((DoubleToken)thrsWindSpeed.getToken()).doubleValue(); 146 double tWindDir = 147 (double)((DoubleToken)thrsWindDirection.getToken()).doubleValue(); 148 double tRelHum = 149 (double)((DoubleToken)thrsRelHum.getToken()).doubleValue(); 150 for(int i = 0; i < numClusters; i++) { 151 // using thresholds on three values 152 // NOTE: wind direction should be in a range (from east to west)? 153 if(_clusterCenters[i][IDX_AVG_WIND_SPEED] > tWindSpeed && 154 _clusterCenters[i][IDX_AVG_WIND_DIRECTION] < tWindDir && 155 _clusterCenters[i][IDX_RELATIVE_HUMIDITY] < tRelHum) { 156 157 //DEBUG 158 if(_debugging) 159 System.out.printf("SantaAnaIdentify.fire: %d, %f, %f, %f\n", i, 160 _clusterCenters[i][IDX_AVG_WIND_SPEED], 161 _clusterCenters[i][IDX_AVG_WIND_DIRECTION], 162 _clusterCenters[i][IDX_RELATIVE_HUMIDITY]); 163 164 // catch case where more than one center fits criteria 165 // compararison: using a linear-scale scoring system. 166 // If the current cluster has more extreme values than the 167 // previouscluster, use the current as SA cluster 168 if(index != -1) { 169 double dWindSpeed = 170 _clusterCenters[i][IDX_AVG_WIND_SPEED] 171 - _clusterCenters[index][IDX_AVG_WIND_SPEED]; 172 double dWindDirection = 173 _clusterCenters[i][IDX_AVG_WIND_DIRECTION] 174 - _clusterCenters[index][IDX_AVG_WIND_DIRECTION]; 175 double dRelativeHumidity = 176 _clusterCenters[i][IDX_RELATIVE_HUMIDITY] 177 - _clusterCenters[index][IDX_RELATIVE_HUMIDITY]; 178 179 if(dWindSpeed+(-dWindDirection)+(-dRelativeHumidity) > 0) 180 index = i; 181 } 182 else 183 index = i; 184 } 185 } 186 187 188 /* After identifying the SA cluster, write to _fileObj */ 189 JsonObject entryObj = new JsonObject(); // create station object 190 entryObj.put("saCluster", index); 191 _fileObj.put(entryId, entryObj); // add station object to file object 192 193 //DEBUG 194 if(_debugging) 195 System.out.println("SantaAnaIdentify.fire ends!"); 196 197 } 198 199 200 /* 201 * wrap-up: 202 * writes _fileObj to buffer, then flush and close the file. 203 * NOTE: opens file only after the workflow is complete 204 */ 205 @Override 206 public void wrapup() throws IllegalActionException { 207 208 super.wrapup(); 209 210 PrintWriter writer; 211 212 // try to read in existing file and overwrite it with new data. If no 213 // original files exist, create a new file. 214 try { 215 216 // read in existing file, is possible 217 FileInputStream is = 218 new FileInputStream(outFilepath.getValueAsString()); 219 BufferedReader reader = 220 new BufferedReader(new InputStreamReader(is)); 221 222 // read in the entire file as a single string, for JsonObject 223 StringBuilder sb = new StringBuilder(); 224 String line = reader.readLine(); 225 while (line != null) { 226 sb.append(line); 227 line = reader.readLine(); 228 } 229 reader.close(); // close file for writing 230 231 232 // file read in, construct JSON object for contents in orig file 233 JsonObject origFileObj = new JsonObject(sb.toString()); 234 // get list of all station names from the new JSON object 235 Set<String> stationList = _fileObj.fieldNames(); 236 237 // replace old station data with new data 238 for (String station : stationList) { 239 origFileObj.remove(station); // ignored if station DNE 240 origFileObj.put(station, _fileObj.getJsonObject(station)); 241 } 242 243 244 // write updated origFileObj to file. 245 try { 246 // open file 247 writer = new PrintWriter(outFilepath.getValueAsString()); 248 // write _fileObj 249 writer.println(origFileObj.encodePrettily()); // updated data 250 // flush and close the writer's file 251 writer.flush(); 252 writer.close(); 253 }catch(Exception ei1){System.err.println("Failed to open file!");} 254 255 } catch (Exception fe) { // if error, create/write to new file 256 257 System.out.println("No original data file found."); 258 259 try { 260 // open file 261 writer = new PrintWriter(outFilepath.getValueAsString()); 262 // write _fileObj 263 writer.println(_fileObj.encodePrettily()); 264 // flush and close the writer's file 265 writer.flush(); 266 writer.close(); 267 }catch(Exception ei2){System.err.println("Failed to open file!");} 268 } 269 270 } 271 272 273 /** Input data as JSON Array (string format). */ 274 public TypedIOPort inData; 275 276 /** Parameter to store string for output JSON file **/ 277 public StringParameter outFilepath; 278 279 /** Wind speed threshold */ 280 public PortParameter thrsWindSpeed; 281 282 /** Wind direction threshold */ 283 public PortParameter thrsWindDirection; 284 285 /** Relative humidity threshold */ 286 public PortParameter thrsRelHum; 287 288 /* private fields */ 289 private JsonObject _fileObj; 290 private Double[][] _clusterCenters; 291 292 /* constants for values indices in measurement vectors */ 293 private final int IDX_AIR_PRESSURE = 0; 294 private final int IDX_AIR_TEMP = 1; 295 private final int IDX_AVG_WIND_DIRECTION = 2; 296 private final int IDX_AVG_WIND_SPEED = 3; 297 private final int IDX_MAX_WIND_DIRECTION = 4; 298 private final int IDX_MAX_WIND_SPEED = 5; 299 private final int IDX_RELATIVE_HUMIDITY = 6; 300 301}