001/* A Stratosphere Reduce stub that executes a ScriptEngine. 002 * 003 * Copyright (c) 2014 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 008 * '$Revision: 33628 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.stratosphere.stub; 031 032import java.util.Iterator; 033import java.util.LinkedList; 034import java.util.List; 035 036import javax.script.Invocable; 037import javax.script.ScriptEngine; 038import javax.script.ScriptEngineFactory; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.kepler.ddp.Utilities; 043import org.kepler.stratosphere.type.TypeUtilities; 044 045import eu.stratosphere.api.java.record.functions.ReduceFunction; 046import eu.stratosphere.configuration.Configuration; 047import eu.stratosphere.types.Key; 048import eu.stratosphere.types.Record; 049import eu.stratosphere.types.Value; 050import eu.stratosphere.util.Collector; 051import ptolemy.actor.TypeAttribute; 052import ptolemy.data.type.Type; 053import ptolemy.kernel.util.Workspace; 054 055/** A Stratosphere Reduce stub that executes a ScriptEngine. The engine 056 * is loaded in configure(), and the reduce() function invoked each time 057 * this class's reduce() is called. 058 * 059 * @author Daniel Crawl 060 * @version $Id: ScriptEngineReduceStub.java 33628 2015-08-24 22:42:20Z crawl $ 061 */ 062public class ScriptEngineReduceStub extends ReduceFunction { 063 064 /* 065 public CodeReduceStub() { 066 } 067 */ 068 069 @Override 070 public void close() throws Exception { 071 super.close(); 072 _engine = null; 073 _inputKeyType = null; 074 _inputValueType = null; 075 _resultKeys = null; 076 _resultValues = null; 077 } 078 079 @Override 080 public void open(Configuration parameters) throws Exception { 081 082 // get the script engine factory name 083 String scriptEngineFactoryName = 084 parameters.getString(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME, null); 085 if(scriptEngineFactoryName == null) { 086 throw new RuntimeException("No script engine factory specified."); 087 } 088 089 // get the code 090 String codeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_STUB_CODE, null); 091 if(codeStr == null) { 092 throw new RuntimeException("No code specified."); 093 } 094 095 // create the engine 096 097 try { 098 Class<?> clazz = Class.forName(scriptEngineFactoryName); 099 ScriptEngineFactory factory = (ScriptEngineFactory)clazz.newInstance(); 100 _engine = factory.getScriptEngine(); 101 } catch(Throwable t) { 102 throw new RuntimeException("Error creating script engine for " + scriptEngineFactoryName, t); 103 } 104 105 if(_engine == null) { 106 throw new RuntimeException("Could not find script engine for " + scriptEngineFactoryName); 107 } 108 109 // parse the code 110 _engine.eval(codeStr); 111 112 if(!(_engine instanceof Invocable)) { 113 throw new RuntimeException("Script engine for " + scriptEngineFactoryName + " is not invocable."); 114 } 115 116 _invocable = (Invocable) _engine; 117 118 119 StubUtilities.initializePtolemy(); 120 121 // get the key and value types 122 123 TypeAttribute typeAttribute = new TypeAttribute(new Workspace()); 124 125 String keyTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_KEY_TYPE, null); 126 if(keyTypeStr == null) { 127 throw new RuntimeException("No key type specified."); 128 } 129 130 typeAttribute.setExpression(keyTypeStr); 131 Type keyType = typeAttribute.getType(); 132 _inputKeyType = TypeUtilities.getPactKeyTypeForPtolemyType(keyType); 133 134 String valueTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_VALUE_TYPE, null); 135 if(valueTypeStr == null) { 136 throw new RuntimeException("No value type specified."); 137 } 138 typeAttribute.setExpression(valueTypeStr); 139 Type valueType = typeAttribute.getType(); 140 _inputValueType = TypeUtilities.getPactValueTypeForPtolemyType(valueType); 141 142 _resultKeys = new LinkedList<Object>(); 143 _resultValues = new LinkedList<Object>(); 144 145 //LOG.info("reduce types: ik = " + _inputKeyType + " iv = " + _inputValueType); 146 147 } 148 149 @Override 150 public void reduce(Iterator<Record> records, Collector<Record> out) 151 throws Exception { 152 153 // get key and value from record and put in engine 154 Object key = null; 155 List<Object> values = new LinkedList<Object>(); 156 157 while(records.hasNext()) { 158 final Record record = records.next(); 159 160 if(key == null) { 161 key = StubUtilities.convertPactDataToObject(record, TypeUtilities.KEY_FIELD, _inputKeyType); 162 } 163 164 values.add(StubUtilities.convertPactDataToObject(record, TypeUtilities.VALUE_FIELD, _inputValueType)); 165 } 166 167 //System.out.println("input key: " + key + " values = " + values); 168 169 _invocable.invokeFunction("reduce", key, values, _resultKeys, _resultValues); 170 171 //System.out.println("result keys: " + _resultKeys); 172 //System.out.println("result values: " + _resultValues); 173 174 // convert results to collector 175 StubUtilities.convertResultsToCollector(_resultKeys, _resultValues, out); 176 177 // clear the lists since the results are no longer needed. 178 _resultKeys.clear(); 179 _resultValues.clear(); 180 181 } 182 183 /** Input key type. */ 184 private Class<? extends Key> _inputKeyType; 185 186 /** Input value type. */ 187 private Class<? extends Value> _inputValueType; 188 189 /** Logging. */ 190 private static final Log LOG = LogFactory.getLog(ScriptEngineReduceStub.class); 191 192 /** The engine to parse scripts. */ 193 private ScriptEngine _engine; 194 195 /** Interface to execute functions in scripts. */ 196 private Invocable _invocable; 197 198 /** A list of result keys. */ 199 private List<?> _resultKeys; 200 201 /** A list of result values. */ 202 private List<?> _resultValues; 203 204}