001/* 
002 * Copyright (c) 2015 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-10-30 22:38:25 +0000 (Fri, 30 Oct 2015) $' 
007 * '$Revision: 34169 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029package org.kepler.spark.actor;
030
031import org.apache.spark.api.java.JavaSparkContext;
032
033import ptolemy.actor.TypedAtomicActor;
034import ptolemy.data.StringToken;
035import ptolemy.data.expr.StringParameter;
036import ptolemy.kernel.CompositeEntity;
037import ptolemy.kernel.util.IllegalActionException;
038import ptolemy.kernel.util.NameDuplicationException;
039import ptolemy.kernel.util.NamedObj;
040import ptolemy.kernel.util.Workspace;
041
042/** A base class for actors using JavaSparkContexts. The connectionName
043 *  parameter refers to a SparkConnection in the workflow that provides
044 *  a JavaSparkContext object.
045 * 
046 * @author Daniel Crawl
047 * @version $Id: SparkBaseActor.java 34169 2015-10-30 22:38:25Z crawl $
048 */
049public abstract class SparkBaseActor extends TypedAtomicActor {
050
051    public SparkBaseActor(CompositeEntity container, String name)
052            throws IllegalActionException, NameDuplicationException {
053        super(container, name);
054        
055        connectionName = new StringParameter(this, "connectionName");
056    }
057
058    /** Clone this object into the specified workspace. */
059    @Override
060    public Object clone(Workspace workspace) throws CloneNotSupportedException {
061        SparkBaseActor newObject = (SparkBaseActor) super.clone(workspace);
062        newObject._context = null;
063        return newObject;
064    }
065
066    @Override
067    public void preinitialize() throws IllegalActionException {
068        
069        super.preinitialize();
070        
071        _context = null;
072        
073        String connectionNameStr = ((StringToken)connectionName.getToken()).stringValue();
074        
075        // if it's empty, get the default connection
076        if(connectionNameStr.trim().isEmpty()) {
077            _context = SparkConnection.getDefaultContext();
078        } else {
079        
080            SparkConnection connection = null;
081
082            // search the workflow hierarchy for the connection
083            NamedObj container = getContainer();
084            while(container != null && connection == null) {
085                for(SparkConnection attribute : container.attributeList(SparkConnection.class)) {
086                    if(attribute.getConnectionName().equals(connectionNameStr)) {
087                        connection = attribute;
088                        break;
089                    }
090                }
091                container = container.getContainer();
092            }
093           
094            if(connection == null) {
095                throw new IllegalActionException(this,
096                    "Could not find SparkConnection with name \"" + connectionNameStr + "\".");
097            }
098            
099            _context = connection.getContext();
100        }
101        
102        if(_context == null) {
103            throw new IllegalActionException("Could not get Spark context.");
104        }
105    }
106    
107    /** Free resources. */
108    @Override
109    public void wrapup() throws IllegalActionException {
110        super.wrapup();
111        
112        if(_context != null) {
113            SparkConnection.releaseContext();
114            _context = null;
115        }
116    }
117    
118    /** The name of the SparkConnection attribute. */
119    public StringParameter connectionName;
120    
121    /** The spark context. */
122    protected JavaSparkContext _context;
123}