001/* Utilities for Kepler PACT stubs. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 008 * '$Revision: 33628 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.stratosphere.stub; 031 032import java.io.File; 033import java.lang.reflect.Field; 034import java.util.ArrayList; 035import java.util.Iterator; 036import java.util.List; 037import java.util.concurrent.atomic.AtomicBoolean; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.kepler.Kepler; 042import org.kepler.build.project.ProjectLocator; 043import org.kepler.ddp.Utilities; 044import org.kepler.stratosphere.type.ByteArray; 045import org.kepler.stratosphere.type.RecordValue; 046 047import eu.stratosphere.configuration.Configuration; 048import eu.stratosphere.types.DoubleValue; 049import eu.stratosphere.types.IntValue; 050import eu.stratosphere.types.Key; 051import eu.stratosphere.types.LongValue; 052import eu.stratosphere.types.NullValue; 053import eu.stratosphere.types.Record; 054import eu.stratosphere.types.StringValue; 055import eu.stratosphere.types.Value; 056import eu.stratosphere.util.Collector; 057import ptolemy.data.ArrayToken; 058import ptolemy.data.DoubleToken; 059import ptolemy.data.IntToken; 060import ptolemy.data.LongToken; 061import ptolemy.data.ObjectToken; 062import ptolemy.data.RecordToken; 063import ptolemy.data.StringToken; 064import ptolemy.data.Token; 065import ptolemy.data.UnsignedByteToken; 066import ptolemy.data.type.ArrayType; 067import ptolemy.data.type.BaseType; 068import ptolemy.data.type.ObjectType; 069import ptolemy.data.type.RecordType; 070import ptolemy.data.type.Type; 071import ptolemy.kernel.util.IllegalActionException; 072 073/** A collection of utilities used by Kepler PACT stubs. 074 * 075 * @author Daniel Crawl 076 * @version $Id: StubUtilities.java 33628 2015-08-24 22:42:20Z crawl $ 077 */ 078 079public class StubUtilities { 080 081 /** Write lists of Java Object keys and values to a Collector. */ 082 public static void convertResultsToCollector(List<?> resultKeys, List<?> resultValues, 083 Collector<Record> collector) { 084 085 for(int i = 0; i < resultKeys.size(); i++) { 086 collector.collect(new Record( 087 _convertObjectToPactData(((List<?>)resultKeys).get(i)), 088 _convertObjectToPactData(((List<?>)resultValues).get(i)))); 089 } 090 091 } 092 093 /** Convert a token to PACT data and add to a Collector. */ 094 public static void convertTokenToCollector(List<Token> tokenList, Collector<Record> collector) 095 throws IllegalActionException { 096 097 if (tokenList == null) { 098 return; 099 } 100 101 for (Token token: tokenList) { 102 if(!(token instanceof ArrayToken)) { 103 throw new IllegalActionException("Read non-array token: " + token); 104 } 105 106 final ArrayToken arrayToken = (ArrayToken)token; 107 108 if(!(arrayToken.getElementType() instanceof RecordType)) { 109 throw new IllegalActionException("Read non-record of array token: " + token); 110 } 111 112 final int length = arrayToken.length(); 113 for(int i = 0; i < length; i++) { 114 final RecordToken element = (RecordToken) arrayToken.getElement(i); 115 final Key keyField = convertTokenToPactKeyData(element.get("key")); 116 final Value valueField = convertTokenToPactValueData(element.get("value")); 117 collector.collect(new Record(keyField, valueField)); 118 } 119 } 120 } 121 122 /** Convert a token to PACT Value data. */ 123 public static Value convertTokenToPactValueData(Token token) throws IllegalActionException { 124 125 Value retval = convertTokenToPactKeyData(token); 126 return retval; 127 } 128 129 /** Convert a token to PACT Key data. */ 130 public static Key convertTokenToPactKeyData(Token token) throws IllegalActionException { 131 132 final Type type = token.getType(); 133 134 Key retval = null; 135 if(type instanceof ObjectType) { 136 137 final Class<?> clazz = ((ObjectType)type).getValueClass(); 138 if(clazz == byte[].class) { 139 byte[] bytes = (byte[])((ObjectToken)token).getValue(); 140 retval = new ByteArray(bytes); 141 } else { 142 throw new IllegalActionException("Conversion of Object Token with class " + 143 clazz + " is unsupported."); 144 } 145 } else if(type == BaseType.STRING) { 146 retval = new StringValue(((StringToken)token).stringValue()); 147 } else if(type == BaseType.INT) { 148 retval = new IntValue(((IntToken)token).intValue()); 149 } else if(type == BaseType.LONG) { 150 retval = new LongValue(((LongToken)token).longValue()); 151 } else if(type == BaseType.DOUBLE) { 152 retval = new DoubleValue(((DoubleToken)token).doubleValue()); 153 } else if(type == BaseType.NIL) { 154 retval = new NullValue(); 155 } else if(type instanceof ArrayType) { 156 if(((ArrayType)type).getElementType() == BaseType.UNSIGNED_BYTE) 157 retval = new ByteArray(arrayTokenToByteArray((ArrayToken)token)); 158 } else if(type instanceof RecordType) { 159 retval = new RecordValue(token.toString()); 160 } 161 if (retval == null){ 162 throw new IllegalActionException("Unsupported token type: " + type); 163 } 164 165 return retval; 166 } 167 168 /** Convert PACT data to an object. */ 169 public static Object convertPactDataToObject(final Record record, final int fieldNum, 170 Class<? extends Value> classType) { 171 172 Object retval = null; 173 174 //LOG.info("record has " + record.getNumFields() + " fields, asking for field " + fieldNum + " as type " + classType); 175 176 // check if the record has enough fields 177 // if not, return null 178 if(record.getNumFields() <= fieldNum) { 179 return retval; 180 } 181 182 final Value value = record.getField(fieldNum, classType); 183 184 //LOG.info(" value retrieved is class " + value.getClass()); 185 186 if(value instanceof NullValue) { 187 retval = null; 188 // NOTE: check for RecordValue before StringValue since it's a subclass. 189 } else if(value instanceof RecordValue) { 190 throw new RuntimeException("Record data not supported..."); 191 } else if(value instanceof StringValue) { 192 retval = ((StringValue) value).getValue(); 193 } else if(value instanceof IntValue) { 194 retval = Integer.valueOf(((IntValue)value).getValue()); 195 } else if(value instanceof LongValue) { 196 retval = Long.valueOf(((LongValue)value).getValue()); 197 } else if(value instanceof DoubleValue) { 198 retval = Double.valueOf(((DoubleValue)value).getValue()); 199 } 200 if (retval == null){ 201 throw new RuntimeException("Unsupported type of PACT data: " + value.getClass()); 202 } 203 204 return retval; 205 } 206 /** Convert PACT data to a token. */ 207 public static Token convertPactDataToToken(final Record record, final int fieldNum, 208 Class<? extends Value> classType) { 209 210 Token retval = null; 211 212 //LOG.info("record has " + record.getNumFields() + " fields, asking for field " + fieldNum + " as type " + classType); 213 214 // check if the record has enough fields 215 // if not, return null 216 if(record.getNumFields() <= fieldNum) { 217 return retval; 218 } 219 220 final Value value = record.getField(fieldNum, classType); 221 222 //LOG.info(" value retrieved is class " + value.getClass()); 223 224 if(value instanceof NullValue) { 225 retval = Token.NIL; 226 // NOTE: check for RecordValue before StringValue since it's a subclass. 227 } else if(value instanceof RecordValue) { 228 try { 229 retval = new RecordToken(((RecordValue) value).getValue()); 230 } catch (IllegalActionException e) { 231 throw new RuntimeException("Error creating RecordToken.", e); 232 } 233 } else if(value instanceof StringValue) { 234 retval = new StringToken(((StringValue) value).getValue()); 235 } else if(value instanceof IntValue) { 236 retval = new IntToken(((IntValue)value).getValue()); 237 } else if(value instanceof LongValue) { 238 retval = new LongToken(((LongValue)value).getValue()); 239 } else if(value instanceof DoubleValue) { 240 retval = new DoubleToken(((DoubleValue)value).getValue()); 241 } else if(value instanceof ByteArray) { //convert PACT ByteArray to Ptolemy ObjectToken, which is better in performance. 242 try { 243 retval = new ObjectToken(((ByteArray)value).getValue(), byte[].class); 244 //retval = new ArrayToken(byteArrayToToken(((ByteArray)value).getValue())); 245 } catch (IllegalActionException e) { 246 throw new RuntimeException("Error creating ObjectToken.", e); 247 } 248 } 249 if (retval == null){ 250 throw new RuntimeException("Unsupported type of PACT data: " + value.getClass()); 251 } 252 253 return retval; 254 } 255 256 /** Convert byte array data to an array of UnsignedByteToken. */ 257 public static UnsignedByteToken[] byteArrayToToken(byte[] byteArray){ 258 UnsignedByteToken[] byteTokenArray = new UnsignedByteToken[byteArray.length]; 259 for (int t = 0; t < byteArray.length; t++) { 260 byteTokenArray[t] = new UnsignedByteToken(byteArray[t]); 261 } 262 return byteTokenArray; 263 } 264 265 /** Convert ArrayToken to an array of Value. 266 * @throws IllegalActionException */ 267 public static Value[] arrayTokenToPactData(ArrayToken arrayToken) throws IllegalActionException{ 268 Value[] byteTokenArray = new Value[arrayToken.length()]; 269 for (int i=0; i<arrayToken.length(); i++) { 270 byteTokenArray[i] = convertTokenToPactValueData(arrayToken.getElement(i)); 271 } 272 return byteTokenArray; 273 } 274 275 /** Convert ArrayToken to an array of byte. 276 * @throws IllegalActionException */ 277 public static byte[] arrayTokenToByteArray(ArrayToken arrayToken) throws IllegalActionException{ 278 if (arrayToken.getElementType() != BaseType.UNSIGNED_BYTE) 279 return null; 280 byte[] byteTokenArray = new byte[arrayToken.length()]; 281 for (int i=0; i<arrayToken.length(); i++) { 282 byteTokenArray[i] = ((UnsignedByteToken)arrayToken.getElement(i)).byteValue(); 283 } 284 return byteTokenArray; 285 } 286 287 /** Convert PACT data to a token. */ 288 public static Token convertPactDataToToken(Iterator<Record> records, int fieldNum, 289 Class<? extends Value> classType) { 290 291 final List<Token> tokenList = new ArrayList<Token>(); 292 while(records.hasNext()) { 293 tokenList.add(convertPactDataToToken(records.next(), fieldNum, classType)); 294 } 295 try { 296 return new ArrayToken(tokenList.toArray(new Token[tokenList.size()])); 297 } catch (IllegalActionException e) { 298 throw new RuntimeException("Error creating array token.", e); 299 } 300 } 301 302 /** Initialize the parts of Kepler required to run the sub-workflow in 303 * the stub. 304 */ 305 public synchronized static void initializeKepler(Configuration parameters) { 306 307 // see if we have already initialized once in this JVM. 308 if(!_haveInitializedOnce.get()) { 309 310 // see if the kepler modules directory exists. 311 String modulesDirStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_MODULES_DIR, null); 312 if(modulesDirStr != null) { 313 File modulesDir = new File(modulesDirStr); 314 if(modulesDir.exists() && modulesDir.isDirectory()) { 315 316 // set the directory 317 ProjectLocator.setKeplerModulesDir(modulesDir); 318 319 // FIXME this is a bad hack 320 // ProjectLocator.userBuildDir has to be set, otherwise findKeplerModulesDir() 321 // is called, which fails when running in the stratosphere server. 322 // userBuildDir is a private field, so we have to use reflection to modify it. 323 // unfortunately, we cannot add a method in ProjectLocator since kepler-tasks 324 // cannot be patched! 325 // 326 // this sets userBuildDir to $HOME/KeplerData/kepler.modules/build-area 327 // 328 if(ProjectLocator.shouldUtilizeUserKeplerModules()) { 329 try { 330 Field field = ProjectLocator.class.getDeclaredField("userBuildDir"); 331 field.setAccessible(true); 332 File file = new File(System.getProperty("user.home") + File.separator + 333 "KeplerData" + File.separator + 334 "kepler.modules" + File.separator + 335 "build-area"); 336 field.set(null, file); 337 } catch (Throwable e) { 338 throw new RuntimeException("Error setting user build dir in ProjectLocator.", e); 339 } 340 } 341 342 try { 343 Kepler.setJavaPropertiesAndCopyModuleDirectories(); 344 } catch (Exception e) { 345 throw new RuntimeException("Error setting java properties and copying module directories.", e); 346 } 347 } 348 } 349 350 _haveInitializedOnce.set(true); 351 } 352 } 353 354 /** Initialize Ptolemy. */ 355 public static void initializePtolemy() { 356 357 // Ptolemy throws an exception if ptolemy.ptII.dir is not set. 358 try { 359 System.setProperty("ptolemy.ptII.dir", "foooo"); 360 } catch(Throwable t) { 361 LOG.error("can't set property.", t); 362 } 363 } 364 365 /////////////////////////////////////////////////////////////////// 366 //// private methods //// 367 368 /** Convert a Java Object to a Pact Key. */ 369 private static Key _convertObjectToPactData(Object object) { 370 371 if(object instanceof String) { 372 return new StringValue((String)object); 373 } else if(object instanceof Integer) { 374 return new IntValue(((Integer)object).intValue()); 375 } else if(object instanceof Long) { 376 return new LongValue(((Long)object).longValue()); 377 } else if(object instanceof Double) { 378 return new DoubleValue(((Double)object).doubleValue()); 379 } 380 381 throw new RuntimeException("Unsupported type of Object: " + object.getClass()); 382 } 383 384 /////////////////////////////////////////////////////////////////// 385 //// private fields //// 386 387 private static final Log LOG = LogFactory.getLog(StubUtilities.class); 388 389 /** If true, we have initialized Kepler for this process. */ 390 private static AtomicBoolean _haveInitializedOnce = new AtomicBoolean(false); 391 392}