/* Stratosphere output format for Ptolemy tokens.
 *
 * Copyright (c) 2014 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $'
 * '$Revision: 33628 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.stratosphere.io.output;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.kepler.ddp.Utilities;
import org.kepler.ddp.actor.pattern.DDPDataSink;
import org.kepler.stratosphere.stub.StubUtilities;
import org.kepler.stratosphere.type.TypeUtilities;

import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import ptolemy.data.RecordToken;
import ptolemy.data.Token;
import ptolemy.kernel.util.IllegalActionException;

/** An output format for Ptolemy tokens.
 * <p>
 * Each record passed to {@link #writeRecord(Record)} is converted into a
 * {@link RecordToken} with the fields "key" and "value" and buffered in memory.
 * When the format is closed, the buffered tokens are handed to the
 * DDPDataSink actor whose name is given in the configuration.
 *
 * @author Daniel Crawl
 * @version $Id: TokenOutputFormat.java 33628 2015-08-24 22:42:20Z crawl $
 *
 */
public class TokenOutputFormat implements OutputFormat<Record> {

    /** Creates a new, unconfigured output format. All settings are read in
     *  {@link #configure(Configuration)}.
     */
    public TokenOutputFormat() {
    }

    /**
     * Configures this output format. Since output formats are instantiated generically and hence parameterless,
     * this method is the place where the output formats set their basic fields based on configuration values.
     * <p>
     * This method is always called first on a newly instantiated output format.
     * <p>
     * The configuration must state that Kepler runs in the same JVM, must name the
     * DDPDataSink actor that receives the tokens, and must declare one or two fields,
     * each with a {@link Value} type class and, optionally, a record position.
     *
     * @param parameters The configuration with all parameters.
     * @throws RuntimeException If Kepler is not running in the same JVM, or the
     * name of the sink actor is missing.
     * @throws IllegalArgumentException If the number of fields, a field type, or a
     * record position is missing or invalid.
     */
    @Override
    public void configure(Configuration parameters) {

        // TODO: possible to use this when kepler is in a separate JVM?
        // close() hands the tokens to DDPDataSink through a static, in-process
        // call, which presumably requires the sink actor to live in this JVM.
        if (!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
            throw new RuntimeException("TokenOutputFormat only works when running Kepler in the same JVM.");
        }

        this.sinkActorName = parameters.getString(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME, null);
        if (this.sinkActorName == null) {
            throw new RuntimeException("Name of DDPDataSink actor not in configuration.");
        }

        this.numFields = parameters.getInteger(NUM_FIELDS_PARAMETER, -1);
        if (this.numFields < 1) {
            throw new IllegalArgumentException("Invalid configuration for TokenOutputFormat: " +
                    "Need to specify number of fields > 0.");
        }
        // Records are written as key/value pairs, so at most two fields are supported.
        if (this.numFields > 2) {
            throw new IllegalArgumentException("Invalid configuration for TokenOutputFormat: " +
                    "Need to specify number of fields < 3.");
        }

        // Java cannot create a generic array directly, so create a raw array and cast.
        @SuppressWarnings("unchecked")
        Class<? extends Value>[] arr = new Class[this.numFields];
        this.classes = arr;

        for (int i = 0; i < this.numFields; i++) {
            @SuppressWarnings("unchecked")
            Class<? extends Value> clazz =
                    (Class<? extends Value>) parameters.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null);
            if (clazz == null) {
                throw new IllegalArgumentException("Invalid configuration for TokenOutputFormat: " +
                        "No type class for parameter " + i);
            }

            this.classes[i] = clazz;
        }

        // NOTE(review): the record positions are validated here, but writeRecord()
        // currently reads fields by fixed index and never consults recordPositions.
        this.recordPositions = new int[this.numFields];
        boolean anyRecordPosDefined = false;
        boolean allRecordPosDefined = true;

        for (int i = 0; i < this.numFields; i++) {

            // Integer.MIN_VALUE marks "position not set for this field".
            int pos = parameters.getInteger(RECORD_POSITION_PARAMETER_PREFIX + i, Integer.MIN_VALUE);

            if (pos != Integer.MIN_VALUE) {
                anyRecordPosDefined = true;

                if (pos < 0) {
                    throw new IllegalArgumentException("Invalid configuration for TokenOutputFormat: " +
                            "Invalid record position for parameter " + i);
                }

                this.recordPositions[i] = pos;

            } else {
                allRecordPosDefined = false;

                // Default: the i-th declared field lives at record position i.
                this.recordPositions[i] = i;
            }
        }

        if (anyRecordPosDefined && !allRecordPosDefined) {
            throw new IllegalArgumentException("Invalid configuration for TokenOutputFormat: " +
                    "Either none or all record positions must be defined.");
        }
    }

    /**
     * Opens a parallel instance of the output format to store the result of its parallel instance.
     * <p>
     * When this method is called, the output format is guaranteed to be configured.
     * This implementation only creates the in-memory buffer for the tokens.
     *
     * @param taskNumber The number of the parallel instance.
     * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
     */
    @Override
    public void open(int taskNumber) throws IOException {
        this.tokenList = new ArrayList<Token>();
    }

    /**
     * Adds a record to the output.
     * <p>
     * When this method is called, the output format is guaranteed to be opened.
     * <p>
     * A record with one field becomes a RecordToken whose key is {@link Token#NIL}
     * and whose value is that field. A record with two fields becomes a RecordToken
     * whose key and value are read from {@link TypeUtilities#KEY_FIELD} and
     * {@link TypeUtilities#VALUE_FIELD}. The token is buffered until {@link #close()}.
     *
     * @param record The record to add to the output.
     * @throws IOException Declared by the interface; not thrown by this implementation.
     * @throws RuntimeException If the record has neither one nor two fields, or the
     * RecordToken cannot be created.
     */
    @Override
    public void writeRecord(Record record) throws IOException {

        final int numRecFields = record.getNumFields();

        Token recordToken;

        try {
            if (numRecFields == 1) {
                // Only one field, so treat it as the value and use NIL as the key.
                Token valueToken = StubUtilities.convertPactDataToToken(record, 0, classes[0]);
                recordToken = new RecordToken(new String[] {"key", "value"}, new Token[] {Token.NIL, valueToken});
            } else if (numRecFields == 2) {
                // NOTE(review): if this format was configured with only one field, classes
                // has length 1 and indexing it with KEY_FIELD/VALUE_FIELD presumably fails;
                // confirm the TypeUtilities constants.
                Token keyToken = StubUtilities.convertPactDataToToken(record,
                        TypeUtilities.KEY_FIELD, classes[TypeUtilities.KEY_FIELD]);
                Token valueToken = StubUtilities.convertPactDataToToken(record,
                        TypeUtilities.VALUE_FIELD, classes[TypeUtilities.VALUE_FIELD]);
                recordToken = new RecordToken(new String[] {"key", "value"}, new Token[] {keyToken, valueToken});
            } else {
                throw new RuntimeException("Incorrect number of fields in record: " + numRecFields);
            }
        } catch (IllegalActionException e) {
            throw new RuntimeException("Error creating RecordToken.", e);
        }

        this.tokenList.add(recordToken);
    }

    /**
     * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
     * channels and streams and release resources.
     * After this method returns without an error, the output is assumed to be correct.
     * <p>
     * When this method is called, the output format is guaranteed to be opened.
     * <p>
     * Hands all buffered tokens to the configured DDPDataSink actor. The buffer is
     * released even if the hand-off fails, and a second call writes nothing.
     *
     * @throws IOException Thrown, if the tokens could not be written to the sink actor.
     */
    @Override
    public void close() throws IOException {
        // Detach the buffer first: it is released even if addTokens() fails, and a
        // repeated close() can neither resend the tokens nor pass null to the sink.
        final List<Token> tokens = this.tokenList;
        this.tokenList = null;
        if (tokens == null) {
            // open() was never called, or close() was already called: nothing to write.
            return;
        }
        try {
            DDPDataSink.addTokens(this.sinkActorName, tokens);
        } catch (IllegalActionException e) {
            throw new IOException("Error writing token.", e);
        }
    }

    /**
     * Creates a configuration builder that can be used to set the output format's parameters to the config in a
     * fluent fashion.
     *
     * @param target The data sink whose parameters will be written by the builder.
     * @return A config builder for setting parameters.
     */
    public static ConfigBuilder configureRecordFormat(GenericDataSink target) {
        return new ConfigBuilder(target.getParameters());
    }

    /** Configuration key for the number of fields (must be 1 or 2). */
    public static final String NUM_FIELDS_PARAMETER = "pact.output.record.num-fields";

    /** Configuration key prefix for the {@link Value} class of field <i>i</i>. */
    public static final String FIELD_TYPE_PARAMETER_PREFIX = "pact.output.record.type_";

    /** Configuration key prefix for the record position of field <i>i</i>. */
    public static final String RECORD_POSITION_PARAMETER_PREFIX = "pact.output.record.position_";

    /**
     * Abstract builder used to set parameters to the output format's configuration in a fluent way.
     *
     * @param <T> The concrete builder type returned by the fluent methods.
     */
    protected static abstract class AbstractConfigBuilder<T>
    {
        // --------------------------------------------------------------------

        /**
         * The configuration into which the parameters will be written.
         */
        protected final Configuration config;

        // --------------------------------------------------------------------

        /**
         * Creates a new builder for the given configuration.
         *
         * @param targetConfig The configuration into which the parameters will be written.
         */
        protected AbstractConfigBuilder(Configuration targetConfig) {
            this.config = targetConfig;
        }

        // --------------------------------------------------------------------

        /**
         * Adds a field of the record to be written to the output. The field at the given position will
         * be interpreted as the type represented by the given class and converted to a Ptolemy token.
         * Each call appends one field and increments the configured number of fields.
         *
         * @param type The type of the field.
         * @param recordPosition The position in the record.
         * @return The builder itself.
         */
        public T field(Class<? extends Value> type, int recordPosition) {
            final int numYet = this.config.getInteger(NUM_FIELDS_PARAMETER, 0);
            this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + numYet, type);
            this.config.setInteger(RECORD_POSITION_PARAMETER_PREFIX + numYet, recordPosition);
            this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 1);
            @SuppressWarnings("unchecked")
            T ret = (T) this;
            return ret;
        }
    }

    /**
     * A builder used to set parameters to the output format's configuration in a fluent way.
     */
    public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder>
    {
        /**
         * Creates a new builder for the given configuration.
         *
         * @param targetConfig The configuration into which the parameters will be written.
         */
        protected ConfigBuilder(Configuration targetConfig) {
            super(targetConfig);
        }

    }

    // --------------------------------------------------------------------------------------------

    /** The number of configured fields (1 or 2). */
    private int numFields;

    /** The {@link Value} type of each configured field, indexed by field number. */
    private Class<? extends Value>[] classes;

    /** The record position of each configured field, validated in configure(). */
    private int[] recordPositions;

    /** A list of tokens accumulated from writeRecord(). */
    private List<Token> tokenList;

    /** The full name of the DDPDataSink actor to write the list of tokens. */
    private String sinkActorName;

    /** Required field for IO formats. */
    private static final long serialVersionUID = 1L;

}