001/* 
002 * Copyright (c) 2016-2017 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2018-02-06 19:09:58 +0000 (Tue, 06 Feb 2018) $' 
007 * '$Revision: 34656 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.spark.mllib;
031
032import java.util.HashMap;
033import java.util.Map;
034
035import org.apache.spark.sql.Dataset;
036import org.apache.spark.sql.Row;
037import org.apache.spark.sql.types.StructType;
038import org.kepler.spark.actor.SparkSQLActor;
039
040import ptolemy.actor.TypedIOPort;
041import ptolemy.data.BooleanToken;
042import ptolemy.data.LongToken;
043import ptolemy.data.ObjectToken;
044import ptolemy.data.StringToken;
045import ptolemy.data.expr.Parameter;
046import ptolemy.data.expr.StringParameter;
047import ptolemy.data.type.BaseType;
048import ptolemy.data.type.ObjectType;
049import ptolemy.kernel.CompositeEntity;
050import ptolemy.kernel.util.IllegalActionException;
051import ptolemy.kernel.util.NameDuplicationException;
052import ptolemy.kernel.util.SingletonAttribute;
053
054/**
055 * @author Dylan Uys, Jiaxin Li
056 *
057 * This actor performs a query from the Postgres database that stores HPWREN 
058 * weather data using SparkSQL's distributed query engine. Query options, such
059 * as the server, database, table name, and more, are configured in the 
060 * HashMap<String, String> options member of the class.
061 */
062public class SparkSQLQuery extends SparkSQLActor {
063
064    public SparkSQLQuery(CompositeEntity container, String name)
065        throws IllegalActionException, NameDuplicationException {
066        super(container, name);
067        
068        // initialize ports and parameters
069        first = new TypedIOPort(this, "first", false, true);
070        first.setTypeEquals(new ObjectType(Row.class));
071        new SingletonAttribute(first, "_showName");
072  
073        out = new TypedIOPort(this, "out", false, true);
074        out.setTypeEquals(new ObjectType(Dataset.class));
075        new SingletonAttribute(out, "_showName");
076        
077        count = new TypedIOPort(this, "count", false, true);
078        count.setTypeEquals(BaseType.LONG);
079        new SingletonAttribute(count, "_showName");
080
081        queryString = new TypedIOPort(this, "queryString", true, false);
082        queryString.setTypeEquals(BaseType.STRING);
083        new SingletonAttribute(queryString, "_showName");
084
085        sqlConnectionURL = new StringParameter(this, "sqlConnectionURL");
086        sqlConnectionURL.setToken("");
087
088        sqlDriver = new StringParameter(this, "sqlDriver");
089        sqlDriver.setToken(SQL_DRIVER);
090
091        partitionColumn = new StringParameter(this, "partitionColumn");
092        //partitionColumn.setToken("id");
093
094        lowerBound = new StringParameter(this, "lowerBound");
095        //lowerBound.setToken("0");
096
097        upperBound = new StringParameter(this, "upperBound");
098        //upperBound.setToken("2147483647");
099
100        numPartitions = new StringParameter(this, "numPartitions");
101        //numPartitions.setToken("12");
102
103        forceCollect = new Parameter(this, "forceCollect");
104        forceCollect.setTypeEquals(BaseType.BOOLEAN);
105        forceCollect.setToken(BooleanToken.FALSE);
106        
107    } // end Constructor
108
109    
110    @Override
111    public void preinitialize() throws IllegalActionException {
112        super.preinitialize();
113        
114        _connectionURL = sqlConnectionURL.stringValue();
115        
116        options = new HashMap<String, String>();
117
118        // initialize the options HashMap from the actor's parameter values
119        options.put("driver", SQL_DRIVER);
120        options.put("url", _connectionURL);
121    } 
122
123
124    /*
125     *  Perform the sql query that is specified by the "dbtable" field
126     *  The results are stored in a Dataset<Row> and broadcasted.   
127     */ 
128    @Override
129    public void fire() throws IllegalActionException { 
130        
131        super.fire();
132
133        /* build the query */
134        String query = ((StringToken)queryString.get(0)).stringValue();
135
136        //DEBUG
137        if(_debugging)
138            System.out.println("QUERY STRING: \n" + query + "\n"); 
139
140        options.put("dbtable", query);
141
142        // Add miscellaneous options if specified
143        if("" != partitionColumn.stringValue() &&
144           "" != lowerBound.stringValue() &&
145           "" != upperBound.stringValue() &&
146           "" != numPartitions.stringValue()) {
147            options.put("partitionColumn", partitionColumn.stringValue());
148            options.put("lowerBound", lowerBound.stringValue());
149            options.put("upperBound", upperBound.stringValue()); 
150            options.put("numPartitions", numPartitions.stringValue());
151        }
152        
153        /* start query */
154        try {
155            Dataset<Row> df 
156                = _sqlContext.read().format("jdbc").options(options).load();
157            
158            // if specified, trigger SQL transfer immediately
159            if (forceCollect.getToken() == BooleanToken.TRUE) {
160                StructType schema = df.schema();
161                df=_sqlContext.createDataFrame(df.collectAsList(),
162                                               schema).cache();
163            }
164            
165            // broadcasdt results
166            out.broadcast(new ObjectToken(df, Dataset.class)); 
167            
168            // only broadcast debug information if recipient actors present
169            // NOTE: no Spark call if debug ports are not hooked up
170            if (first.numberOfSinks() > 0) 
171                first.broadcast(new ObjectToken(df.first(), Row.class)); 
172            
173            if (count.numberOfSinks() > 0) 
174                count.broadcast(new LongToken(df.count())); 
175            
176        } catch (OutOfMemoryError e) { 
177            System.err.println(e.getMessage()); 
178            System.err.println("ERROR: SparkSQLQuery: Out of Java Heap Space!");
179        } catch (IllegalActionException e) {
180            System.err.println("ERROR: SparkSQLQuery: Failed to read from database.");
181        }
182    }
183
184    
185    /** The first element of the DataFrame. Debug */
186    public TypedIOPort first;
187    
188    /** The DataFrame resulting from the SparkSQL query */
189    public TypedIOPort out;
190    
191    /** Number of Rows in the DataFrame */
192    public TypedIOPort count;
193    
194    /** Options map for SQL config */
195    protected Map<String, String> options;
196
197    /** Query string */
198    public TypedIOPort queryString;
199
200    /** SQL connection URL */
201    public StringParameter sqlConnectionURL;
202
203    /** SQL driver to use */
204    public StringParameter sqlDriver;
205
206    /** column to partition on */
207    public StringParameter partitionColumn;
208
209    /** lower bound for partitioning */ 
210    public StringParameter lowerBound;
211
212    /** upper bound for partitioning */
213    public StringParameter upperBound;
214
215    /** number of partitions */ 
216    public StringParameter numPartitions;
217
218    /** Option to force SQL transfer immediately */
219    public Parameter forceCollect;
220
221
222    /* Private fields*/
223    private static final String SQL_DRIVER = "org.postgresql.Driver";
224    private static String _connectionURL;
225
226}