/*
 * Copyright (c) 2012-2013 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 23:21:09 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33070 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.hadoop.util;

import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.kepler.Kepler;
import org.kepler.build.project.ProjectLocator;
import org.kepler.ddp.Utilities;
import org.kepler.hadoop.io.KeyValuePair;
import org.kepler.hadoop.io.RecordTokenWritable;
import org.kepler.hadoop.io.TaggedBytesWritable;
import org.kepler.hadoop.io.TaggedDoubleWritable;
import org.kepler.hadoop.io.TaggedIntWritable;
import org.kepler.hadoop.io.TaggedLongWritable;
import org.kepler.hadoop.io.TaggedNullWritable;
import org.kepler.hadoop.io.TaggedObjectWritable;
import org.kepler.hadoop.io.TaggedText;
import org.kepler.hadoop.io.TokenWritable;

import ptolemy.data.ArrayToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.IntToken;
import ptolemy.data.LongToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.RecordToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.UnsignedByteToken;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.ObjectType;
import ptolemy.data.type.RecordType;
import ptolemy.data.type.Type;
import ptolemy.kernel.util.IllegalActionException;
082 * 083 * @author Jianwu Wang 084 * @version $Id: StubUtilities.java 33070 2014-11-12 23:21:09Z crawl $ 085 */ 086 087public class StubUtilities { 088 089 /** Convert a token to hadoop data and add to a Collector. */ 090 public static void convertTokenToList(List<Token> tokenList, 091 List<KeyValuePair> outputList) throws IllegalActionException { 092 093 if (tokenList == null) { 094 //LOG.debug("token is null before converting into list."); 095 return; 096 } 097 for (Token token : tokenList) { 098 099 //LOG.debug("token type before converting into list:" 100 //+ token.getType()); 101 //LOG.debug("token value before converting into list:" 102 //+ token.toString()); 103 104 if (!(token instanceof ArrayToken)) { 105 throw new IllegalActionException("Read non-array token: " 106 + token); 107 } 108 109 final ArrayToken arrayToken = (ArrayToken) token; 110 111 if (!(arrayToken.getElementType() instanceof RecordType)) { 112 throw new IllegalActionException( 113 "Read non-record of array token: " + token); 114 } 115 116 final int length = arrayToken.length(); 117 for (int i = 0; i < length; i++) { 118 final RecordToken element = (RecordToken) arrayToken 119 .getElement(i); 120 final Object key = convertToHadoopData(element.get("key")); 121 final Object value = convertToHadoopData(element.get("value")); 122 outputList.add(new KeyValuePair(key, value)); 123 } 124 } 125 } 126 127 // private static final Log LOG = LogFactory.getLog(StubUtilities.class); 128 129 /** Convert a token to hadoop data. */ 130 public static Object convertToHadoopData(Token token) 131 throws IllegalActionException { 132 133 if (token == null) 134 return null; 135 136 final Type type = token.getType(); 137 //LOG.debug("type of token '" + token + "' is: " + type); 138 Object retval = null; 139 140 if (type == BaseType.STRING) { 141 retval = new Text(((StringToken) token).stringValue()); 142 } else if (type == BaseType.INT) { 143 retval = new IntWritable(((IntToken) token).intValue()); 144 } else if (type == BaseType.DOUBLE) { 145 retval = new DoubleWritable(((DoubleToken) token).doubleValue()); 146 } else if (type == BaseType.LONG) { 147 retval = new LongWritable(((LongToken) token).longValue()); 148 } else if (type == BaseType.NIL) { 149 retval = NullWritable.get(); 150 } else if (type == BaseType.OBJECT) { 151 retval = new ObjectWritable(((ObjectToken) token).getValue()); 152 } else if (type instanceof ArrayType) { 153 if (((ArrayType) type).getElementType() == BaseType.UNSIGNED_BYTE) 154 retval = new BytesWritable( 155 arrayTokenToByteArray((ArrayToken) token)); 156 } else if (type instanceof RecordType) { 157 retval = new RecordTokenWritable(token.toString()); 158 } else if (type instanceof ObjectType) { 159 final Class<?> clazz = ((ObjectType) type).getValueClass(); 160 if (clazz == byte[].class) 161 retval = new BytesWritable( 162 (byte[]) ((ObjectToken) token).getValue()); 163 } 164 165 if (retval == null) { 166 throw new IllegalActionException("Unsupported token type: " + type); 167 } 168 //LOG.debug("convert kepler data " + token + " into hadoop data: " 169 //+ retval); 170 return retval; 171 } 172 173 /** Convert a token type to hadoop data type. 

    /** Convert a token type to a hadoop data type. */
    public static Class<?> convertToHadoopType(Type type)
            throws IllegalActionException {

        //LOG.debug("type is: " + type);
        Class<?> retval = null;

        if (type == BaseType.STRING) {
            retval = Text.class;
        } else if (type == BaseType.INT) {
            retval = IntWritable.class;
        } else if (type == BaseType.DOUBLE) {
            retval = DoubleWritable.class;
        } else if (type == BaseType.LONG) {
            retval = LongWritable.class;
        } else if (type == BaseType.NIL) {
            retval = NullWritable.class;
        } else if (type == BaseType.OBJECT) {
            retval = ObjectWritable.class;
        } else if (type instanceof ArrayType) {
            if (((ArrayType) type).getElementType() == BaseType.UNSIGNED_BYTE)
                retval = BytesWritable.class;
            else
                return convertToHadoopType(((ArrayType) type).getElementType());
        } else if (type instanceof RecordType) {
            retval = RecordTokenWritable.class;
        } else if (type instanceof ObjectType) {
            final Class<?> clazz = ((ObjectType) type).getValueClass();
            if (clazz == byte[].class)
                retval = BytesWritable.class;
        }
        if (retval == null) {
            throw new IllegalActionException("Unsupported token type: " + type);
        }

        //LOG.debug("output of convertToHadoopType() is: " + retval);
        return retval;
    }

    /** Convert a token type (given as a type name string) to a hadoop data
     *  type.
     */
    public static Class<?> convertToHadoopType(String typeString)
            throws IllegalActionException {

        Type type = BaseType.forName(typeString);
        return convertToHadoopType(type);
    }

    /** Convert a token type to a tagged hadoop data type. */
    public static Class<?> convertToTaggedType(Type type)
            throws IllegalActionException {

        //LOG.debug("type in convertToTaggedType() is: " + type);
        Class<?> retval = null;

        if (type == BaseType.STRING) {
            retval = TaggedText.class;
        } else if (type == BaseType.INT) {
            retval = TaggedIntWritable.class;
        } else if (type == BaseType.DOUBLE) {
            retval = TaggedDoubleWritable.class;
        } else if (type == BaseType.LONG) {
            retval = TaggedLongWritable.class;
        } else if (type == BaseType.NIL) {
            retval = TaggedNullWritable.class;
        } else if (type == BaseType.OBJECT) {
            retval = TaggedObjectWritable.class;
        } else if (type instanceof ArrayType) {
            // use the element type for an ArrayType
            return convertToTaggedType(((ArrayType) type).getElementType());
        } else if (type instanceof ObjectType) {
            final Class<?> clazz = ((ObjectType) type).getValueClass();
            if (clazz == byte[].class)
                retval = TaggedBytesWritable.class;
        }
        if (retval == null) {
            throw new IllegalActionException("Unsupported token type: " + type);
        }

        //LOG.debug("output of convertToTaggedType() is: " + retval);
        return retval;
    }

    /** Convert a named field of a token record array type to a hadoop data
     *  type.
     */
    public static Class<?> convertRecordArrayToHadoopType(ArrayType type,
            String name) throws IllegalActionException {

        //LOG.debug("record array type is: " + type);
        Type elementType = ((RecordType) type.getElementType()).get(name);
        return convertToHadoopType(elementType);
    }

    /** Convert a named field of a token record type to a hadoop data type. */
    public static Class<?> convertRecordToHadoopType(RecordType type, String name)
            throws IllegalActionException {

        //LOG.debug("record type is: " + type);
        return convertToHadoopType(type.get(name));
    }
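
    // A minimal sketch of the type-level mappings above (illustrative, not
    // part of the original class). Note that an array type with a non-byte
    // element type maps to its element's writable class, so {int} also
    // yields IntWritable.class, while {unsignedByte} yields
    // BytesWritable.class.
    private static void exampleConvertToHadoopType()
            throws IllegalActionException {
        final Class<?> stringClass = convertToHadoopType(BaseType.STRING);
        assert stringClass == Text.class;
        final Class<?> intArrayClass = convertToHadoopType(
                new ArrayType(BaseType.INT));
        assert intArrayClass == IntWritable.class;
        final Class<?> byteArrayClass = convertToHadoopType(
                new ArrayType(BaseType.UNSIGNED_BYTE));
        assert byteArrayClass == BytesWritable.class;
    }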

    /** Convert hadoop data to a token. */
    public static Token convertToToken(Object value) throws IllegalActionException {
        //LOG.debug("class of value '" + value + "' is: " + value.getClass());
        Token retval = null;
        if (value instanceof TokenWritable) {
            return ((TokenWritable) value).getToken();
        } else if (value instanceof NullWritable) {
            retval = Token.NIL;
        } else if (value instanceof RecordTokenWritable) {
            retval = new RecordToken(((RecordTokenWritable) value).toString());
        } else if (value instanceof Text) {
            retval = new StringToken(((Text) value).toString());
        } else if (value instanceof String) {
            retval = new StringToken((String) value);
        } else if (value instanceof IntWritable) {
            retval = new IntToken(((IntWritable) value).get());
        } else if (value instanceof LongWritable) {
            retval = new LongToken(((LongWritable) value).get());
        } else if (value instanceof DoubleWritable) {
            retval = new DoubleToken(((DoubleWritable) value).get());
        } else if (value instanceof ObjectWritable) {
            try {
                retval = new ObjectToken(((ObjectWritable) value).get());
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else if (value instanceof BytesWritable) {
            try {
                retval = new ObjectToken(((BytesWritable) value).copyBytes(),
                        byte[].class);
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else {
            throw new RuntimeException("Unsupported type of Hadoop data: "
                    + value.getClass());
        }
        //LOG.debug("convert hadoop data " + value + " into kepler data: "
        //+ retval);
        return retval;
    }

    /** Convert hadoop data of a known class to a token. */
    public static Token convertToToken(Object value, Class<?> clazz) {
        //LOG.debug("class of value '" + value + "' is: " + value.getClass());
        Token retval = null;
        if (clazz == NullWritable.class) {
            retval = Token.NIL;
        } else if (clazz == Text.class) {
            retval = new StringToken(((Text) value).toString());
        } else if (clazz == String.class) {
            retval = new StringToken((String) value);
        } else if (clazz == IntWritable.class) {
            retval = new IntToken(((IntWritable) value).get());
        } else if (clazz == LongWritable.class) {
            retval = new LongToken(((LongWritable) value).get());
        } else if (clazz == DoubleWritable.class) {
            retval = new DoubleToken(((DoubleWritable) value).get());
        } else if (clazz == ObjectWritable.class) {
            try {
                retval = new ObjectToken(((ObjectWritable) value).get());
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else if (clazz == BytesWritable.class) {
            try {
                retval = new ObjectToken(((BytesWritable) value).copyBytes(),
                        byte[].class);
            } catch (IllegalActionException e) {
                throw new RuntimeException("Error creating ObjectToken: "
                        + e.getMessage(), e);
            }
        } else {
            throw new RuntimeException("Unsupported class of Hadoop data: "
                    + clazz);
        }
        return retval;
    }
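
    // A minimal sketch of the reverse direction (illustrative, not part of
    // the original class): converting hadoop writables back into Kepler
    // tokens with convertToToken().
    private static void exampleConvertToToken() throws IllegalActionException {
        final Token s = convertToToken(new Text("hello"));   // StringToken
        final Token i = convertToToken(new IntWritable(7));  // IntToken
        final Token n = convertToToken(NullWritable.get());  // Token.NIL
        assert s instanceof StringToken && i instanceof IntToken && n.isNil();
    }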

    /** Convert an iterator of hadoop data to an array token. */
    public static Token convertToArrayToken(Iterator<Writable> values) {

        final List<Token> tokenList = new ArrayList<Token>();
        try {
            while (values.hasNext()) {
                tokenList.add(convertToToken(values.next()));
            }
            if (tokenList.size() == 0) {
                return new ArrayToken(BaseType.NIL);
            } else {
                return new ArrayToken(tokenList.toArray(new Token[tokenList
                        .size()]));
            }
        } catch (IllegalActionException e) {
            throw new RuntimeException("Error creating array token.", e);
        }
    }

    /**
     * Convert an ArrayToken of unsigned bytes to a byte array. Returns null
     * if the element type is not unsigned byte.
     *
     * @throws IllegalActionException
     */
    public static byte[] arrayTokenToByteArray(ArrayToken arrayToken)
            throws IllegalActionException {
        if (arrayToken.getElementType() != BaseType.UNSIGNED_BYTE)
            return null;
        byte[] byteTokenArray = new byte[arrayToken.length()];
        for (int i = 0; i < arrayToken.length(); i++) {
            byteTokenArray[i] = ((UnsignedByteToken) arrayToken.getElement(i))
                    .byteValue();
        }
        return byteTokenArray;
    }

    /** Initialize the parts of Kepler required to run the sub-workflow in
     *  the stub.
     */
    public static synchronized void initializeKepler(Configuration conf) {

        // see if we have already initialized once in this JVM.
        if (!_haveInitializedOnce.get()) {

            // see if the kepler modules directory exists.
            String modulesDirStr = conf.get(Utilities.CONFIGURATION_KEPLER_MODULES_DIR, null);
            if (modulesDirStr != null) {
                File modulesDir = new File(modulesDirStr);
                if (modulesDir.exists() && modulesDir.isDirectory()) {

                    // set the directory
                    ProjectLocator.setKeplerModulesDir(modulesDir);

                    // FIXME this is a bad hack
                    // ProjectLocator.userBuildDir has to be set, otherwise findKeplerModulesDir()
                    // is called, which fails when running in the stratosphere server.
                    // userBuildDir is a private field, so we have to use reflection to modify it.
                    // unfortunately, we cannot add a method in ProjectLocator since kepler-tasks
                    // cannot be patched!
                    //
                    // this sets userBuildDir to $HOME/KeplerData/kepler.modules/build-area
                    //
                    if (ProjectLocator.shouldUtilizeUserKeplerModules()) {
                        try {
                            Field field = ProjectLocator.class.getDeclaredField("userBuildDir");
                            field.setAccessible(true);
                            File file = new File(System.getProperty("user.home") + File.separator +
                                    "KeplerData" + File.separator +
                                    "kepler.modules" + File.separator +
                                    "build-area");
                            field.set(null, file);
                        } catch (Throwable e) {
                            throw new RuntimeException("Error setting user build dir in ProjectLocator.", e);
                        }
                    }

                    try {
                        Kepler.setJavaPropertiesAndCopyModuleDirectories();
                    } catch (Exception e) {
                        throw new RuntimeException("Error setting java properties and copying module directories.", e);
                    }
                }
            }

            _haveInitializedOnce.set(true);
        }
    }

    private static final Log LOG = LogFactory.getLog(StubUtilities.class);

    /** If true, we have initialized Kepler for this process. */
    private static AtomicBoolean _haveInitializedOnce = new AtomicBoolean(false);

}
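
/*
 * A usage sketch for initializeKepler() (hypothetical driver code, not part
 * of this file): a hadoop map or reduce stub would typically call it once
 * from its setup method, before any token conversion, e.g.:
 *
 *     @Override
 *     protected void setup(Context context) {
 *         StubUtilities.initializeKepler(context.getConfiguration());
 *     }
 *
 * For full initialization to occur, the job Configuration must set
 * Utilities.CONFIGURATION_KEPLER_MODULES_DIR to the kepler.modules
 * directory on the worker node.
 */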