001/* Utilities for Kepler Spark stubs.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-09-29 19:11:04 +0000 (Tue, 29 Sep 2015) $' 
008 * '$Revision: 34002 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import java.io.ByteArrayInputStream;
033import java.io.DataInputStream;
034import java.io.File;
035import java.io.IOException;
036import java.util.Date;
037import java.util.List;
038import java.util.concurrent.atomic.AtomicBoolean;
039
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.io.BytesWritable;
044import org.apache.hadoop.io.DoubleWritable;
045import org.apache.hadoop.io.IntWritable;
046import org.apache.hadoop.io.LongWritable;
047import org.apache.hadoop.io.NullWritable;
048import org.apache.hadoop.io.ObjectWritable;
049import org.apache.hadoop.io.Text;
050import org.kepler.Kepler;
051import org.kepler.build.project.ProjectLocator;
052import org.kepler.ddp.Utilities;
053import org.kepler.spark.type.RecordValue;
054
055import ptolemy.data.ArrayToken;
056import ptolemy.data.DateToken;
057import ptolemy.data.DoubleToken;
058import ptolemy.data.IntToken;
059import ptolemy.data.LongToken;
060import ptolemy.data.ObjectToken;
061import ptolemy.data.RecordToken;
062import ptolemy.data.StringToken;
063import ptolemy.data.Token;
064import ptolemy.data.UnsignedByteToken;
065import ptolemy.data.type.ArrayType;
066import ptolemy.data.type.BaseType;
067import ptolemy.data.type.ObjectType;
068import ptolemy.data.type.RecordType;
069import ptolemy.data.type.Type;
070import ptolemy.kernel.util.IllegalActionException;
071import scala.Tuple2;
072
073/** A collection of utilities used by Kepler Spark stubs.
074 * 
075 * @author Daniel Crawl
076 * @version $Id: StubUtilities.java 34002 2015-09-29 19:11:04Z crawl $
077 */
078public class StubUtilities {
079    
080    /** Convert from a (possible) Hadoop Writable to an object. */
081    public static Object convertToObject(Object value) {
082        if(value instanceof Text) {
083            return ((Text)value).toString();
084        } else if(value instanceof LongWritable) {
085            return Long.valueOf(((LongWritable)value).get());
086        } else {
087            return value;
088        }
089    }
090
091    /** Convert Hadoop data types or Java Objects to a Ptolemy token. */
092    public static Token convertToToken(Object value) throws IllegalActionException {
093                
094        //LOG.debug("class of value '" + value + "' is: " + value.getClass());
095        Token retval = null;
096        if (value instanceof NullWritable) {
097            retval = Token.NIL;
098        } else if (value instanceof Text) {
099            retval = new StringToken(((Text) value).toString());
100        } else if (value instanceof String) {
101            retval = new StringToken(((String) value));
102        } else if (value instanceof IntWritable) {
103            retval = new IntToken(((IntWritable) value).get());
104        } else if (value instanceof Integer) {
105            retval = new IntToken(((Integer)value).intValue());
106        } else if (value instanceof LongWritable) {
107            retval = new LongToken(((LongWritable) value).get());
108        } else if (value instanceof Long) {
109            retval = new LongToken(((Long)value).longValue());
110        } else if (value instanceof DoubleWritable) {
111            retval = new DoubleToken(((DoubleWritable) value).get());
112        } else if (value instanceof Double) {
113            retval = new DoubleToken(((Double)value).doubleValue());
114        } else if (value instanceof RecordValue) {
115            retval = ((RecordValue)value).recordToken();
116        } else if (value instanceof ObjectWritable) {
117            try {
118                retval = new ObjectToken(((ObjectWritable) value).get());
119            } catch (IllegalActionException e) {
120
121                e.printStackTrace();
122                throw new RuntimeException("Error creating ObjectToken: "
123                        + e.getMessage());
124            }
125        } else if (value instanceof BytesWritable) {
126            try {
127                retval = new ObjectToken(((BytesWritable) value).copyBytes(),
128                        byte[].class);
129            } catch (IllegalActionException e) {
130                e.printStackTrace();
131                throw new RuntimeException("Error creating ObjectToken: "
132                        + e.getMessage());
133            }
134        } else if (value instanceof Date) {
135            return new DateToken(((Date)value).getTime());
136        } else if (value instanceof String[]) {
137            String[] arrayValue = (String[]) value;
138            Token[] array = new Token[arrayValue.length];
139            for(int i = 0; i < arrayValue.length; i++) {
140                array[i] = new StringToken(arrayValue[i]);
141            }
142            retval = new ArrayToken(array);
143        } else if (value instanceof Integer[]) {
144            Integer[] arrayValue = (Integer[]) value;
145            Token[] array = new Token[arrayValue.length];
146            for(int i = 0; i < arrayValue.length; i++) {
147                array[i] = new IntToken(arrayValue[i].intValue());
148            }
149            retval = new ArrayToken(array);
150        } else if (value instanceof Long[]) {
151            Long[] arrayValue = (Long[]) value;
152            Token[] array = new Token[arrayValue.length];
153            for(int i = 0; i < arrayValue.length; i++) {
154                array[i] = new LongToken(arrayValue[i].longValue());
155            }
156            retval = new ArrayToken(array);
157        } else if (value instanceof Double[]) {
158            Double[] arrayValue = (Double[]) value;
159            Token[] array = new Token[arrayValue.length];
160            for(int i = 0; i < arrayValue.length; i++) {
161                array[i] = new DoubleToken(arrayValue[i].doubleValue());
162            }
163            retval = new ArrayToken(array);
164        } else if (value instanceof RecordValue[]) {
165            RecordValue[] arrayValue = (RecordValue[]) value;
166            Token[] array = new Token[arrayValue.length];
167            for(int i = 0; i < arrayValue.length; i++) {
168                array[i] = arrayValue[i].recordToken();
169            }
170            retval = new ArrayToken(array);
171        } else {
172            throw new RuntimeException("Unsupported type of Hadoop data: "
173                    + value.getClass());
174        }
175        //LOG.debug("convert hadoop data " + value + " into kepler data: "
176                //+ retval);
177        return retval;
178    }
179
180    /** Convert a list of record tokens into a list of Tuple2s containing Java Objects. */
181    public static void convertTokenToTupleList(List<Token> tokenList, List<Tuple2<Object, Object>> out)
182            throws IllegalActionException {
183        
184        if (tokenList == null) {
185                return;
186        }
187        
188        for (Token token: tokenList) {
189                if(!(token instanceof ArrayToken)) {
190                    throw new IllegalActionException("Read non-array token: " + token);
191                }
192                
193                final ArrayToken arrayToken = (ArrayToken)token;
194                        
195                if(!(arrayToken.getElementType() instanceof RecordType)) {
196                    throw new IllegalActionException("Read non-record of array token: " + token);
197                }
198                
199                final int length = arrayToken.length();
200                for(int i = 0; i < length; i++) {
201                    final RecordToken element = (RecordToken) arrayToken.getElement(i);
202                    final Object key = convertTokenToObject(element.get("key"));
203                    final Object value = convertTokenToObject(element.get("value"));
204                    out.add(new Tuple2<Object,Object>(key, value));
205                }
206        }
207    }
208    
209    /** Convert lists of keys and values to a list of Tuple2s. */
210    public static void convertListsToTupleList(List<?> keys, List<?> values, List<Tuple2<Object, Object>> tuples) {
211        for(int i = 0; i < keys.size(); i++) {
212            tuples.add(new Tuple2<Object, Object>(keys.get(i), values.get(i)));
213        }
214    }
215
216    /** Convert a token to a Java Object. */
217    public static Object convertTokenToObject(Token token) throws IllegalActionException {
218        
219        final Type type = token.getType();
220        
221        Object retval = null;
222        if(type instanceof ObjectType) {
223            
224            final Class<?> clazz = ((ObjectType)type).getValueClass();
225            if(clazz == byte[].class) {
226                retval =  (byte[])((ObjectToken)token).getValue();
227            } else {
228                throw new IllegalActionException("Conversion of Object Token with class " +
229                    clazz + " is unsupported.");
230            }
231        } else if(type == BaseType.STRING) {
232            retval = ((StringToken)token).stringValue();
233        } else if(type == BaseType.INT) {
234            retval = Integer.valueOf(((IntToken)token).intValue());
235        } else if(type == BaseType.LONG) {
236            retval = Long.valueOf(((LongToken)token).longValue());
237        } else if(type == BaseType.DOUBLE) {
238            retval = Double.valueOf(((DoubleToken)token).doubleValue());
239        } else if(type == BaseType.NIL) {
240                retval = NullWritable.get();
241        } else if(type == BaseType.DATE) {
242            retval = new Date(((DateToken)token).getValue());
243        } else if(type instanceof ArrayType) {
244            retval = convertArrayTokenToObject((ArrayToken)token);
245        } else if(type instanceof RecordType) {
246            retval = new RecordValue((RecordToken)token);
247        }
248        if (retval == null){
249            throw new IllegalActionException("Unsupported token type: " + type);
250        }
251        
252        return retval;
253    }
254    
255    /** Convert ArrayToken to an array of objects. */
256    public static Object convertArrayTokenToObject(ArrayToken arrayToken)
257        throws IllegalActionException{
258        
259        final Type type = arrayToken.getElementType();
260        final int length = arrayToken.length();
261        
262        if(type instanceof ObjectType) {
263            
264            /*
265            final Class<?> clazz = ((ObjectType)type).getValueClass();
266            if(clazz == byte[].class) {
267                Object[] retval = new Object[length];
268                // TODO
269                return retval;
270            } else {
271                throw new IllegalActionException("Conversion of Object Token with class " +
272                    clazz + " is unsupported.");
273            }
274            */
275        } else if(type == BaseType.STRING) {
276            String[] retval = new String[length];
277            for (int i = 0; i < length; i++) {
278                retval[i] = ((StringToken)arrayToken.getElement(i)).stringValue(); 
279            }
280            return retval;
281        } else if(type == BaseType.INT) {
282            Integer[] retval = new Integer[length];
283            for (int i = 0; i < length; i++) {
284                retval[i] = ((IntToken)arrayToken.getElement(i)).intValue(); 
285            }
286            return retval;
287        } else if(type == BaseType.LONG) {
288            Long[] retval = new Long[length];
289            for (int i = 0; i < length; i++) {
290                retval[i] = ((LongToken)arrayToken.getElement(i)).longValue(); 
291            }
292            return retval;
293        } else if(type == BaseType.DOUBLE) {
294            Double[] retval = new Double[length];
295            for (int i = 0; i < length; i++) {
296                retval[i] = ((DoubleToken)arrayToken.getElement(i)).doubleValue(); 
297            }
298            return retval;
299        //} else if(type == BaseType.NIL) {
300            // TODO
301            //retval = NullWritable.get();
302        } else if(type == BaseType.UNSIGNED_BYTE) {
303            byte[] retval = new byte[length];
304            for (int i = 0; i < length; i++) {
305                retval[i] = ((UnsignedByteToken)arrayToken.getElement(i)).byteValue(); 
306            }
307            return retval;
308        } else if(type == BaseType.DATE) {
309            Date[] retval = new Date[length];
310            for (int i = 0; i < length; i++) {
311                retval[i] = new Date(((DateToken)arrayToken.getElement(i)).getValue()); 
312            }
313            return retval;
314        //} else if(type instanceof ArrayType) {
315            // TODO
316            //retval = convertArrayTokenToObject((ArrayToken)token);
317        } else if(type instanceof RecordType) {
318            RecordValue[] retval = new RecordValue[length];
319            for (int i = 0; i < length; i++) {
320                retval[i] = new RecordValue((RecordToken)arrayToken.getElement(i)); 
321            }
322            return retval;
323        }
324
325        throw new IllegalActionException("Unsupported array token type: " + type);
326        }
327    
328    /** Initialize the parts of Kepler required to run the sub-workflow in
329     *  the stub.
330     */
331    public synchronized static void initializeKepler(Configuration parameters) {
332        
333        // see if we have already initialized once in this JVM.
334        if(!_haveInitializedOnce.get()) {
335                
336                // see if the kepler modules directory exists.
337                String modulesDirStr = parameters.get(Utilities.CONFIGURATION_KEPLER_MODULES_DIR, null);
338                if(modulesDirStr != null) {
339                    File modulesDir = new File(modulesDirStr);
340                    if(modulesDir.exists() && modulesDir.isDirectory()) {
341                
342                        // set the directory
343                        ProjectLocator.setKeplerModulesDir(modulesDir);
344                        
345                        if(ProjectLocator.shouldUtilizeUserKeplerModules()) {
346                        File file = new File(System.getProperty("user.home") + File.separator +
347                                "KeplerData" + File.separator +
348                                "kepler.modules" + File.separator +
349                                "build-area");
350                        ProjectLocator.setUserBuildDir(file);
351                        }
352                        
353                        try {
354                            Kepler.setJavaPropertiesAndCopyModuleDirectories();
355                        } catch (Exception e) {
356                            throw new RuntimeException("Error setting java properties and copying module directories.", e);
357                        }   
358                    }
359                }
360                
361                _haveInitializedOnce.set(true);
362        }        
363    }
364    
365    /** Initialize Ptolemy. */
366    public static void initializePtolemy() {
367        
368        // Ptolemy throws an exception if ptolemy.ptII.dir is not set.
369        try {
370            System.setProperty("ptolemy.ptII.dir", "foooo");
371        } catch(Throwable t) {
372            LOG.error("can't set property.", t);
373        }
374    }
375    
376    public static Configuration deserializeConfiguration(byte[] bytes) throws IOException {
377        
378        ByteArrayInputStream arrayStream = null;
379        DataInputStream inputStream = null;
380        try {
381            arrayStream = new ByteArrayInputStream(bytes);
382            inputStream = new DataInputStream(arrayStream);
383            Configuration configuration = new Configuration();
384            configuration.readFields(inputStream);
385            return configuration;
386        } finally {
387            if(inputStream != null) {
388                inputStream.close();
389            }
390            if(arrayStream != null) {
391                arrayStream.close();
392            }
393        }
394    }
395    
396    ///////////////////////////////////////////////////////////////////
397    ////                         private methods                   ////
398
399    ///////////////////////////////////////////////////////////////////
400    ////                         private fields                    ////
401
402    private static final Log LOG = LogFactory.getLog(StubUtilities.class);
403
404    /** If true, we have initialized Kepler for this process. */
405    private static AtomicBoolean _haveInitializedOnce = new AtomicBoolean(false);
406
407}