001/* Utilities for Kepler PACT stubs.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.stub;
031
032import java.io.File;
033import java.lang.reflect.Field;
034import java.util.ArrayList;
035import java.util.Iterator;
036import java.util.List;
037import java.util.concurrent.atomic.AtomicBoolean;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.kepler.Kepler;
042import org.kepler.build.project.ProjectLocator;
043import org.kepler.ddp.Utilities;
044import org.kepler.stratosphere.type.ByteArray;
045import org.kepler.stratosphere.type.RecordValue;
046
047import eu.stratosphere.configuration.Configuration;
048import eu.stratosphere.types.DoubleValue;
049import eu.stratosphere.types.IntValue;
050import eu.stratosphere.types.Key;
051import eu.stratosphere.types.LongValue;
052import eu.stratosphere.types.NullValue;
053import eu.stratosphere.types.Record;
054import eu.stratosphere.types.StringValue;
055import eu.stratosphere.types.Value;
056import eu.stratosphere.util.Collector;
057import ptolemy.data.ArrayToken;
058import ptolemy.data.DoubleToken;
059import ptolemy.data.IntToken;
060import ptolemy.data.LongToken;
061import ptolemy.data.ObjectToken;
062import ptolemy.data.RecordToken;
063import ptolemy.data.StringToken;
064import ptolemy.data.Token;
065import ptolemy.data.UnsignedByteToken;
066import ptolemy.data.type.ArrayType;
067import ptolemy.data.type.BaseType;
068import ptolemy.data.type.ObjectType;
069import ptolemy.data.type.RecordType;
070import ptolemy.data.type.Type;
071import ptolemy.kernel.util.IllegalActionException;
072
073/** A collection of utilities used by Kepler PACT stubs.
074 * 
075 * @author Daniel Crawl
076 * @version $Id: StubUtilities.java 33628 2015-08-24 22:42:20Z crawl $
077 */
078
079public class StubUtilities {
080    
081    /** Write lists of Java Object keys and values to a Collector. */
082    public static void convertResultsToCollector(List<?> resultKeys, List<?> resultValues,
083            Collector<Record> collector) {
084        
085        for(int i = 0; i < resultKeys.size(); i++) {
086            collector.collect(new Record(
087                    _convertObjectToPactData(((List<?>)resultKeys).get(i)),
088                    _convertObjectToPactData(((List<?>)resultValues).get(i))));
089        }
090                    
091    }
092
093    /** Convert a token to PACT data and add to a Collector. */
094    public static void convertTokenToCollector(List<Token> tokenList, Collector<Record> collector)
095            throws IllegalActionException {
096        
097        if (tokenList == null) {
098                return;
099        }
100        
101        for (Token token: tokenList) {
102                if(!(token instanceof ArrayToken)) {
103                    throw new IllegalActionException("Read non-array token: " + token);
104                }
105                
106                final ArrayToken arrayToken = (ArrayToken)token;
107                        
108                if(!(arrayToken.getElementType() instanceof RecordType)) {
109                    throw new IllegalActionException("Read non-record of array token: " + token);
110                }
111                
112                final int length = arrayToken.length();
113                for(int i = 0; i < length; i++) {
114                    final RecordToken element = (RecordToken) arrayToken.getElement(i);
115                    final Key keyField = convertTokenToPactKeyData(element.get("key"));
116                    final Value valueField = convertTokenToPactValueData(element.get("value"));
117                    collector.collect(new Record(keyField, valueField));
118                }
119        }
120    }
121
122    /** Convert a token to PACT Value data. */
123    public static Value convertTokenToPactValueData(Token token) throws IllegalActionException {
124       
125        Value retval = convertTokenToPactKeyData(token);        
126        return retval;
127    }
128    
129    /** Convert a token to PACT Key data. */
130    public static Key convertTokenToPactKeyData(Token token) throws IllegalActionException {
131        
132        final Type type = token.getType();
133        
134        Key retval = null;
135        if(type instanceof ObjectType) {
136            
137            final Class<?> clazz = ((ObjectType)type).getValueClass();
138            if(clazz == byte[].class) {
139                byte[] bytes =  (byte[])((ObjectToken)token).getValue();
140                retval = new ByteArray(bytes);
141            } else {
142                throw new IllegalActionException("Conversion of Object Token with class " +
143                    clazz + " is unsupported.");
144            }
145        } else if(type == BaseType.STRING) {
146            retval = new StringValue(((StringToken)token).stringValue());
147        } else if(type == BaseType.INT) {
148            retval = new IntValue(((IntToken)token).intValue());
149        } else if(type == BaseType.LONG) {
150            retval = new LongValue(((LongToken)token).longValue());
151        } else if(type == BaseType.DOUBLE) {
152            retval = new DoubleValue(((DoubleToken)token).doubleValue());
153        } else if(type == BaseType.NIL) {
154                retval = new NullValue();
155        } else if(type instanceof ArrayType) {
156                if(((ArrayType)type).getElementType() == BaseType.UNSIGNED_BYTE)
157                        retval = new ByteArray(arrayTokenToByteArray((ArrayToken)token));
158        } else if(type instanceof RecordType) {
159            retval = new RecordValue(token.toString());
160        }
161        if (retval == null){
162            throw new IllegalActionException("Unsupported token type: " + type);
163        }
164        
165        return retval;
166    }
167
168    /** Convert PACT data to an object. */
169    public static Object convertPactDataToObject(final Record record, final int fieldNum,
170        Class<? extends Value> classType) {
171
172        Object retval = null;
173        
174        //LOG.info("record has " + record.getNumFields() + " fields, asking for field " + fieldNum + " as type " + classType);
175        
176        // check if the record has enough fields
177        // if not, return null
178        if(record.getNumFields() <= fieldNum) {
179            return retval;
180        }
181        
182        final Value value = record.getField(fieldNum, classType);
183        
184        //LOG.info("  value retrieved is class " + value.getClass());
185        
186        if(value instanceof NullValue) {
187            retval = null;
188        // NOTE: check for RecordValue before StringValue since it's a subclass.
189        } else if(value instanceof RecordValue) {
190            throw new RuntimeException("Record data not supported...");
191        } else if(value instanceof StringValue) {
192            retval = ((StringValue) value).getValue();
193        } else if(value instanceof IntValue) {
194            retval = Integer.valueOf(((IntValue)value).getValue());
195        } else if(value instanceof LongValue) {
196            retval = Long.valueOf(((LongValue)value).getValue());
197        } else if(value instanceof DoubleValue) {
198            retval = Double.valueOf(((DoubleValue)value).getValue());
199        }
200        if (retval == null){
201            throw new RuntimeException("Unsupported type of PACT data: " + value.getClass());
202        }
203    
204        return retval;
205    }
206    /** Convert PACT data to a token. */
207    public static Token convertPactDataToToken(final Record record, final int fieldNum,
208        Class<? extends Value> classType) {
209
210        Token retval = null;
211        
212        //LOG.info("record has " + record.getNumFields() + " fields, asking for field " + fieldNum + " as type " + classType);
213        
214        // check if the record has enough fields
215        // if not, return null
216        if(record.getNumFields() <= fieldNum) {
217            return retval;
218        }
219        
220        final Value value = record.getField(fieldNum, classType);
221        
222        //LOG.info("  value retrieved is class " + value.getClass());
223        
224        if(value instanceof NullValue) {
225            retval = Token.NIL;
226        // NOTE: check for RecordValue before StringValue since it's a subclass.
227        } else if(value instanceof RecordValue) {
228            try {
229                retval = new RecordToken(((RecordValue) value).getValue());
230            } catch (IllegalActionException e) {
231                throw new RuntimeException("Error creating RecordToken.", e);
232            }
233        } else if(value instanceof StringValue) {
234            retval = new StringToken(((StringValue) value).getValue());
235        } else if(value instanceof IntValue) {
236            retval = new IntToken(((IntValue)value).getValue());
237        } else if(value instanceof LongValue) {
238            retval = new LongToken(((LongValue)value).getValue());
239        } else if(value instanceof DoubleValue) {
240            retval = new DoubleToken(((DoubleValue)value).getValue());
241        } else if(value instanceof ByteArray) { //convert PACT ByteArray to Ptolemy ObjectToken, which is better in performance.
242            try {
243                retval = new ObjectToken(((ByteArray)value).getValue(), byte[].class);
244                //retval = new ArrayToken(byteArrayToToken(((ByteArray)value).getValue()));
245            } catch (IllegalActionException e) {
246                throw new RuntimeException("Error creating ObjectToken.", e);
247            }
248        }
249        if (retval == null){
250            throw new RuntimeException("Unsupported type of PACT data: " + value.getClass());
251        }
252    
253        return retval;
254    }
255
256    /** Convert byte array data to an array of UnsignedByteToken. */
257    public static UnsignedByteToken[] byteArrayToToken(byte[] byteArray){
258                UnsignedByteToken[] byteTokenArray = new UnsignedByteToken[byteArray.length];
259                for (int t = 0; t < byteArray.length; t++) {
260                        byteTokenArray[t] = new UnsignedByteToken(byteArray[t]);
261                }
262                return byteTokenArray;
263        }
264    
265    /** Convert ArrayToken to an array of Value. 
266     * @throws IllegalActionException */
267    public static Value[] arrayTokenToPactData(ArrayToken arrayToken) throws IllegalActionException{
268        Value[] byteTokenArray = new Value[arrayToken.length()];
269        for (int i=0; i<arrayToken.length(); i++) {
270                byteTokenArray[i] = convertTokenToPactValueData(arrayToken.getElement(i));
271        }
272                return byteTokenArray;
273        }
274    
275    /** Convert ArrayToken to an array of byte. 
276     * @throws IllegalActionException */
277    public static byte[] arrayTokenToByteArray(ArrayToken arrayToken) throws IllegalActionException{
278        if (arrayToken.getElementType() != BaseType.UNSIGNED_BYTE)
279                return null;
280        byte[] byteTokenArray = new byte[arrayToken.length()];
281        for (int i=0; i<arrayToken.length(); i++) {
282                byteTokenArray[i] = ((UnsignedByteToken)arrayToken.getElement(i)).byteValue();
283        }
284                return byteTokenArray;
285        }
286    
287    /** Convert PACT data to a token. */
288    public static Token convertPactDataToToken(Iterator<Record> records, int fieldNum,
289        Class<? extends Value> classType) {
290
291        final List<Token> tokenList = new ArrayList<Token>();
292        while(records.hasNext()) {
293            tokenList.add(convertPactDataToToken(records.next(), fieldNum, classType));
294        }
295        try {
296            return new ArrayToken(tokenList.toArray(new Token[tokenList.size()]));
297        } catch (IllegalActionException e) {
298            throw new RuntimeException("Error creating array token.", e);
299        }
300    }
301    
302    /** Initialize the parts of Kepler required to run the sub-workflow in
303     *  the stub.
304     */
305    public synchronized static void initializeKepler(Configuration parameters) {
306        
307        // see if we have already initialized once in this JVM.
308        if(!_haveInitializedOnce.get()) {
309                
310                // see if the kepler modules directory exists.
311                String modulesDirStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_MODULES_DIR, null);
312                if(modulesDirStr != null) {
313                    File modulesDir = new File(modulesDirStr);
314                    if(modulesDir.exists() && modulesDir.isDirectory()) {
315                
316                        // set the directory
317                        ProjectLocator.setKeplerModulesDir(modulesDir);
318                        
319                        // FIXME this is a bad hack
320                        // ProjectLocator.userBuildDir has to be set, otherwise findKeplerModulesDir()
321                        // is called, which fails when running in the stratosphere server.
322                        // userBuildDir is a private field, so we have to use reflection to modify it.
323                        // unfortunately, we cannot add a method in ProjectLocator since kepler-tasks
324                        // cannot be patched!
325                        //
326                        // this sets userBuildDir to $HOME/KeplerData/kepler.modules/build-area
327                        //
328                        if(ProjectLocator.shouldUtilizeUserKeplerModules()) {
329                            try {
330                            Field field = ProjectLocator.class.getDeclaredField("userBuildDir");
331                            field.setAccessible(true);
332                            File file = new File(System.getProperty("user.home") + File.separator +
333                                    "KeplerData" + File.separator +
334                                    "kepler.modules" + File.separator +
335                                    "build-area");
336                            field.set(null, file);
337                        } catch (Throwable e) {
338                            throw new RuntimeException("Error setting user build dir in ProjectLocator.", e);
339                        }
340                        }
341                        
342                        try {
343                            Kepler.setJavaPropertiesAndCopyModuleDirectories();
344                        } catch (Exception e) {
345                            throw new RuntimeException("Error setting java properties and copying module directories.", e);
346                        }   
347                    }
348                }
349                
350                _haveInitializedOnce.set(true);
351        }        
352    }
353    
354    /** Initialize Ptolemy. */
355    public static void initializePtolemy() {
356        
357        // Ptolemy throws an exception if ptolemy.ptII.dir is not set.
358        try {
359            System.setProperty("ptolemy.ptII.dir", "foooo");
360        } catch(Throwable t) {
361            LOG.error("can't set property.", t);
362        }
363    }
364    
365    ///////////////////////////////////////////////////////////////////
366    ////                         private methods                   ////
367
368    /** Convert a Java Object to a Pact Key. */
369    private static Key _convertObjectToPactData(Object object) {
370                        
371        if(object instanceof String) {
372            return new StringValue((String)object);
373        } else if(object instanceof Integer) {
374            return new IntValue(((Integer)object).intValue());
375        } else if(object instanceof Long) {
376            return new LongValue(((Long)object).longValue());
377        } else if(object instanceof Double) {
378            return new DoubleValue(((Double)object).doubleValue());
379        }
380
381        throw new RuntimeException("Unsupported type of Object: " + object.getClass());
382    }
383
384    ///////////////////////////////////////////////////////////////////
385    ////                         private fields                    ////
386
387    private static final Log LOG = LogFactory.getLog(StubUtilities.class);
388
389    /** If true, we have initialized Kepler for this process. */
390    private static AtomicBoolean _haveInitializedOnce = new AtomicBoolean(false);
391
392}