001/* 
002 * Copyright (c) 2016-2017 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2018-02-07 18:53:35 +0000 (Wed, 07 Feb 2018) $' 
007 * '$Revision: 34661 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.spark.mllib;
031
032import java.util.List;
033
034import org.apache.spark.sql.Dataset;
035import org.apache.spark.sql.Row;
036import org.kepler.spark.actor.SparkBaseActor;
037
038import ptolemy.actor.TypedIOPort;
039import ptolemy.data.BooleanToken;
040import ptolemy.data.ObjectToken;
041import ptolemy.data.StringToken;
042import ptolemy.data.expr.Parameter;
043import ptolemy.data.type.BaseType;
044import ptolemy.data.type.ObjectType;
045import ptolemy.kernel.CompositeEntity;
046import ptolemy.kernel.util.IllegalActionException;
047import ptolemy.kernel.util.NameDuplicationException;
048import ptolemy.kernel.util.SingletonAttribute;
049
050/**
051 * @author  Jiaxin Li
052 *
053 * Converts a Dataset<Row> to a JSON string
054 * NOTE: Currently Runs slow for large datasets due to the need to collect
055 *       all JSON strings to the driver
056 */
057public class DatasetToJson extends SparkBaseActor {
058
059    public DatasetToJson(CompositeEntity container, String name)
060        throws IllegalActionException, NameDuplicationException {
061        super(container, name);
062
063        inData = new TypedIOPort(this, "inData", true, false);
064        inData.setTypeEquals(new ObjectType(Dataset.class));
065        new SingletonAttribute(inData, "_showName"); 
066
067        outData = new TypedIOPort(this, "outData", false, true);
068        outData.setTypeEquals(BaseType.STRING);
069        new SingletonAttribute(outData, "_showName");
070
071        outputByRow = new Parameter(this, "outputByRow"); 
072        outputByRow.setTypeEquals(BaseType.BOOLEAN);
073        outputByRow.setToken(BooleanToken.FALSE);
074
075    }
076
077
078    /*
079     * Fire
080     */
081    @Override
082    public void fire() throws IllegalActionException {
083
084        super.fire();
085
086        // get incoming Spark dataframe
087        // TODO: handle multiple dataframes
088        Dataset<Row> inDf =
089            (Dataset<Row>)((ObjectToken)inData.get(0)).getValue();
090
091        //DEBUG
092        /*
093        if(_debugging) {
094            System.out.println("DatasetToJson:");
095            inDf.printSchema();
096        }
097        */
098
099        // convert Dataset<Row> to a Dataset<String> containing JSON strings
100        Dataset<String> jsonDf = inDf.toJSON();
101        List<String> jsonRows = jsonDf.collectAsList(); 
102    
103        if(outputByRow.getToken() == BooleanToken.TRUE) {
104            // NOTE: one StringToken for each row
105            for(String row : jsonRows)
106                outData.broadcast(new StringToken(row));
107        } else {
108            // NOTE: one JSON array for all rows
109            StringBuilder sb = new StringBuilder();
110            sb.append("[");
111            for(String row : jsonRows) {
112                sb.append(row);
113                sb.append(",");
114            }
115            if(sb.charAt(sb.length()-1) == ',')
116                sb.deleteCharAt(sb.length()-1); // delete last ',' in array
117            sb.append("]");
118            outData.broadcast(new StringToken(sb.toString()));
119        }
120        
121    }
122
123    /** Dataset<Row> input */
124    public TypedIOPort inData;
125
126    /** JSON string output */
127    public TypedIOPort outData;
128
129    /** Option to output one JSON string (as an JSON object) per row */
130    public Parameter outputByRow;
131
132}