001/* Hadoop InputFormat for Ptolemy tokens.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-07-02 15:58:19 +0000 (Wed, 02 Jul 2014) $' 
008 * '$Revision: 32804 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.hadoop.io.input;
031
032import java.io.IOException;
033import java.util.LinkedList;
034import java.util.List;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.io.NullWritable;
038import org.apache.hadoop.mapreduce.InputFormat;
039import org.apache.hadoop.mapreduce.InputSplit;
040import org.apache.hadoop.mapreduce.JobContext;
041import org.apache.hadoop.mapreduce.RecordReader;
042import org.apache.hadoop.mapreduce.TaskAttemptContext;
043import org.kepler.ddp.Utilities;
044import org.kepler.ddp.actor.pattern.DDPDataSource;
045import org.kepler.hadoop.io.TokenWritable;
046
047import ptolemy.data.ArrayToken;
048import ptolemy.kernel.util.IllegalActionException;
049
050/** Hadoop InputFormat for Ptolemy tokens.
051 * 
052 *  @author Daniel Crawl
053 *  @verion $Id: TokenInputFormat.java 32804 2014-07-02 15:58:19Z crawl $
054 */
055public class TokenInputFormat extends InputFormat<NullWritable,TokenWritable> {
056
057    /** Get the splits for a job. */
058    @Override
059    public List<InputSplit> getSplits(JobContext context) throws IOException,
060            InterruptedException {
061
062        Configuration parameters = context.getConfiguration();
063        
064        if(parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
065            
066            String sourceActorName = parameters.get(Utilities.CONFIGURATION_KEPLER_SOURCE_ACTOR_NAME, null);
067            if(sourceActorName == null) {
068                throw new RuntimeException("Name of DDPDataSource actor not in configuration.");
069            }
070        
071            _tokenFromSource = DDPDataSource.getToken(sourceActorName);
072                
073        } else {
074            
075            String tokenStr = parameters.get(Utilities.CONFIGURATION_KEPLER_INPUT_TOKEN, null);
076            if(tokenStr == null) {
077                throw new RuntimeException("No input token provided.");
078            }
079            try {
080                _tokenFromSource = new ArrayToken(tokenStr);
081            } catch (IllegalActionException e) {
082                throw new IOException("Error creating ArrayToken from input token string: " +
083                        e.getMessage(), e);
084            }
085        }
086
087        int length = _tokenFromSource.length();
088
089        // TODO fix number of splits
090        List<InputSplit> splits = new LinkedList<InputSplit>();
091        for (int i = 0; i < length; i++) {
092           splits.add(new TokenInputSplit(_tokenFromSource.getElement(i)));
093        }
094        return splits;
095
096    }
097
098    /** Create the RecordReader for a split. */
099    @Override
100    public RecordReader<NullWritable,TokenWritable> createRecordReader(InputSplit split,
101            TaskAttemptContext context) throws IOException,
102            InterruptedException {
103        return new TokenRecordReader();
104    }
105
106    /** The token read by the DDP source actor.*/
107    private ArrayToken _tokenFromSource;
108    
109}