001/* Stratosphere input format for Ptolemy tokens.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.io.input;
031
032import java.io.IOException;
033
034import org.kepler.ddp.Utilities;
035import org.kepler.ddp.actor.pattern.DDPDataSource;
036import org.kepler.stratosphere.stub.StubUtilities;
037
038import eu.stratosphere.api.java.record.io.GenericInputFormat;
039import eu.stratosphere.configuration.Configuration;
040import eu.stratosphere.core.io.GenericInputSplit;
041import eu.stratosphere.types.Record;
042import ptolemy.data.ArrayToken;
043import ptolemy.data.Token;
044import ptolemy.kernel.util.IllegalActionException;
045
046/** An input format for Ptolemy tokens.
047 * 
048 * FIXME: only supports array of strings tokens.
049 * 
050 * @author Daniel Crawl
051 * @version $Id: TokenInputFormat.java 33628 2015-08-24 22:42:20Z crawl $
052 * 
053 */
054public class TokenInputFormat extends GenericInputFormat {
055
056        /** Configure the format. */
057        @Override
058        public void configure(Configuration parameters) {
059                                                        
060                if(parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
061                    
062                    String sourceActorName = parameters.getString(Utilities.CONFIGURATION_KEPLER_SOURCE_ACTOR_NAME, null);
063            if(sourceActorName == null) {
064                throw new RuntimeException("Name of DDPDataSource actor not in configuration.");
065            }
066        
067            _tokenFromSource = DDPDataSource.getToken(sourceActorName);
068                        
069                } else {
070                    
071                    String tokenStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_TOKEN, null);
072                    if(tokenStr == null) {
073                        throw new RuntimeException("No input token provided.");
074                    }
075                    try {
076                _tokenFromSource = new ArrayToken(tokenStr);
077            } catch (IllegalActionException e) {
078                throw new RuntimeException("Error creating ArrayToken from input token string: " +
079                        e.getMessage());
080            }
081                }
082                
083        }
084
085        /** Create the input splits that can be processed in parallel.
086         *  @param numSplits The minimum number of splits to create. If fewer are
087         *  created, some instances may remain idle. 
088         */
089        @Override
090        public TokenInputSplit[] createInputSplits(int numSplits) throws IOException {
091                if (numSplits < 1) {
092                        throw new IllegalArgumentException("Number of input splits has to be at least 1.");
093                }
094                
095                int length = _tokenFromSource.length();
096
097                // TODO fix number of splits
098                TokenInputSplit[] splits = new TokenInputSplit[length];
099                for (int i = 0; i < splits.length; i++) {
100                        splits[i] = new TokenInputSplit(i, _tokenFromSource.getElement(i));
101                }
102                return splits;
103        }
104        
105        
106        /** Gets the type of the input splits that are processed by this input format. */
107        /* NOTE: commented out to avoid this warning:
108         * 
109         * WARN  (eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager:getAssignerByType:227) 
110         * Unable to find specific input split provider for type org.kepler.stratosphere.io.input.TokenInputSplit,
111         *  using default assigner
112        
113        @Override
114        public Class<? extends GenericInputSplit> getInputSplitType() {
115                return TokenInputSplit.class;
116        }
117        */
118
119        // --------------------------------------------------------------------------------------------
120
121        /**
122         * Opens a parallel instance of the input format to work on a split.
123         * <p>
124         * When this method is called, the input format it guaranteed to be configured.
125         * 
126         * @param split The split to be opened.
127         * @throws IOException Thrown, if the spit could not be opened due to an I/O problem.
128         */
129        @Override
130        public void open(GenericInputSplit split) throws IOException {
131                
132                super.open(split);
133                
134                if(! (split instanceof TokenInputSplit)) {
135                        throw new IllegalArgumentException("Token Input Formats can only be used with TokenInputSplits"); 
136                }
137                
138                _tokenFromSplit = ((TokenInputSplit)split).getToken();
139        }
140
141        /** Close an input split. */
142        @Override
143        public void close() throws IOException {
144                
145                super.close();
146                _tokenFromSplit = null;
147        }
148        
149        /** See if the token split has been read. */
150        @Override
151        public boolean reachedEnd() throws IOException {
152                return (_tokenFromSplit == null);
153        }
154
155        /** Populate a Record with the token from the split.
156         *  @return True if the token split has not already been read, otherwise false.  
157         */
158        @Override
159        public boolean nextRecord(Record record) throws IOException {
160                
161                if(_tokenFromSplit == null) {
162                        return false;
163                }
164                                        
165                try {
166            record.setField(0, StubUtilities.convertTokenToPactKeyData(_tokenFromSplit));
167        } catch (IllegalActionException e) {
168            throw new RuntimeException("Error converting Token to PACT data.", e);
169        }
170                
171                _tokenFromSplit = null;
172                
173                return true;
174        }
175
176        /** The token read by the DDP source actor.*/
177        private ArrayToken _tokenFromSource;
178        
179        /** The token read from a split. */
180        private Token _tokenFromSplit;
181        
182        /** Required field for IO formats. */
183        private static final long serialVersionUID = 1L;
184}