/* A Stratosphere Map stub that executes a ScriptEngine.
 *
 * Copyright (c) 2014 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $'
 * '$Revision: 33628 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.stratosphere.stub;

import java.util.LinkedList;
import java.util.List;

import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.ddp.Utilities;
import org.kepler.stratosphere.type.TypeUtilities;

import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import ptolemy.actor.TypeAttribute;
import ptolemy.data.type.Type;
import ptolemy.kernel.util.Workspace;

/** A Stratosphere Map stub that executes a ScriptEngine. The engine
 *  is loaded in open(), and the script's map() function is invoked each
 *  time this class's map() is called.
 *
 *  @author Daniel Crawl
 *  @version $Id: ScriptEngineMapStub.java 33628 2015-08-24 22:42:20Z crawl $
 */
public class ScriptEngineMapStub extends MapFunction {

    /** Release the script engine, the cached input types, and the
     *  result lists.
     *
     *  @exception Exception If the superclass close() throws it.
     */
    @Override
    public void close() throws Exception {

        try {
            super.close();
        } finally {
            // _invocable is the same object as _engine (see open()), so
            // both references must be dropped for the engine to become
            // unreachable. Done in finally so that the fields are reset
            // even when super.close() fails.
            _engine = null;
            _invocable = null;
            _inputKeyType = null;
            _inputValueType = null;
            _resultKeys = null;
            _resultValues = null;
        }
    }

    /** Create the script engine, evaluate the stub code, and read the
     *  input key and value types from the configuration.
     *
     *  <p>The following configuration entries are read:
     *  Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME (required),
     *  Utilities.CONFIGURATION_KEPLER_STUB_CODE (required),
     *  Utilities.CONFIGURATION_KEPLER_INPUT_KEY_TYPE (optional), and
     *  Utilities.CONFIGURATION_KEPLER_INPUT_VALUE_TYPE (required).</p>
     *
     *  @param parameters The configuration of this stub.
     *  @exception RuntimeException If a required entry is missing, if
     *   the script engine cannot be created, or if the engine is not
     *   an Invocable.
     *  @exception Exception If evaluating the stub code or parsing a
     *   type expression fails.
     */
    @Override
    public void open(Configuration parameters) throws Exception {

        // get the script engine factory name
        String scriptEngineFactoryName =
            parameters.getString(Utilities.CONFIGURATION_KEPLER_SCRIPT_ENGINE_FACTORY_NAME, null);
        if(scriptEngineFactoryName == null) {
            throw new RuntimeException("No script engine factory specified.");
        }

        // get the code
        String codeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_STUB_CODE, null);
        if(codeStr == null) {
            throw new RuntimeException("No code specified.");
        }

        // create the engine
        try {
            Class<?> clazz = Class.forName(scriptEngineFactoryName);
            ScriptEngineFactory factory = (ScriptEngineFactory)clazz.newInstance();
            _engine = factory.getScriptEngine();
        } catch(Throwable t) {
            // Throwable is caught so that linkage errors raised while
            // loading the factory class are also reported together with
            // the factory name.
            throw new RuntimeException("Error creating script engine for " + scriptEngineFactoryName, t);
        }

        if(_engine == null) {
            throw new RuntimeException("Could not find script engine for " + scriptEngineFactoryName);
        }

        // parse the code
        _engine.eval(codeStr);

        if(!(_engine instanceof Invocable)) {
            throw new RuntimeException("Script engine for " + scriptEngineFactoryName + " is not invocable.");
        }

        _invocable = (Invocable) _engine;

        StubUtilities.initializePtolemy();

        // get the key and value types; the same attribute is reused to
        // parse both type expressions.
        TypeAttribute typeAttribute = new TypeAttribute(new Workspace());

        // the key type is optional: _inputKeyType stays unset when it
        // is not configured.
        String keyTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_KEY_TYPE, null);
        if(keyTypeStr != null) {
            typeAttribute.setExpression(keyTypeStr);
            Type keyType = typeAttribute.getType();
            _inputKeyType = TypeUtilities.getPactKeyTypeForPtolemyType(keyType);
        }

        String valueTypeStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_VALUE_TYPE, null);
        if(valueTypeStr == null) {
            throw new RuntimeException("No value type specified.");
        }
        typeAttribute.setExpression(valueTypeStr);
        Type valueType = typeAttribute.getType();
        _inputValueType = TypeUtilities.getPactValueTypeForPtolemyType(valueType);

        // The result lists are created here and passed to the script as
        // function arguments. Per the original author's notes, creating
        // them inside the script with eval() worked for Java but not for
        // Groovy, and binding them with ScriptEngine.put() did not work
        // for Groovy either.
        _resultKeys = new LinkedList<Object>();
        _resultValues = new LinkedList<Object>();

        if(LOG.isDebugEnabled()) {
            LOG.debug("map types: ik = " + _inputKeyType + " iv = " + _inputValueType);
        }
    }

    /** Convert the record to Java objects, invoke the script's map()
     *  function, and write the results to the collector.
     *
     *  <p>If the record has exactly one field, that field is treated as
     *  the value and the script function is called as
     *  map(value, resultKeys, resultValues). Otherwise the fields at
     *  TypeUtilities.KEY_FIELD and TypeUtilities.VALUE_FIELD are used
     *  and the script function is called as
     *  map(key, value, resultKeys, resultValues). Whatever the two
     *  result lists contain after the call is handed to
     *  StubUtilities.convertResultsToCollector().</p>
     *
     *  @param record The input record.
     *  @param out The collector that receives the output records.
     *  @exception Exception If converting the data, invoking the
     *   script function, or collecting the results fails.
     */
    @Override
    public void map(Record record, Collector<Record> out) throws Exception {

        try {
            // see if there's only one field
            if(record.getNumFields() == 1) {

                // since there's only one field, assume it is the value
                Object value = StubUtilities.convertPactDataToObject(record, 0, _inputValueType);

                _invocable.invokeFunction("map", value, _resultKeys, _resultValues);

            } else {

                Object key = StubUtilities.convertPactDataToObject(record, TypeUtilities.KEY_FIELD, _inputKeyType);
                Object value = StubUtilities.convertPactDataToObject(record, TypeUtilities.VALUE_FIELD, _inputValueType);

                _invocable.invokeFunction("map", key, value, _resultKeys, _resultValues);
            }

            // convert results to collector
            StubUtilities.convertResultsToCollector(_resultKeys, _resultValues, out);

        } finally {
            // clear the lists since the results are no longer needed.
            // Done in finally so that partial results left behind by a
            // failed invocation are never emitted with a later record.
            _resultKeys.clear();
            _resultValues.clear();
        }
    }

    /** Input key type. */
    private Class<? extends Key> _inputKeyType;

    /** Input value type. */
    private Class<? extends Value> _inputValueType;

    /** Logging. */
    private static final Log LOG = LogFactory.getLog(ScriptEngineMapStub.class);

    /** The engine to parse scripts. */
    private ScriptEngine _engine;

    /** Interface to execute functions in scripts. */
    private Invocable _invocable;

    /** A list of result keys. */
    private List<?> _resultKeys;

    /** A list of result values. */
    private List<?> _resultValues;

}