/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.hadoop.util;

import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.kepler.Kepler;
import org.kepler.build.project.ProjectLocator;
import org.kepler.ddp.Utilities;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.io.RecordTokenWritable;
import org.kepler.hadoop.io.TaggedBytesWritable;
import org.kepler.hadoop.io.TaggedDoubleWritable;
import org.kepler.hadoop.io.TaggedIntWritable;
import org.kepler.hadoop.io.TaggedLongWritable;
import org.kepler.hadoop.io.TaggedNullWritable;
import org.kepler.hadoop.io.TaggedObjectWritable;
import org.kepler.hadoop.io.TaggedText;
import org.kepler.hadoop.io.TokenWritable;

import ptolemy.data.ArrayToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.IntToken;
import ptolemy.data.LongToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.RecordToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.UnsignedByteToken;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.ObjectType;
import ptolemy.data.type.RecordType;
import ptolemy.data.type.Type;
import ptolemy.kernel.util.IllegalActionException;
/**
 * A collection of utilities used by the Kepler Hadoop module. It is based
 * on org.kepler.stratosphere.stub.StubUtilities.
 *
 * @author Jianwu Wang
 * @version $Id: StubUtilities.java 33070 2014-11-12 23:21:09Z crawl $
 */
public class StubUtilities {

    /** Convert a list of tokens to Hadoop key-value pairs and add them to
     *  the output list. Each token must be an array of {key, value} records.
     */
    public static void convertTokenToList(List<Token> tokenList,
            List<KeyValuePair> outputList) throws IllegalActionException {

        if (tokenList == null) {
            //LOG.debug("token is null before converting into list.");
            return;
        }
        for (Token token : tokenList) {

            //LOG.debug("token type before converting into list:"
                    //+ token.getType());
            //LOG.debug("token value before converting into list:"
                    //+ token.toString());

            if (!(token instanceof ArrayToken)) {
                throw new IllegalActionException("Read non-array token: "
                        + token);
            }

            final ArrayToken arrayToken = (ArrayToken) token;

            if (!(arrayToken.getElementType() instanceof RecordType)) {
                throw new IllegalActionException(
                        "Read non-record array token: " + token);
            }

            final int length = arrayToken.length();
            for (int i = 0; i < length; i++) {
                final RecordToken element = (RecordToken) arrayToken
                        .getElement(i);
                final Object key = convertToHadoopData(element.get("key"));
                final Object value = convertToHadoopData(element.get("value"));
                outputList.add(new KeyValuePair(key, value));
            }
        }
    }
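
    /*
     * Usage sketch for convertTokenToList() (illustrative only, not part of
     * the original class; the record expression below is hypothetical):
     *
     *   RecordToken record = new RecordToken("{key = \"word\", value = 1}");
     *   ArrayToken array = new ArrayToken(new Token[] { record });
     *   List<KeyValuePair> out = new ArrayList<KeyValuePair>();
     *   convertTokenToList(
     *       java.util.Collections.<Token>singletonList(array), out);
     *   // out now holds one pair: (Text("word"), IntWritable(1))
     */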
    /** Convert a token to the corresponding Hadoop Writable. */
    public static Object convertToHadoopData(Token token)
            throws IllegalActionException {

        if (token == null)
            return null;

        final Type type = token.getType();
        //LOG.debug("type of token '" + token + "' is: " + type);
        Object retval = null;

        if (type == BaseType.STRING) {
            retval = new Text(((StringToken) token).stringValue());
        } else if (type == BaseType.INT) {
            retval = new IntWritable(((IntToken) token).intValue());
        } else if (type == BaseType.DOUBLE) {
            retval = new DoubleWritable(((DoubleToken) token).doubleValue());
        } else if (type == BaseType.LONG) {
            retval = new LongWritable(((LongToken) token).longValue());
        } else if (type == BaseType.NIL) {
            retval = NullWritable.get();
        } else if (type == BaseType.OBJECT) {
            retval = new ObjectWritable(((ObjectToken) token).getValue());
        } else if (type instanceof ArrayType) {
            if (((ArrayType) type).getElementType() == BaseType.UNSIGNED_BYTE)
                retval = new BytesWritable(
                        arrayTokenToByteArray((ArrayToken) token));
        } else if (type instanceof RecordType) {
            retval = new RecordTokenWritable(token.toString());
        } else if (type instanceof ObjectType) {
            final Class<?> clazz = ((ObjectType) type).getValueClass();
            if (clazz == byte[].class)
                retval = new BytesWritable(
                        (byte[]) ((ObjectToken) token).getValue());
        }

        if (retval == null) {
            throw new IllegalActionException("Unsupported token type: " + type);
        }
        //LOG.debug("convert kepler data " + token + " into hadoop data: "
                //+ retval);
        return retval;
    }
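
    /*
     * For reference, the token-to-Writable mapping implemented above:
     *
     *   StringToken           -> Text
     *   IntToken              -> IntWritable
     *   DoubleToken           -> DoubleWritable
     *   LongToken             -> LongWritable
     *   nil                   -> NullWritable
     *   ObjectToken           -> ObjectWritable
     *   {unsignedByte} array  -> BytesWritable
     *   RecordToken           -> RecordTokenWritable
     *   ObjectToken<byte[]>   -> BytesWritable
     */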

    /** Convert a token type to the corresponding Hadoop Writable class. */
    public static Class<?> convertToHadoopType(Type type)
            throws IllegalActionException {

        //LOG.debug("type is: " + type);
        Class<?> retval = null;

        if (type == BaseType.STRING) {
            retval = Text.class;
        } else if (type == BaseType.INT) {
            retval = IntWritable.class;
        } else if (type == BaseType.DOUBLE) {
            retval = DoubleWritable.class;
        } else if (type == BaseType.LONG) {
            retval = LongWritable.class;
        } else if (type == BaseType.NIL) {
            retval = NullWritable.class;
        } else if (type == BaseType.OBJECT) {
            retval = ObjectWritable.class;
        } else if (type instanceof ArrayType) {
            if (((ArrayType) type).getElementType() == BaseType.UNSIGNED_BYTE)
                retval = BytesWritable.class;
            else
                return convertToHadoopType(((ArrayType) type).getElementType());
        } else if (type instanceof RecordType) {
            retval = RecordTokenWritable.class;
        } else if (type instanceof ObjectType) {
            final Class<?> clazz = ((ObjectType) type).getValueClass();
            if (clazz == byte[].class)
                retval = BytesWritable.class;
        }
        if (retval == null) {
            throw new IllegalActionException("Unsupported token type: " + type);
        }

        //LOG.debug("output of convertToHadoopType() is: " + retval);
        return retval;
    }
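
    /*
     * Usage sketch for convertToHadoopType() (illustrative only):
     *
     *   convertToHadoopType(BaseType.STRING);              // Text.class
     *   convertToHadoopType(new ArrayType(BaseType.INT));  // IntWritable.class,
     *                                                      // via the element type
     */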

    /** Convert a token type name (e.g. "int") to the corresponding Hadoop
     *  Writable class.
     */
    public static Class<?> convertToHadoopType(String typeString)
            throws IllegalActionException {

        Type type = BaseType.forName(typeString);
        return convertToHadoopType(type);
    }
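
    /*
     * Usage sketch (illustrative only): BaseType.forName() resolves Ptolemy
     * type names, so for example:
     *
     *   convertToHadoopType("string");  // Text.class
     *   convertToHadoopType("double");  // DoubleWritable.class
     */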

    /** Convert a token type to the corresponding tagged Hadoop Writable
     *  class.
     */
    public static Class<?> convertToTaggedType(Type type)
            throws IllegalActionException {

        //LOG.debug("type in convertToTaggedType() is: " + type);
        Class<?> retval = null;

        if (type == BaseType.STRING) {
            retval = TaggedText.class;
        } else if (type == BaseType.INT) {
            retval = TaggedIntWritable.class;
        } else if (type == BaseType.DOUBLE) {
            retval = TaggedDoubleWritable.class;
        } else if (type == BaseType.LONG) {
            retval = TaggedLongWritable.class;
        } else if (type == BaseType.NIL) {
            retval = TaggedNullWritable.class;
        } else if (type == BaseType.OBJECT) {
            retval = TaggedObjectWritable.class;
        } else if (type instanceof ArrayType) {
            // use the element type for arrays
            return convertToTaggedType(((ArrayType) type).getElementType());
        } else if (type instanceof ObjectType) {
            final Class<?> clazz = ((ObjectType) type).getValueClass();
            if (clazz == byte[].class)
                retval = TaggedBytesWritable.class;
        }
        if (retval == null) {
            throw new IllegalActionException("Unsupported token type: " + type);
        }

        //LOG.debug("output of convertToTaggedType() is: " + retval);
        return retval;
    }
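
    /*
     * For reference, the tagged variants mirror convertToHadoopType():
     *
     *   convertToTaggedType(BaseType.STRING);  // TaggedText.class
     *   convertToTaggedType(BaseType.INT);     // TaggedIntWritable.class
     */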

    /** Convert the named field of a record-array token type to a Hadoop
     *  data type.
     */
    public static Class<?> convertRecordArrayToHadoopType(ArrayType type,
            String name) throws IllegalActionException {

        //LOG.debug("record array type is: " + type);
        Type elementType = ((RecordType) type.getElementType()).get(name);
        return convertToHadoopType(elementType);
    }

    /** Convert the named field of a record token type to a Hadoop data
     *  type.
     */
    public static Class<?> convertRecordToHadoopType(RecordType type, String name)
            throws IllegalActionException {

        //LOG.debug("record type is: " + type);
        return convertToHadoopType(type.get(name));
    }
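
    /*
     * Usage sketch (illustrative only; the record type below is hypothetical):
     *
     *   RecordType rec = new RecordType(
     *       new String[] { "key", "value" },
     *       new Type[] { BaseType.STRING, BaseType.INT });
     *   convertRecordToHadoopType(rec, "value");  // IntWritable.class
     */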

    /** Convert Hadoop data to a token. */
    public static Token convertToToken(Object value) throws IllegalActionException {
        //LOG.debug("class of value '" + value + "' is: " + value.getClass());
        Token retval = null;
        if (value instanceof TokenWritable) {
            return ((TokenWritable) value).getToken();
        } else if (value instanceof NullWritable) {
            retval = Token.NIL;
        } else if (value instanceof RecordTokenWritable) {
            retval = new RecordToken(((RecordTokenWritable) value).toString());
        } else if (value instanceof Text) {
            retval = new StringToken(((Text) value).toString());
        } else if (value instanceof String) {
            retval = new StringToken((String) value);
        } else if (value instanceof IntWritable) {
            retval = new IntToken(((IntWritable) value).get());
        } else if (value instanceof LongWritable) {
            retval = new LongToken(((LongWritable) value).get());
        } else if (value instanceof DoubleWritable) {
            retval = new DoubleToken(((DoubleWritable) value).get());
        } else if (value instanceof ObjectWritable) {
            try {
                retval = new ObjectToken(((ObjectWritable) value).get());
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else if (value instanceof BytesWritable) {
            try {
                retval = new ObjectToken(((BytesWritable) value).copyBytes(),
                        byte[].class);
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else {
            throw new RuntimeException("Unsupported type of Hadoop data: "
                    + value.getClass());
        }
        //LOG.debug("convert hadoop data " + value + " into kepler data: "
                //+ retval);
        return retval;
    }
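
    /*
     * Usage sketch for convertToToken() (illustrative only):
     *
     *   convertToToken(new Text("abc"));      // StringToken("abc")
     *   convertToToken(new IntWritable(42));  // IntToken(42)
     *   convertToToken(NullWritable.get());   // Token.NIL
     */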

    /** Convert Hadoop data of a known class to a token. */
    public static Token convertToToken(Object value, Class<?> clazz) {
        //LOG.debug("class of value '" + value + "' is: " + value.getClass());
        Token retval = null;
        if (clazz == NullWritable.class) {
            retval = Token.NIL;
        } else if (clazz == Text.class) {
            retval = new StringToken(((Text) value).toString());
        } else if (clazz == String.class) {
            retval = new StringToken((String) value);
        } else if (clazz == IntWritable.class) {
            retval = new IntToken(((IntWritable) value).get());
        } else if (clazz == LongWritable.class) {
            retval = new LongToken(((LongWritable) value).get());
        } else if (clazz == DoubleWritable.class) {
            retval = new DoubleToken(((DoubleWritable) value).get());
        } else if (clazz == ObjectWritable.class) {
            try {
                retval = new ObjectToken(((ObjectWritable) value).get());
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else if (clazz == BytesWritable.class) {
            try {
                retval = new ObjectToken(((BytesWritable) value).copyBytes(),
                        byte[].class);
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else {
            throw new RuntimeException("Unsupported type of Hadoop data: "
                    + value.getClass());
        }
        return retval;
    }

    /** Convert an iterator of Hadoop values to an array token. */
    public static Token convertToArrayToken(Iterator<Writable> values) {

        final List<Token> tokenList = new ArrayList<Token>();
        try {
            while (values.hasNext()) {
                tokenList.add(convertToToken(values.next()));
            }
            if (tokenList.isEmpty()) {
                return new ArrayToken(BaseType.NIL);
            } else {
                return new ArrayToken(tokenList.toArray(new Token[tokenList
                        .size()]));
            }
        } catch (IllegalActionException e) {
            throw new RuntimeException("Error creating array token.", e);
        }
    }
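
    /*
     * Usage sketch for convertToArrayToken() (illustrative only):
     *
     *   List<Writable> vals = java.util.Arrays.<Writable>asList(
     *       new IntWritable(1), new IntWritable(2));
     *   ArrayToken a = (ArrayToken) convertToArrayToken(vals.iterator());
     *   // a is the Ptolemy array {1, 2}
     */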

    /**
     * Convert an ArrayToken of unsigned bytes to a byte array. Returns null
     * if the element type is not unsignedByte.
     *
     * @throws IllegalActionException
     */
    public static byte[] arrayTokenToByteArray(ArrayToken arrayToken)
            throws IllegalActionException {
        if (arrayToken.getElementType() != BaseType.UNSIGNED_BYTE)
            return null;
        byte[] byteTokenArray = new byte[arrayToken.length()];
        for (int i = 0; i < arrayToken.length(); i++) {
            byteTokenArray[i] = ((UnsignedByteToken) arrayToken.getElement(i))
                    .byteValue();
        }
        return byteTokenArray;
    }
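
    /*
     * Usage sketch for arrayTokenToByteArray() (illustrative only; "ub" is
     * the Ptolemy expression suffix for unsignedByte literals):
     *
     *   ArrayToken bytes = new ArrayToken("{1ub, 2ub, 3ub}");
     *   byte[] raw = arrayTokenToByteArray(bytes);  // {1, 2, 3}
     */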

    /** Initialize the parts of Kepler required to run the sub-workflow in
     *  the stub.
     */
    public synchronized static void initializeKepler(Configuration conf) {

        // see if we have already initialized once in this JVM.
        if (!_haveInitializedOnce.get()) {

            // see if the kepler modules directory exists.
            String modulesDirStr = conf.get(Utilities.CONFIGURATION_KEPLER_MODULES_DIR, null);
            if (modulesDirStr != null) {
                File modulesDir = new File(modulesDirStr);
                if (modulesDir.exists() && modulesDir.isDirectory()) {

                    // set the directory
                    ProjectLocator.setKeplerModulesDir(modulesDir);

                    // FIXME this is a bad hack
                    // ProjectLocator.userBuildDir has to be set, otherwise findKeplerModulesDir()
                    // is called, which fails when running in the stratosphere server.
                    // userBuildDir is a private field, so we have to use reflection to modify it.
                    // unfortunately, we cannot add a method in ProjectLocator since kepler-tasks
                    // cannot be patched!
                    //
                    // this sets userBuildDir to $HOME/KeplerData/kepler.modules/build-area
                    //
                    if (ProjectLocator.shouldUtilizeUserKeplerModules()) {
                        try {
                            Field field = ProjectLocator.class.getDeclaredField("userBuildDir");
                            field.setAccessible(true);
                            File file = new File(System.getProperty("user.home") + File.separator +
                                    "KeplerData" + File.separator +
                                    "kepler.modules" + File.separator +
                                    "build-area");
                            field.set(null, file);
                        } catch (Throwable e) {
                            throw new RuntimeException("Error setting user build dir in ProjectLocator.", e);
                        }
                    }

                    try {
                        Kepler.setJavaPropertiesAndCopyModuleDirectories();
                    } catch (Exception e) {
                        throw new RuntimeException("Error setting java properties and copying module directories.", e);
                    }
                }
            }

            _haveInitializedOnce.set(true);
        }
    }
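
    /*
     * Typical call site (a sketch; assumes a Hadoop Mapper or Reducer stub
     * whose job Configuration carries the Kepler modules directory under
     * Utilities.CONFIGURATION_KEPLER_MODULES_DIR):
     *
     *   @Override
     *   protected void setup(Context context) {
     *       StubUtilities.initializeKepler(context.getConfiguration());
     *   }
     */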

    private static final Log LOG = LogFactory.getLog(StubUtilities.class);

    /** If true, we have initialized Kepler for this process. */
    private static final AtomicBoolean _haveInitializedOnce = new AtomicBoolean(false);

}