001/* A Stratosphere Reduce stub that executes a ScriptEngine.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.stub;
031
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035
036import javax.script.Invocable;
037import javax.script.ScriptEngine;
038import javax.script.ScriptEngineFactory;
039
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.kepler.ddp.Utilities;
043import org.kepler.stratosphere.type.TypeUtilities;
044
045import eu.stratosphere.api.java.record.functions.ReduceFunction;
046import eu.stratosphere.configuration.Configuration;
047import eu.stratosphere.types.Key;
048import eu.stratosphere.types.Record;
049import eu.stratosphere.types.Value;
050import eu.stratosphere.util.Collector;
051import ptolemy.actor.TypeAttribute;
052import ptolemy.data.type.Type;
053import ptolemy.kernel.util.Workspace;
054
055/** A Stratosphere Reduce stub that executes a ScriptEngine. The engine
056 *  is loaded in configure(), and the reduce() function invoked each time
057 *  this class's reduce() is called.
058 * 
059 *  @author Daniel Crawl
060 *  @version $Id: ScriptEngineReduceStub.java 33628 2015-08-24 22:42:20Z crawl $
061 */
062public class ScriptEngineReduceStub extends ReduceFunction {
063
064    /*
065    public CodeReduceStub() {
066    }
067    */
068    
069    @Override
070    public void close() throws Exception {
071        super.close();
072        _engine = null;
073        _inputKeyType = null;
074        _inputValueType = null;
075        _resultKeys = null;
076        _resultValues = null;
077    }
078    
079    @Override
080    public void open(Configuration parameters) throws Exception {
081        
082        // get the script engine factory name
083        String scriptEngineFactoryName = 
084                parameters.getString(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME, null);
085        if(scriptEngineFactoryName == null) {
086            throw new RuntimeException("No script engine factory specified.");
087        }
088        
089        // get the code
090        String codeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_STUB_CODE, null);
091        if(codeStr == null) {
092            throw new RuntimeException("No code specified.");
093        }
094
095        // create the engine
096        
097        try {
098            Class<?> clazz = Class.forName(scriptEngineFactoryName);
099            ScriptEngineFactory factory = (ScriptEngineFactory)clazz.newInstance();
100            _engine = factory.getScriptEngine();
101        } catch(Throwable t) {
102            throw new RuntimeException("Error creating script engine for " + scriptEngineFactoryName, t);
103        }
104        
105        if(_engine == null) {
106            throw new RuntimeException("Could not find script engine for " + scriptEngineFactoryName);
107        }
108        
109        // parse the code
110        _engine.eval(codeStr);
111        
112        if(!(_engine instanceof Invocable)) {
113            throw new RuntimeException("Script engine for " + scriptEngineFactoryName + " is not invocable.");
114        }
115
116        _invocable = (Invocable) _engine;
117
118        
119        StubUtilities.initializePtolemy();
120
121        // get the key and value types
122
123        TypeAttribute typeAttribute = new TypeAttribute(new Workspace());
124        
125        String keyTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_KEY_TYPE, null);
126        if(keyTypeStr == null) {
127            throw new RuntimeException("No key type specified.");
128        }
129
130        typeAttribute.setExpression(keyTypeStr);
131        Type keyType = typeAttribute.getType();
132        _inputKeyType = TypeUtilities.getPactKeyTypeForPtolemyType(keyType);
133
134        String valueTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_VALUE_TYPE, null);
135        if(valueTypeStr == null) {
136            throw new RuntimeException("No value type specified.");
137        }
138        typeAttribute.setExpression(valueTypeStr);
139        Type valueType = typeAttribute.getType();
140        _inputValueType = TypeUtilities.getPactValueTypeForPtolemyType(valueType);
141        
142        _resultKeys = new LinkedList<Object>();
143        _resultValues = new LinkedList<Object>();
144
145        //LOG.info("reduce types: ik = " + _inputKeyType + " iv = " + _inputValueType);
146        
147    }
148
149    @Override
150    public void reduce(Iterator<Record> records, Collector<Record> out)
151            throws Exception {
152
153        // get key and value from record and put in engine
154        Object key = null;
155        List<Object> values = new LinkedList<Object>();
156        
157        while(records.hasNext()) {
158            final Record record = records.next();
159            
160            if(key == null) {
161                key = StubUtilities.convertPactDataToObject(record, TypeUtilities.KEY_FIELD, _inputKeyType);
162            }
163                
164            values.add(StubUtilities.convertPactDataToObject(record, TypeUtilities.VALUE_FIELD, _inputValueType));
165        }
166
167        //System.out.println("input key: " + key + " values = " + values);
168        
169        _invocable.invokeFunction("reduce", key, values, _resultKeys, _resultValues);
170                
171        //System.out.println("result keys: " + _resultKeys);
172        //System.out.println("result values: " + _resultValues);
173        
174        // convert results to collector
175        StubUtilities.convertResultsToCollector(_resultKeys, _resultValues, out);        
176
177        // clear the lists since the results are no longer needed.
178        _resultKeys.clear();
179        _resultValues.clear();
180        
181    }
182
183    /** Input key type. */
184    private Class<? extends Key> _inputKeyType;
185
186    /** Input value type. */
187    private Class<? extends Value> _inputValueType;
188
189    /** Logging. */
190    private static final Log LOG = LogFactory.getLog(ScriptEngineReduceStub.class);
191
192    /** The engine to parse scripts. */
193    private ScriptEngine _engine;
194    
195    /** Interface to execute functions in scripts. */
196    private Invocable _invocable;
197    
198    /** A list of result keys. */
199    private List<?> _resultKeys;
200    
201    /** A list of result values. */
202    private List<?> _resultValues;
203
204}