001/* Utilities for Kepler Spark stubs. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-09-29 19:11:04 +0000 (Tue, 29 Sep 2015) $' 008 * '$Revision: 34002 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.spark.stub; 031 032import java.io.ByteArrayInputStream; 033import java.io.DataInputStream; 034import java.io.File; 035import java.io.IOException; 036import java.util.Date; 037import java.util.List; 038import java.util.concurrent.atomic.AtomicBoolean; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.io.BytesWritable; 044import org.apache.hadoop.io.DoubleWritable; 045import org.apache.hadoop.io.IntWritable; 046import org.apache.hadoop.io.LongWritable; 047import org.apache.hadoop.io.NullWritable; 048import org.apache.hadoop.io.ObjectWritable; 049import org.apache.hadoop.io.Text; 050import org.kepler.Kepler; 051import org.kepler.build.project.ProjectLocator; 052import org.kepler.ddp.Utilities; 053import org.kepler.spark.type.RecordValue; 054 055import ptolemy.data.ArrayToken; 056import ptolemy.data.DateToken; 057import ptolemy.data.DoubleToken; 058import ptolemy.data.IntToken; 059import ptolemy.data.LongToken; 060import ptolemy.data.ObjectToken; 061import ptolemy.data.RecordToken; 062import ptolemy.data.StringToken; 063import ptolemy.data.Token; 064import ptolemy.data.UnsignedByteToken; 065import ptolemy.data.type.ArrayType; 066import ptolemy.data.type.BaseType; 067import ptolemy.data.type.ObjectType; 068import ptolemy.data.type.RecordType; 069import ptolemy.data.type.Type; 070import ptolemy.kernel.util.IllegalActionException; 071import scala.Tuple2; 072 073/** A collection of utilities used by Kepler Spark stubs. 074 * 075 * @author Daniel Crawl 076 * @version $Id: StubUtilities.java 34002 2015-09-29 19:11:04Z crawl $ 077 */ 078public class StubUtilities { 079 080 /** Convert from a (possible) Hadoop Writable to an object. */ 081 public static Object convertToObject(Object value) { 082 if(value instanceof Text) { 083 return ((Text)value).toString(); 084 } else if(value instanceof LongWritable) { 085 return Long.valueOf(((LongWritable)value).get()); 086 } else { 087 return value; 088 } 089 } 090 091 /** Convert Hadoop data types or Java Objects to a Ptolemy token. */ 092 public static Token convertToToken(Object value) throws IllegalActionException { 093 094 //LOG.debug("class of value '" + value + "' is: " + value.getClass()); 095 Token retval = null; 096 if (value instanceof NullWritable) { 097 retval = Token.NIL; 098 } else if (value instanceof Text) { 099 retval = new StringToken(((Text) value).toString()); 100 } else if (value instanceof String) { 101 retval = new StringToken(((String) value)); 102 } else if (value instanceof IntWritable) { 103 retval = new IntToken(((IntWritable) value).get()); 104 } else if (value instanceof Integer) { 105 retval = new IntToken(((Integer)value).intValue()); 106 } else if (value instanceof LongWritable) { 107 retval = new LongToken(((LongWritable) value).get()); 108 } else if (value instanceof Long) { 109 retval = new LongToken(((Long)value).longValue()); 110 } else if (value instanceof DoubleWritable) { 111 retval = new DoubleToken(((DoubleWritable) value).get()); 112 } else if (value instanceof Double) { 113 retval = new DoubleToken(((Double)value).doubleValue()); 114 } else if (value instanceof RecordValue) { 115 retval = ((RecordValue)value).recordToken(); 116 } else if (value instanceof ObjectWritable) { 117 try { 118 retval = new ObjectToken(((ObjectWritable) value).get()); 119 } catch (IllegalActionException e) { 120 121 e.printStackTrace(); 122 throw new RuntimeException("Error creating ObjectToken: " 123 + e.getMessage()); 124 } 125 } else if (value instanceof BytesWritable) { 126 try { 127 retval = new ObjectToken(((BytesWritable) value).copyBytes(), 128 byte[].class); 129 } catch (IllegalActionException e) { 130 e.printStackTrace(); 131 throw new RuntimeException("Error creating ObjectToken: " 132 + e.getMessage()); 133 } 134 } else if (value instanceof Date) { 135 return new DateToken(((Date)value).getTime()); 136 } else if (value instanceof String[]) { 137 String[] arrayValue = (String[]) value; 138 Token[] array = new Token[arrayValue.length]; 139 for(int i = 0; i < arrayValue.length; i++) { 140 array[i] = new StringToken(arrayValue[i]); 141 } 142 retval = new ArrayToken(array); 143 } else if (value instanceof Integer[]) { 144 Integer[] arrayValue = (Integer[]) value; 145 Token[] array = new Token[arrayValue.length]; 146 for(int i = 0; i < arrayValue.length; i++) { 147 array[i] = new IntToken(arrayValue[i].intValue()); 148 } 149 retval = new ArrayToken(array); 150 } else if (value instanceof Long[]) { 151 Long[] arrayValue = (Long[]) value; 152 Token[] array = new Token[arrayValue.length]; 153 for(int i = 0; i < arrayValue.length; i++) { 154 array[i] = new LongToken(arrayValue[i].longValue()); 155 } 156 retval = new ArrayToken(array); 157 } else if (value instanceof Double[]) { 158 Double[] arrayValue = (Double[]) value; 159 Token[] array = new Token[arrayValue.length]; 160 for(int i = 0; i < arrayValue.length; i++) { 161 array[i] = new DoubleToken(arrayValue[i].doubleValue()); 162 } 163 retval = new ArrayToken(array); 164 } else if (value instanceof RecordValue[]) { 165 RecordValue[] arrayValue = (RecordValue[]) value; 166 Token[] array = new Token[arrayValue.length]; 167 for(int i = 0; i < arrayValue.length; i++) { 168 array[i] = arrayValue[i].recordToken(); 169 } 170 retval = new ArrayToken(array); 171 } else { 172 throw new RuntimeException("Unsupported type of Hadoop data: " 173 + value.getClass()); 174 } 175 //LOG.debug("convert hadoop data " + value + " into kepler data: " 176 //+ retval); 177 return retval; 178 } 179 180 /** Convert a list of record tokens into a list of Tuple2s containing Java Objects. */ 181 public static void convertTokenToTupleList(List<Token> tokenList, List<Tuple2<Object, Object>> out) 182 throws IllegalActionException { 183 184 if (tokenList == null) { 185 return; 186 } 187 188 for (Token token: tokenList) { 189 if(!(token instanceof ArrayToken)) { 190 throw new IllegalActionException("Read non-array token: " + token); 191 } 192 193 final ArrayToken arrayToken = (ArrayToken)token; 194 195 if(!(arrayToken.getElementType() instanceof RecordType)) { 196 throw new IllegalActionException("Read non-record of array token: " + token); 197 } 198 199 final int length = arrayToken.length(); 200 for(int i = 0; i < length; i++) { 201 final RecordToken element = (RecordToken) arrayToken.getElement(i); 202 final Object key = convertTokenToObject(element.get("key")); 203 final Object value = convertTokenToObject(element.get("value")); 204 out.add(new Tuple2<Object,Object>(key, value)); 205 } 206 } 207 } 208 209 /** Convert lists of keys and values to a list of Tuple2s. */ 210 public static void convertListsToTupleList(List<?> keys, List<?> values, List<Tuple2<Object, Object>> tuples) { 211 for(int i = 0; i < keys.size(); i++) { 212 tuples.add(new Tuple2<Object, Object>(keys.get(i), values.get(i))); 213 } 214 } 215 216 /** Convert a token to a Java Object. */ 217 public static Object convertTokenToObject(Token token) throws IllegalActionException { 218 219 final Type type = token.getType(); 220 221 Object retval = null; 222 if(type instanceof ObjectType) { 223 224 final Class<?> clazz = ((ObjectType)type).getValueClass(); 225 if(clazz == byte[].class) { 226 retval = (byte[])((ObjectToken)token).getValue(); 227 } else { 228 throw new IllegalActionException("Conversion of Object Token with class " + 229 clazz + " is unsupported."); 230 } 231 } else if(type == BaseType.STRING) { 232 retval = ((StringToken)token).stringValue(); 233 } else if(type == BaseType.INT) { 234 retval = Integer.valueOf(((IntToken)token).intValue()); 235 } else if(type == BaseType.LONG) { 236 retval = Long.valueOf(((LongToken)token).longValue()); 237 } else if(type == BaseType.DOUBLE) { 238 retval = Double.valueOf(((DoubleToken)token).doubleValue()); 239 } else if(type == BaseType.NIL) { 240 retval = NullWritable.get(); 241 } else if(type == BaseType.DATE) { 242 retval = new Date(((DateToken)token).getValue()); 243 } else if(type instanceof ArrayType) { 244 retval = convertArrayTokenToObject((ArrayToken)token); 245 } else if(type instanceof RecordType) { 246 retval = new RecordValue((RecordToken)token); 247 } 248 if (retval == null){ 249 throw new IllegalActionException("Unsupported token type: " + type); 250 } 251 252 return retval; 253 } 254 255 /** Convert ArrayToken to an array of objects. */ 256 public static Object convertArrayTokenToObject(ArrayToken arrayToken) 257 throws IllegalActionException{ 258 259 final Type type = arrayToken.getElementType(); 260 final int length = arrayToken.length(); 261 262 if(type instanceof ObjectType) { 263 264 /* 265 final Class<?> clazz = ((ObjectType)type).getValueClass(); 266 if(clazz == byte[].class) { 267 Object[] retval = new Object[length]; 268 // TODO 269 return retval; 270 } else { 271 throw new IllegalActionException("Conversion of Object Token with class " + 272 clazz + " is unsupported."); 273 } 274 */ 275 } else if(type == BaseType.STRING) { 276 String[] retval = new String[length]; 277 for (int i = 0; i < length; i++) { 278 retval[i] = ((StringToken)arrayToken.getElement(i)).stringValue(); 279 } 280 return retval; 281 } else if(type == BaseType.INT) { 282 Integer[] retval = new Integer[length]; 283 for (int i = 0; i < length; i++) { 284 retval[i] = ((IntToken)arrayToken.getElement(i)).intValue(); 285 } 286 return retval; 287 } else if(type == BaseType.LONG) { 288 Long[] retval = new Long[length]; 289 for (int i = 0; i < length; i++) { 290 retval[i] = ((LongToken)arrayToken.getElement(i)).longValue(); 291 } 292 return retval; 293 } else if(type == BaseType.DOUBLE) { 294 Double[] retval = new Double[length]; 295 for (int i = 0; i < length; i++) { 296 retval[i] = ((DoubleToken)arrayToken.getElement(i)).doubleValue(); 297 } 298 return retval; 299 //} else if(type == BaseType.NIL) { 300 // TODO 301 //retval = NullWritable.get(); 302 } else if(type == BaseType.UNSIGNED_BYTE) { 303 byte[] retval = new byte[length]; 304 for (int i = 0; i < length; i++) { 305 retval[i] = ((UnsignedByteToken)arrayToken.getElement(i)).byteValue(); 306 } 307 return retval; 308 } else if(type == BaseType.DATE) { 309 Date[] retval = new Date[length]; 310 for (int i = 0; i < length; i++) { 311 retval[i] = new Date(((DateToken)arrayToken.getElement(i)).getValue()); 312 } 313 return retval; 314 //} else if(type instanceof ArrayType) { 315 // TODO 316 //retval = convertArrayTokenToObject((ArrayToken)token); 317 } else if(type instanceof RecordType) { 318 RecordValue[] retval = new RecordValue[length]; 319 for (int i = 0; i < length; i++) { 320 retval[i] = new RecordValue((RecordToken)arrayToken.getElement(i)); 321 } 322 return retval; 323 } 324 325 throw new IllegalActionException("Unsupported array token type: " + type); 326 } 327 328 /** Initialize the parts of Kepler required to run the sub-workflow in 329 * the stub. 330 */ 331 public synchronized static void initializeKepler(Configuration parameters) { 332 333 // see if we have already initialized once in this JVM. 334 if(!_haveInitializedOnce.get()) { 335 336 // see if the kepler modules directory exists. 337 String modulesDirStr = parameters.get(Utilities.CONFIGURATION_KEPLER_MODULES_DIR, null); 338 if(modulesDirStr != null) { 339 File modulesDir = new File(modulesDirStr); 340 if(modulesDir.exists() && modulesDir.isDirectory()) { 341 342 // set the directory 343 ProjectLocator.setKeplerModulesDir(modulesDir); 344 345 if(ProjectLocator.shouldUtilizeUserKeplerModules()) { 346 File file = new File(System.getProperty("user.home") + File.separator + 347 "KeplerData" + File.separator + 348 "kepler.modules" + File.separator + 349 "build-area"); 350 ProjectLocator.setUserBuildDir(file); 351 } 352 353 try { 354 Kepler.setJavaPropertiesAndCopyModuleDirectories(); 355 } catch (Exception e) { 356 throw new RuntimeException("Error setting java properties and copying module directories.", e); 357 } 358 } 359 } 360 361 _haveInitializedOnce.set(true); 362 } 363 } 364 365 /** Initialize Ptolemy. */ 366 public static void initializePtolemy() { 367 368 // Ptolemy throws an exception if ptolemy.ptII.dir is not set. 369 try { 370 System.setProperty("ptolemy.ptII.dir", "foooo"); 371 } catch(Throwable t) { 372 LOG.error("can't set property.", t); 373 } 374 } 375 376 public static Configuration deserializeConfiguration(byte[] bytes) throws IOException { 377 378 ByteArrayInputStream arrayStream = null; 379 DataInputStream inputStream = null; 380 try { 381 arrayStream = new ByteArrayInputStream(bytes); 382 inputStream = new DataInputStream(arrayStream); 383 Configuration configuration = new Configuration(); 384 configuration.readFields(inputStream); 385 return configuration; 386 } finally { 387 if(inputStream != null) { 388 inputStream.close(); 389 } 390 if(arrayStream != null) { 391 arrayStream.close(); 392 } 393 } 394 } 395 396 /////////////////////////////////////////////////////////////////// 397 //// private methods //// 398 399 /////////////////////////////////////////////////////////////////// 400 //// private fields //// 401 402 private static final Log LOG = LogFactory.getLog(StubUtilities.class); 403 404 /** If true, we have initialized Kepler for this process. */ 405 private static AtomicBoolean _haveInitializedOnce = new AtomicBoolean(false); 406 407}