001/* A Stratosphere Map stub that executes a ScriptEngine.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.stub;
031
032import java.util.LinkedList;
033import java.util.List;
034
035import javax.script.Invocable;
036import javax.script.ScriptEngine;
037import javax.script.ScriptEngineFactory;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.kepler.ddp.Utilities;
042import org.kepler.stratosphere.type.TypeUtilities;
043
044import eu.stratosphere.api.java.record.functions.MapFunction;
045import eu.stratosphere.configuration.Configuration;
046import eu.stratosphere.types.Key;
047import eu.stratosphere.types.Record;
048import eu.stratosphere.types.Value;
049import eu.stratosphere.util.Collector;
050import ptolemy.actor.TypeAttribute;
051import ptolemy.data.type.Type;
052import ptolemy.kernel.util.Workspace;
053
054/** A Stratosphere Map stub that executes a ScriptEngine. The engine
055 *  is loaded in configure(), and the map() function invoked each time
056 *  this class's map() is called.
057 * 
058 *  @author Daniel Crawl
059 *  @version $Id: ScriptEngineMapStub.java 33628 2015-08-24 22:42:20Z crawl $
060 */
061public class ScriptEngineMapStub extends MapFunction {
062
063    /*
064    public CodeMapStub() {
065    }
066    */
067
068    @Override
069    public void close() throws Exception {
070        
071        super.close();
072        _engine = null;
073        _inputKeyType = null;
074        _inputValueType = null;
075        _resultKeys = null;
076        _resultValues = null;
077    }
078    
079    @Override
080    public void open(Configuration parameters) throws Exception {
081        
082        // get the script engine factory name
083        String scriptEngineFactoryName = 
084                parameters.getString(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME, null);
085        if(scriptEngineFactoryName == null) {
086            throw new RuntimeException("No script engine factory specified.");
087        }
088        
089        // get the code
090        String codeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_STUB_CODE, null);
091        if(codeStr == null) {
092            throw new RuntimeException("No code specified.");
093        }
094
095        // create the engine
096        
097        try {
098            Class<?> clazz = Class.forName(scriptEngineFactoryName);
099            ScriptEngineFactory factory = (ScriptEngineFactory)clazz.newInstance();
100            _engine = factory.getScriptEngine();
101        } catch(Throwable t) {
102            throw new RuntimeException("Error creating script engine for " + scriptEngineFactoryName, t);
103        }
104        
105        if(_engine == null) {
106            throw new RuntimeException("Could not find script engine for " + scriptEngineFactoryName);
107        }
108        
109        // works in java
110        // not in groovy
111        //_engine.eval("List resultKeys = new LinkedList(); List resultValues = new LinkedList();");
112        
113        
114        // not in groovy
115        //List resultKeys = new LinkedList(); List resultValues = new LinkedList();
116        //_engine.put("resultKeys", resultKeys);
117        //_engine.put("resultValues", resultValues);
118        
119        // parse the code
120        _engine.eval(codeStr);
121        
122        if(!(_engine instanceof Invocable)) {
123            throw new RuntimeException("Script engine for " + scriptEngineFactoryName + " is not invocable.");
124        }
125
126        _invocable = (Invocable) _engine;
127
128        
129        StubUtilities.initializePtolemy();
130        
131        // get the key and value types
132
133        TypeAttribute typeAttribute = new TypeAttribute(new Workspace());
134        
135        String keyTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_KEY_TYPE, null);
136        if(keyTypeStr != null) {
137            typeAttribute.setExpression(keyTypeStr);
138            Type keyType = typeAttribute.getType();
139            _inputKeyType = TypeUtilities.getPactKeyTypeForPtolemyType(keyType);
140        }
141
142        String valueTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_VALUE_TYPE, null);
143        if(valueTypeStr == null) {
144            throw new RuntimeException("No value type specified.");
145        }
146        typeAttribute.setExpression(valueTypeStr);
147        Type valueType = typeAttribute.getType();
148        _inputValueType = TypeUtilities.getPactValueTypeForPtolemyType(valueType);
149        
150        _resultKeys = new LinkedList<Object>();
151        _resultValues = new LinkedList<Object>();
152
153        //LOG.info("map types: ik = " + _inputKeyType + " iv = " + _inputValueType);
154        
155    }
156    
157    @Override
158    public void map(Record record, Collector<Record> out) throws Exception {
159                
160        // see if there's only one field
161        if(record.getNumFields() == 1) {
162            
163            // since there's only one field, assume it is the value
164            Object value = StubUtilities.convertPactDataToObject(record, 0, _inputValueType);
165            
166            //System.out.println("input: " + value);
167            
168            _invocable.invokeFunction("map", value, _resultKeys, _resultValues);
169            
170        } else {
171            
172            Object key = StubUtilities.convertPactDataToObject(record, TypeUtilities.KEY_FIELD, _inputKeyType);
173            Object value = StubUtilities.convertPactDataToObject(record, TypeUtilities.VALUE_FIELD, _inputValueType);
174
175            _invocable.invokeFunction("map", key, value, _resultKeys, _resultValues);
176        }
177                
178        //System.out.println("result keys: " + resultKeys);
179        //System.out.println("result values: " + resultValues);
180        
181        // convert results to collector
182        StubUtilities.convertResultsToCollector(_resultKeys, _resultValues, out);    
183        
184        // clear the lists since the results are no longer needed.
185        _resultKeys.clear();
186        _resultValues.clear();
187
188    }
189
190    /** Input key type. */
191    private Class<? extends Key> _inputKeyType;
192
193    /** Input value type. */
194    private Class<? extends Value> _inputValueType;
195
196    /** Logging. */
197    private static final Log LOG = LogFactory.getLog(ScriptEngineMapStub.class);
198
199    /** The engine to parse scripts. */
200    private ScriptEngine _engine;
201    
202    /** Interface to execute functions in scripts. */
203    private Invocable _invocable;
204    
205    /** A list of result keys. */
206    private List<?> _resultKeys;
207    
208    /** A list of result values. */
209    private List<?> _resultValues;
210    
211}