001/* 002 * Copyright (c) 2015 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-10-30 22:38:25 +0000 (Fri, 30 Oct 2015) $' 007 * '$Revision: 34169 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029package org.kepler.spark.actor; 030 031import org.apache.spark.api.java.JavaSparkContext; 032 033import ptolemy.actor.TypedAtomicActor; 034import ptolemy.data.StringToken; 035import ptolemy.data.expr.StringParameter; 036import ptolemy.kernel.CompositeEntity; 037import ptolemy.kernel.util.IllegalActionException; 038import ptolemy.kernel.util.NameDuplicationException; 039import ptolemy.kernel.util.NamedObj; 040import ptolemy.kernel.util.Workspace; 041 042/** A base class for actors using JavaSparkContexts. The connectionName 043 * parameter refers to a SparkConnection in the workflow that provides 044 * a JavaSparkContext object. 045 * 046 * @author Daniel Crawl 047 * @version $Id: SparkBaseActor.java 34169 2015-10-30 22:38:25Z crawl $ 048 */ 049public abstract class SparkBaseActor extends TypedAtomicActor { 050 051 public SparkBaseActor(CompositeEntity container, String name) 052 throws IllegalActionException, NameDuplicationException { 053 super(container, name); 054 055 connectionName = new StringParameter(this, "connectionName"); 056 } 057 058 /** Clone this object into the specified workspace. */ 059 @Override 060 public Object clone(Workspace workspace) throws CloneNotSupportedException { 061 SparkBaseActor newObject = (SparkBaseActor) super.clone(workspace); 062 newObject._context = null; 063 return newObject; 064 } 065 066 @Override 067 public void preinitialize() throws IllegalActionException { 068 069 super.preinitialize(); 070 071 _context = null; 072 073 String connectionNameStr = ((StringToken)connectionName.getToken()).stringValue(); 074 075 // if it's empty, get the default connection 076 if(connectionNameStr.trim().isEmpty()) { 077 _context = SparkConnection.getDefaultContext(); 078 } else { 079 080 SparkConnection connection = null; 081 082 // search the workflow hierarchy for the connection 083 NamedObj container = getContainer(); 084 while(container != null && connection == null) { 085 for(SparkConnection attribute : container.attributeList(SparkConnection.class)) { 086 if(attribute.getConnectionName().equals(connectionNameStr)) { 087 connection = attribute; 088 break; 089 } 090 } 091 container = container.getContainer(); 092 } 093 094 if(connection == null) { 095 throw new IllegalActionException(this, 096 "Could not find SparkConnection with name \"" + connectionNameStr + "\"."); 097 } 098 099 _context = connection.getContext(); 100 } 101 102 if(_context == null) { 103 throw new IllegalActionException("Could not get Spark context."); 104 } 105 } 106 107 /** Free resources. */ 108 @Override 109 public void wrapup() throws IllegalActionException { 110 super.wrapup(); 111 112 if(_context != null) { 113 SparkConnection.releaseContext(); 114 _context = null; 115 } 116 } 117 118 /** The name of the SparkConnection attribute. */ 119 public StringParameter connectionName; 120 121 /** The spark context. */ 122 protected JavaSparkContext _context; 123}