/*
 * Copyright (c) 2016-2017 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2018-02-06 19:09:58 +0000 (Tue, 06 Feb 2018) $'
 * '$Revision: 34656 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.spark.mllib;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.kepler.spark.actor.SparkSQLActor;

import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.LongToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.ObjectType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.SingletonAttribute;

/**
 * This actor performs a query from the Postgres database that stores HPWREN
 * weather data using SparkSQL's distributed query engine. Query options, such
 * as the driver, connection URL, table/query and the optional partitioning
 * settings, are collected in the {@link #options} map, which is handed to
 * Spark's JDBC reader on every firing.
 *
 * @author Dylan Uys, Jiaxin Li
 */
public class SparkSQLQuery extends SparkSQLActor {

    /**
     * Create the actor with its ports and parameters.
     *
     * @param container the composite entity that will contain this actor
     * @param name the name of this actor within the container
     * @throws IllegalActionException if a port or parameter cannot be set up
     * @throws NameDuplicationException if the name clashes with an existing
     *  entity in the container
     */
    public SparkSQLQuery(CompositeEntity container, String name)
        throws IllegalActionException, NameDuplicationException {
        super(container, name);

        // initialize ports and parameters
        first = new TypedIOPort(this, "first", false, true);
        first.setTypeEquals(new ObjectType(Row.class));
        new SingletonAttribute(first, "_showName");

        out = new TypedIOPort(this, "out", false, true);
        out.setTypeEquals(new ObjectType(Dataset.class));
        new SingletonAttribute(out, "_showName");

        count = new TypedIOPort(this, "count", false, true);
        count.setTypeEquals(BaseType.LONG);
        new SingletonAttribute(count, "_showName");

        queryString = new TypedIOPort(this, "queryString", true, false);
        queryString.setTypeEquals(BaseType.STRING);
        new SingletonAttribute(queryString, "_showName");

        sqlConnectionURL = new StringParameter(this, "sqlConnectionURL");
        sqlConnectionURL.setToken("");

        sqlDriver = new StringParameter(this, "sqlDriver");
        sqlDriver.setToken(SQL_DRIVER);

        // The four partitioning parameters are intentionally left without a
        // default value; they are only applied when all of them are set.
        partitionColumn = new StringParameter(this, "partitionColumn");
        //partitionColumn.setToken("id");

        lowerBound = new StringParameter(this, "lowerBound");
        //lowerBound.setToken("0");

        upperBound = new StringParameter(this, "upperBound");
        //upperBound.setToken("2147483647");

        numPartitions = new StringParameter(this, "numPartitions");
        //numPartitions.setToken("12");

        forceCollect = new Parameter(this, "forceCollect");
        forceCollect.setTypeEquals(BaseType.BOOLEAN);
        forceCollect.setToken(BooleanToken.FALSE);

    } // end Constructor


    /**
     * Rebuild the {@link #options} map from the current parameter values.
     * The map is recreated on every call and seeded with the JDBC "driver"
     * and "url" options.
     *
     * @throws IllegalActionException if the superclass fails or a parameter
     *  value cannot be read
     */
    @Override
    public void preinitialize() throws IllegalActionException {
        super.preinitialize();

        _connectionURL = sqlConnectionURL.stringValue();

        // Honor the sqlDriver parameter; fall back to the built-in default
        // when it has been cleared so the reader always gets a driver class.
        String driver = sqlDriver.stringValue();
        if (!_isSet(driver)) {
            driver = SQL_DRIVER;
        }

        options = new HashMap<String, String>();

        // initialize the options HashMap from the actor's parameter values
        options.put("driver", driver);
        options.put("url", _connectionURL);
    }


    /**
     * Perform the SQL query read from the queryString port. The query is
     * passed to the JDBC reader as the "dbtable" option; the resulting
     * Dataset&lt;Row&gt; is broadcast on the out port. The first row and the
     * row count are only computed and broadcast when the corresponding
     * debug ports have sinks connected.
     *
     * Failures inside the read/broadcast section are reported on stderr and
     * do not propagate out of this method.
     *
     * @throws IllegalActionException if the superclass fails or the input
     *  token or a parameter value cannot be read
     */
    @Override
    public void fire() throws IllegalActionException {

        super.fire();

        /* build the query */
        String query = ((StringToken) queryString.get(0)).stringValue();

        //DEBUG
        if (_debugging) {
            System.out.println("QUERY STRING: \n" + query + "\n");
        }

        options.put("dbtable", query);

        // Add the partitioning options only if all four are specified.
        // Compare by content, not by reference: an empty parameter value is
        // not guaranteed to be the same String object as the "" literal.
        String column = partitionColumn.stringValue();
        String lower = lowerBound.stringValue();
        String upper = upperBound.stringValue();
        String partitions = numPartitions.stringValue();
        if (_isSet(column) && _isSet(lower)
                && _isSet(upper) && _isSet(partitions)) {
            options.put("partitionColumn", column);
            options.put("lowerBound", lower);
            options.put("upperBound", upper);
            options.put("numPartitions", partitions);
        }

        /* start query */
        try {
            Dataset<Row> df
                = _sqlContext.read().format("jdbc").options(options).load();

            // if specified, trigger SQL transfer immediately
            // (read the boolean value instead of comparing token identity)
            if (((BooleanToken) forceCollect.getToken()).booleanValue()) {
                StructType schema = df.schema();
                df = _sqlContext.createDataFrame(df.collectAsList(),
                                                 schema).cache();
            }

            // broadcast results
            out.broadcast(new ObjectToken(df, Dataset.class));

            // only broadcast debug information if recipient actors present
            // NOTE: no Spark call if debug ports are not hooked up
            if (first.numberOfSinks() > 0) {
                first.broadcast(new ObjectToken(df.first(), Row.class));
            }

            if (count.numberOfSinks() > 0) {
                count.broadcast(new LongToken(df.count()));
            }

        } catch (OutOfMemoryError e) {
            System.err.println(e.getMessage());
            System.err.println("ERROR: SparkSQLQuery: Out of Java Heap Space!");
        } catch (IllegalActionException e) {
            // Keep the best-effort behavior, but do not hide the cause.
            System.err.println("ERROR: SparkSQLQuery: Failed to read from database.");
            System.err.println(e.getMessage());
        }
    }


    /**
     * Tell whether a parameter value has been provided.
     *
     * @param value the string value of a parameter, may be null
     * @return true if the value is neither null nor the empty string
     */
    private static boolean _isSet(String value) {
        return value != null && !value.isEmpty();
    }


    /** The first element of the DataFrame. Debug */
    public TypedIOPort first;

    /** The DataFrame resulting from the SparkSQL query */
    public TypedIOPort out;

    /** Number of Rows in the DataFrame */
    public TypedIOPort count;

    /** Options map for SQL config */
    protected Map<String, String> options;

    /** Query string */
    public TypedIOPort queryString;

    /** SQL connection URL */
    public StringParameter sqlConnectionURL;

    /** SQL driver to use */
    public StringParameter sqlDriver;

    /** column to partition on */
    public StringParameter partitionColumn;

    /** lower bound for partitioning */
    public StringParameter lowerBound;

    /** upper bound for partitioning */
    public StringParameter upperBound;

    /** number of partitions */
    public StringParameter numPartitions;

    /** Option to force SQL transfer immediately */
    public Parameter forceCollect;


    /* Private fields*/

    /** Default JDBC driver class, used when sqlDriver is empty. */
    private static final String SQL_DRIVER = "org.postgresql.Driver";

    /** Connection URL read from sqlConnectionURL; per actor instance. */
    private String _connectionURL;

}