001/* Stratosphere output format for Ptolemy tokens.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.io.output;
031
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.List;
035
036import org.kepler.ddp.Utilities;
037import org.kepler.ddp.actor.pattern.DDPDataSink;
038import org.kepler.stratosphere.stub.StubUtilities;
039import org.kepler.stratosphere.type.TypeUtilities;
040
041import eu.stratosphere.api.common.io.OutputFormat;
042import eu.stratosphere.api.common.operators.GenericDataSink;
043import eu.stratosphere.configuration.Configuration;
044import eu.stratosphere.types.Record;
045import eu.stratosphere.types.Value;
046import ptolemy.data.RecordToken;
047import ptolemy.data.Token;
048import ptolemy.kernel.util.IllegalActionException;
049
050/** An output format for Ptolemy tokens.
051 *  
052 * @author Daniel Crawl
053 * @version $Id: TokenOutputFormat.java 33628 2015-08-24 22:42:20Z crawl $
054 * 
055 */
056public class TokenOutputFormat implements OutputFormat<Record> {
057
058        public TokenOutputFormat() {
059        }
060
061        /**
062         * Configures this output format. Since output formats are instantiated generically and hence parameterless, 
063         * this method is the place where the output formats set their basic fields based on configuration values.
064         * <p>
065         * This method is always called first on a newly instantiated output format. 
066         *  
067         * @param parameters The configuration with all parameters.
068         */
069        @Override
070    public void configure(Configuration parameters) {
071                
072            // TODO: possible to use this when kepler is in a separate JVM?
073            if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
074                throw new RuntimeException("TokenOutputFormat only works when running Kepler in the same JVM.");
075        }
076            
077            this.sinkActorName = parameters.getString(Utilities.CONFIGURATION_KEPLER_SINK_ACTOR_NAME, null);
078            if(sinkActorName == null) {
079                throw new RuntimeException("Name of DDPDataSink actor not in configuration.");
080            }
081
082
083        this.numFields = parameters.getInteger(NUM_FIELDS_PARAMETER, -1);
084        if (this.numFields < 1) {
085            throw new IllegalArgumentException("Invalid configuration for TokenOutputFormat: " +
086                    "Need to specify number of fields > 0.");
087        }
088        if(this.numFields > 2) {
089            throw new IllegalArgumentException("Invalid configuration for TokenOutputFormat: " +
090                    "Need to specify number of fields < 3.");
091        }
092        
093        @SuppressWarnings("unchecked")
094        Class<Value>[] arr = new Class[this.numFields];
095        this.classes = arr;
096        
097        for (int i = 0; i < this.numFields; i++)
098        {
099            @SuppressWarnings("unchecked")
100            Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null);
101            if (clazz == null) {
102                throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " +
103                    "No type class for parameter " + i);
104            }
105            
106            this.classes[i] = clazz;
107        }
108        
109        this.recordPositions = new int[this.numFields];
110        boolean anyRecordPosDefined = false;
111        boolean allRecordPosDefined = true;
112        
113        for(int i = 0; i < this.numFields; i++) {
114            
115            int pos = parameters.getInteger(RECORD_POSITION_PARAMETER_PREFIX + i, Integer.MIN_VALUE);
116            
117            if(pos != Integer.MIN_VALUE) {
118                anyRecordPosDefined = true;
119                
120                if(pos < 0) {
121                    throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " +
122                            "Invalid record position for parameter " + i);
123                }
124
125                this.recordPositions[i] = pos;
126                
127            } else {
128                allRecordPosDefined = false;
129                
130                this.recordPositions[i] = i;
131            }
132        }
133        
134        if(anyRecordPosDefined && !allRecordPosDefined) {
135            throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " +
136                    "Either none or all record positions must be defined.");
137        }
138        
139        //this.recordDelimiter = parameters.getString(RECORD_DELIMITER_PARAMETER, AbstractConfigBuilder.NEWLINE_DELIMITER);
140        //if (this.recordDelimiter == null) {
141            //throw new IllegalArgumentException("The delimiter in the DelimitedOutputFormat must not be null.");
142        //}
143        //this.fieldDelimiter = parameters.getString(FIELD_DELIMITER_PARAMETER, "|");
144        //this.lenient = parameters.getBoolean(LENIENT_PARSING, false);         
145        }
146        
147        /**
148         * Opens a parallel instance of the output format to store the result of its parallel instance.
149         * <p>
150         * When this method is called, the output format it guaranteed to be configured.
151         * 
152         * @param taskNumber The number of the parallel instance.
153         * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
154         */
155        @Override
156    public void open(int taskNumber) throws IOException {
157            this.tokenList = new ArrayList<Token>();
158        }
159        
160        
161        /**
162         * Adds a record to the output.
163         * <p>
164         * When this method is called, the output format it guaranteed to be opened.
165         * 
166         * @param record The records to add to the output.
167         * @throws IOException Thrown, if the records could not be added to to an I/O problem.
168         */
169        @Override
170    public void writeRecord(Record record) throws IOException {
171            
172        int numRecFields = record.getNumFields();
173        
174        Token recordToken;
175        
176        try {
177            // see if there's only one field
178            if(numRecFields == 1) {
179                // since there's only one field, assume it is the value
180                Token valueToken = StubUtilities.convertPactDataToToken(record, 0, classes[0]);
181                recordToken = new RecordToken(new String[] {"key", "value"}, new Token[] {Token.NIL, valueToken});
182            } else if(numRecFields == 2) {
183                Token keyToken = StubUtilities.convertPactDataToToken(record,
184                        TypeUtilities.KEY_FIELD, classes[TypeUtilities.KEY_FIELD]);
185                Token valueToken = StubUtilities.convertPactDataToToken(record,
186                        TypeUtilities.VALUE_FIELD, classes[TypeUtilities.VALUE_FIELD]);
187                recordToken = new RecordToken(new String[] {"key", "value"}, new Token[] {keyToken, valueToken});
188            } else {
189                throw new RuntimeException("Incorrect number of fields in record: " + numRecFields);            
190            }
191        } catch(IllegalActionException e) {
192            throw new RuntimeException("Error creating RecordToken.", e);
193        }
194        
195        this.tokenList.add(recordToken);
196                
197        }
198        
199        /**
200         * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
201         * channels and streams and release resources.
202         * After this method returns without an error, the output is assumed to be correct.
203         * <p>
204         * When this method is called, the output format it guaranteed to be opened.
205         *  
206         * @throws IOException Thrown, if the input could not be closed properly.
207         */
208        @Override
209    public void close() throws IOException {
210            try {
211            DDPDataSink.addTokens(this.sinkActorName, this.tokenList);
212        } catch (IllegalActionException e) {
213            throw new IOException("Error writing token.", e);
214        }
215            this.tokenList = null;
216        }
217
218           /**
219     * Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent
220     * fashion.
221     * 
222     * @return A config builder for setting parameters.
223     */
224    public static ConfigBuilder configureRecordFormat(GenericDataSink target) {
225        return new ConfigBuilder(target.getParameters());
226    }
227    
228    //public static final String RECORD_DELIMITER_PARAMETER = "pact.output.record.delimiter";
229    
230    //public static final String FIELD_DELIMITER_PARAMETER = "pact.output.record.field-delimiter";
231    
232    public static final String NUM_FIELDS_PARAMETER = "pact.output.record.num-fields";
233    
234    public static final String FIELD_TYPE_PARAMETER_PREFIX = "pact.output.record.type_";
235    
236    public static final String RECORD_POSITION_PARAMETER_PREFIX = "pact.output.record.position_";
237    
238    //public static final String LENIENT_PARSING = "pact.output.record.lenient";
239
240    /**
241     * Abstract builder used to set parameters to the input format's configuration in a fluent way.
242     */
243    protected static abstract class AbstractConfigBuilder<T>
244    {
245        //private static final String NEWLINE_DELIMITER = "\n";
246        
247        // --------------------------------------------------------------------
248        
249        /**
250         * The configuration into which the parameters will be written.
251         */
252        protected final Configuration config;
253        
254        // --------------------------------------------------------------------
255        
256        /**
257         * Creates a new builder for the given configuration.
258         * 
259         * @param targetConfig The configuration into which the parameters will be written.
260         */
261        protected AbstractConfigBuilder(Configuration targetConfig) {
262            this.config = targetConfig;
263        }
264        
265        // --------------------------------------------------------------------
266        
267        /**
268         * Sets the delimiter to be a single character, namely the given one. The character must be within
269         * the value range <code>0</code> to <code>127</code>.
270         * 
271         * @param delimiter The delimiter character.
272         * @return The builder itself.
273         */
274        /*
275        public T recordDelimiter(char delimiter) {
276            if (delimiter == '\n') {
277                this.config.setString(RECORD_DELIMITER_PARAMETER, NEWLINE_DELIMITER);
278            } else {
279                this.config.setString(RECORD_DELIMITER_PARAMETER, String.valueOf(delimiter));
280            }
281            @SuppressWarnings("unchecked")
282            T ret = (T) this;
283            return ret;
284        }
285        */
286        
287        /**
288         * Sets the delimiter to be the given string. The string will be converted to bytes for more efficient
289         * comparison during input parsing. The conversion will be done using the platforms default charset.
290         * 
291         * @param delimiter The delimiter string.
292         * @return The builder itself.
293         */
294        /*
295        public T recordDelimiter(String delimiter) {
296            this.config.setString(RECORD_DELIMITER_PARAMETER, delimiter);
297            @SuppressWarnings("unchecked")
298            T ret = (T) this;
299            return ret;
300        }
301        */
302                
303        /**
304         * Sets the delimiter that delimits the individual fields in the records textual output representation.
305         * 
306         * @param delimiter The character to be used as a field delimiter.
307         * @return The builder itself.
308         */
309        /*
310        public T fieldDelimiter(char delimiter) {
311            this.config.setString(FIELD_DELIMITER_PARAMETER, String.valueOf(delimiter));
312            @SuppressWarnings("unchecked")
313            T ret = (T) this;
314            return ret;
315        }
316        */
317        
318        /**
319         * Adds a field of the record to be serialized to the output. The field at the given position will
320         * be interpreted as the type represented by the given class. The types {@link Object#toString()} method
321         * will be invoked to create a textual representation. 
322         * 
323         * @param type The type of the field.
324         * @param recordPosition The position in the record.
325         * @return The builder itself.
326         */
327        public T field(Class<? extends Value> type, int recordPosition) {
328            final int numYet = this.config.getInteger(NUM_FIELDS_PARAMETER, 0);
329            this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + numYet, type);
330            this.config.setInteger(RECORD_POSITION_PARAMETER_PREFIX + numYet, recordPosition);
331            this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 1);
332            @SuppressWarnings("unchecked")
333            T ret = (T) this;
334            return ret;
335        }
336        
337        /**
338         * Sets the leniency for the serializer. A lenient serializer simply skips missing fields and null
339         * fields in the record, while a non lenient one throws an exception.
340         * 
341         * @param lenient True, if the serializer should be lenient, false otherwise.
342         * @return The builder itself.
343         */
344        /*
345        public T lenient(boolean lenient) {
346            this.config.setBoolean(LENIENT_PARSING, lenient);
347            @SuppressWarnings("unchecked")
348            T ret = (T) this;
349            return ret;
350        }
351        */
352    }
353    
354    /**
355     * A builder used to set parameters to the input format's configuration in a fluent way.
356     */
357    public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder>
358    {
359        /**
360         * Creates a new builder for the given configuration.
361         * 
362         * @param targetConfig The configuration into which the parameters will be written.
363         */
364        protected ConfigBuilder(Configuration targetConfig) {
365            super(targetConfig);
366        }
367        
368    }
369    
370    // --------------------------------------------------------------------------------------------
371    
372    private int numFields;
373    
374    private Class<? extends Value>[] classes;
375    
376    private int[] recordPositions;
377    
378    //private String fieldDelimiter;
379    
380    //private String recordDelimiter;
381        
382    //private boolean lenient;
383    
384    /** A list of tokens accumulated from writeRecord(). */
385    private List<Token> tokenList;
386    
387    /** The full name of the DDPDataSink actor to write the list of tokens. */
388    private String sinkActorName;
389    
390    /** Required field for IO formats. */
391    private static final long serialVersionUID = 1L;
392
393}