001/* A Spark actor that saves models.
002 * 
003 * Copyright (c) 2015 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-09-03 18:46:55 +0000 (Thu, 03 Sep 2015) $' 
008 * '$Revision: 33859 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.actor;
031
032import java.io.File;
033
034import org.apache.spark.mllib.util.Saveable;
035import org.kepler.spark.util.SparkUtilities;
036
037import ptolemy.actor.parameters.PortParameter;
038import ptolemy.data.BooleanToken;
039import ptolemy.data.StringToken;
040import ptolemy.data.expr.Parameter;
041import ptolemy.data.type.BaseType;
042import ptolemy.kernel.CompositeEntity;
043import ptolemy.kernel.util.IllegalActionException;
044import ptolemy.kernel.util.NameDuplicationException;
045import ptolemy.kernel.util.SingletonAttribute;
046
047/** A base class for Spark actors that save models.
048 * 
049 *  @author Daniel Crawl
050 *  @version $Id: SaveableModelActor.java 33859 2015-09-03 18:46:55Z crawl $
051 */
052public abstract class SaveableModelActor extends SparkBaseActor {
053
054    /** Create a new SaveableModelActor in a container with a specific name. */
055    public SaveableModelActor(CompositeEntity container, String name)
056            throws IllegalActionException, NameDuplicationException {
057        super(container, name);
058        
059        modelPath = new PortParameter(this, "modelPath");        
060        modelPath.setTypeEquals(BaseType.STRING);
061        modelPath.getPort().setTypeEquals(BaseType.STRING);
062        new SingletonAttribute(modelPath.getPort(), "_showName");
063        modelPath.setStringMode(true);
064
065        overwriteExistingModel = new Parameter(this, "overwriteExistingModel");
066        overwriteExistingModel.setTypeEquals(BaseType.BOOLEAN);
067        // set the default value to false so that the model is not accidently
068        // overwritten.
069        overwriteExistingModel.setToken(BooleanToken.FALSE);
070        
071    }
072
073    /** Get the model path and make sure it's not empty. */
074    @Override
075    public void fire() throws IllegalActionException {
076        
077        super.fire();
078        
079        modelPath.update();
080        _modelPathVal = ((StringToken)modelPath.getToken()).stringValue();
081        
082        if(_modelPathVal.isEmpty()) {
083            throw new IllegalActionException(this, "Model path is empty.");
084        }
085    }
086    
087    /** Check if the model path already exists. If it does and we can
088     *  overwrite it, then delete it.
089     */
090    @Override
091    public boolean postfire() throws IllegalActionException {
092        
093        boolean retval = super.postfire();
094
095        File existingModelDir = new File(_modelPathVal);
096        if(existingModelDir.exists() &&
097            ((BooleanToken)overwriteExistingModel.getToken()).booleanValue()) {
098            
099            File metadataDir = new File(existingModelDir, "metadata");
100            if(metadataDir.exists()) {
101                SparkUtilities.deleteDirectoryOutput(metadataDir.toURI(), _context.hadoopConfiguration());
102            }
103            File dataDir = new File(existingModelDir, "data");
104            if(dataDir.exists()) {
105                SparkUtilities.deleteDirectoryOutput(dataDir.toURI(), _context.hadoopConfiguration());
106            }
107
108            if(!existingModelDir.delete()) {
109                System.err.println("Could not delete " + existingModelDir);
110            }
111        }
112        
113        // Save model to file 
114        _model.save(_context.sc(), _modelPathVal);
115
116        return retval;
117    }
118
119    /** Location to save the model. */
120    public PortParameter modelPath;             
121
122    /** If true, overwrite the existing model. */
123    public Parameter overwriteExistingModel;
124    
125    /** The model to save. */
126    protected Saveable _model;
127    
128    /** The path to save the model. */
129    private String _modelPathVal;
130}