001/* Stratosphere input format for Ptolemy tokens. 002 * 003 * Copyright (c) 2014 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 008 * '$Revision: 33628 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.stratosphere.io.input; 031 032import java.io.IOException; 033 034import org.kepler.ddp.Utilities; 035import org.kepler.ddp.actor.pattern.DDPDataSource; 036import org.kepler.stratosphere.stub.StubUtilities; 037 038import eu.stratosphere.api.java.record.io.GenericInputFormat; 039import eu.stratosphere.configuration.Configuration; 040import eu.stratosphere.core.io.GenericInputSplit; 041import eu.stratosphere.types.Record; 042import ptolemy.data.ArrayToken; 043import ptolemy.data.Token; 044import ptolemy.kernel.util.IllegalActionException; 045 046/** An input format for Ptolemy tokens. 047 * 048 * FIXME: only supports array of strings tokens. 049 * 050 * @author Daniel Crawl 051 * @version $Id: TokenInputFormat.java 33628 2015-08-24 22:42:20Z crawl $ 052 * 053 */ 054public class TokenInputFormat extends GenericInputFormat { 055 056 /** Configure the format. */ 057 @Override 058 public void configure(Configuration parameters) { 059 060 if(parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) { 061 062 String sourceActorName = parameters.getString(Utilities.CONFIGURATION_KEPLER_SOURCE_ACTOR_NAME, null); 063 if(sourceActorName == null) { 064 throw new RuntimeException("Name of DDPDataSource actor not in configuration."); 065 } 066 067 _tokenFromSource = DDPDataSource.getToken(sourceActorName); 068 069 } else { 070 071 String tokenStr = parameters.getString(Utilities.CONFIGURATION_KEPLER_INPUT_TOKEN, null); 072 if(tokenStr == null) { 073 throw new RuntimeException("No input token provided."); 074 } 075 try { 076 _tokenFromSource = new ArrayToken(tokenStr); 077 } catch (IllegalActionException e) { 078 throw new RuntimeException("Error creating ArrayToken from input token string: " + 079 e.getMessage()); 080 } 081 } 082 083 } 084 085 /** Create the input splits that can be processed in parallel. 086 * @param numSplits The minimum number of splits to create. If fewer are 087 * created, some instances may remain idle. 088 */ 089 @Override 090 public TokenInputSplit[] createInputSplits(int numSplits) throws IOException { 091 if (numSplits < 1) { 092 throw new IllegalArgumentException("Number of input splits has to be at least 1."); 093 } 094 095 int length = _tokenFromSource.length(); 096 097 // TODO fix number of splits 098 TokenInputSplit[] splits = new TokenInputSplit[length]; 099 for (int i = 0; i < splits.length; i++) { 100 splits[i] = new TokenInputSplit(i, _tokenFromSource.getElement(i)); 101 } 102 return splits; 103 } 104 105 106 /** Gets the type of the input splits that are processed by this input format. */ 107 /* NOTE: commented out to avoid this warning: 108 * 109 * WARN (eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitManager:getAssignerByType:227) 110 * Unable to find specific input split provider for type org.kepler.stratosphere.io.input.TokenInputSplit, 111 * using default assigner 112 113 @Override 114 public Class<? extends GenericInputSplit> getInputSplitType() { 115 return TokenInputSplit.class; 116 } 117 */ 118 119 // -------------------------------------------------------------------------------------------- 120 121 /** 122 * Opens a parallel instance of the input format to work on a split. 123 * <p> 124 * When this method is called, the input format it guaranteed to be configured. 125 * 126 * @param split The split to be opened. 127 * @throws IOException Thrown, if the spit could not be opened due to an I/O problem. 128 */ 129 @Override 130 public void open(GenericInputSplit split) throws IOException { 131 132 super.open(split); 133 134 if(! (split instanceof TokenInputSplit)) { 135 throw new IllegalArgumentException("Token Input Formats can only be used with TokenInputSplits"); 136 } 137 138 _tokenFromSplit = ((TokenInputSplit)split).getToken(); 139 } 140 141 /** Close an input split. */ 142 @Override 143 public void close() throws IOException { 144 145 super.close(); 146 _tokenFromSplit = null; 147 } 148 149 /** See if the token split has been read. */ 150 @Override 151 public boolean reachedEnd() throws IOException { 152 return (_tokenFromSplit == null); 153 } 154 155 /** Populate a Record with the token from the split. 156 * @return True if the token split has not already been read, otherwise false. 157 */ 158 @Override 159 public boolean nextRecord(Record record) throws IOException { 160 161 if(_tokenFromSplit == null) { 162 return false; 163 } 164 165 try { 166 record.setField(0, StubUtilities.convertTokenToPactKeyData(_tokenFromSplit)); 167 } catch (IllegalActionException e) { 168 throw new RuntimeException("Error converting Token to PACT data.", e); 169 } 170 171 _tokenFromSplit = null; 172 173 return true; 174 } 175 176 /** The token read by the DDP source actor.*/ 177 private ArrayToken _tokenFromSource; 178 179 /** The token read from a split. */ 180 private Token _tokenFromSplit; 181 182 /** Required field for IO formats. */ 183 private static final long serialVersionUID = 1L; 184}