/* Data output for DDP.
 *
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-11-12 22:42:30 +0000 (Wed, 12 Nov 2014) $'
 * '$Revision: 33062 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.ddp.actor.pattern;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.kepler.configuration.ConfigurationProperty;

import ptolemy.actor.TypedIOPort;
import ptolemy.data.ArrayToken;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.Type;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;

/** This actor writes data to the storage system for a DDP workflow.
 *  The actor reads a set of key-value pairs and combines them based
 *  on the data format specified in formatType.
 *
 *  @author Daniel Crawl
 *  @version $Id: DDPDataSink.java 33062 2014-11-12 22:42:30Z crawl $
 */
public class DDPDataSink extends AtomicPathActor {

    /** Construct a new DDPDataSink in a container with a given name.
     *
     *  @param container The container.
     *  @param name The name of this actor.
     *  @exception IllegalActionException If a port or parameter cannot be
     *   created or configured.
     *  @exception NameDuplicationException If the name coincides with an
     *   entity already in the container.
     */
    public DDPDataSink(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);

        in = new TypedIOPort(this, "in", true, false);

        // set the types for the in port
        _keyType = BaseType.GENERAL;
        _valueType = BaseType.GENERAL;

        out = new TypedIOPort(this, "out", false, true);
        out.setMultiport(true);

        // add the output formats in the config file as choices
        _addFormats("Output");

        _FORMAT_TYPE_CATEGORY = "OutputFormats.Format";

        // set the default format
        _setFormat("LineOutputFormat");

        mergeMultiPartOutputs = new Parameter(this, "mergeMultiPartOutputs");
        mergeMultiPartOutputs.setTypeEquals(BaseType.BOOLEAN);
        mergeMultiPartOutputs.setToken(BooleanToken.TRUE);
    }

    /** Append a list of tokens to those stored for a specific DDPDataSink
     *  actor. The stored tokens are emitted, and removed, by the next
     *  fire() of that actor when it uses TokenOutputFormat.
     *
     *  @param sinkActorName The full name of the sink actor.
     *  @param tokenList The tokens to append.
     *  @exception IllegalActionException Not thrown in this class.
     */
    public static void addTokens(String sinkActorName, List<Token> tokenList) throws IllegalActionException {
        // The lookup, the insertion of a new list, and the append must be one
        // atomic step. Otherwise two callers can each create a list for the
        // same actor (losing one batch), or append to a list that fire() has
        // already removed. The synchronized map uses itself as its mutex, so
        // holding its monitor also excludes concurrent remove() calls.
        synchronized(_tokenMap) {
            List<Token> tokens = _tokenMap.get(sinkActorName);
            if(tokens == null) {
                tokens = new ArrayList<Token>();
                _tokenMap.put(sinkActorName, tokens);
            }
            tokens.addAll(tokenList);
        }
    }

    /** Run the parent's prefire() and verify that a path is set for every
     *  format other than TokenOutputFormat and NullOutputFormat.
     *
     *  @return The value returned by the parent class.
     *  @exception IllegalActionException If the path is required but is
     *   missing or blank, or if the parent class throws it.
     */
    @Override
    public boolean prefire() throws IllegalActionException {

        boolean rc = super.prefire();

        // make sure it's not empty
        if(!_formatTypeStr.equals("TokenOutputFormat") &&
                !_formatTypeStr.equals("NullOutputFormat")) {
            Token pathToken = path.getToken();
            // a missing token is reported the same way as a blank path
            // instead of failing with a NullPointerException.
            if(pathToken == null ||
                    ((StringToken)pathToken).stringValue().trim().isEmpty()) {
                throw new IllegalActionException(this, "Path must not be empty.");
            }
        }

        return rc;
    }

    /** Broadcast the result of the write on the out port. For
     *  TokenOutputFormat this is an array of the tokens stored via
     *  addTokens(); for NullOutputFormat it is the string "done"; for all
     *  other formats it is the token in the path parameter, after
     *  optionally merging multi-part outputs into a single file.
     *
     *  @exception IllegalActionException If no tokens were stored for
     *   TokenOutputFormat, or if merging or broadcasting fails.
     */
    @Override
    public void fire() throws IllegalActionException {

        if(_formatTypeStr.equals("TokenOutputFormat")) {
            // remove the tokens from the map so that these tokens
            // are not present in the next fire().
            List<Token> tokens = _tokenMap.remove(getFullName());
            if(tokens == null || tokens.isEmpty()) {
                throw new IllegalActionException(this, "No tokens were written.");
            }
            Token[] array = tokens.toArray(new Token[tokens.size()]);
            out.broadcast(new ArrayToken(array));
        } else if(_formatTypeStr.equals("NullOutputFormat")) {
            out.broadcast(new StringToken("done"));
        } else {

            if(((BooleanToken)mergeMultiPartOutputs.getToken()).booleanValue()) {
                _mergeMultiPartOutputs(((StringToken)path.getToken()).stringValue());
            }

            out.broadcast(path.getToken());
        }

    }

    /** Make sure output is either data or file, but not both.
     *
     *  @exception IllegalActionException If TokenOutputFormat or
     *   NullOutputFormat is selected while the path port is connected or
     *   the path parameter is non-empty, or if the parent class throws it.
     */
    @Override
    public void preinitialize() throws IllegalActionException {

        super.preinitialize();

        boolean pathIsConnected = (path.getPort().numberOfSources() > 0);

        Token pathToken = path.getToken();

        if((_formatTypeStr.equals("TokenOutputFormat") ||
                _formatTypeStr.equals("NullOutputFormat")) &&
                (pathIsConnected ||
                (pathToken != null && !((StringToken)pathToken).stringValue().isEmpty()))) {
            throw new IllegalActionException(this,
                    "TokenOutputFormat or NullOutputFormat and the path port/parameter cannot be used at the same time.\n" +
                    "Either change the output format, or disconnect the path port and clear the\n" +
                    "path parameter.");
        }

        /*
        if(!pathIsConnected &&
            (pathToken == null || ((StringToken)pathToken).stringValue().isEmpty())) {
            formatType.setToken("TokenOutputFormat");
        }
        */

    }

    /** Remove any tokens stored for this actor.
     *
     *  @exception IllegalActionException If the parent class throws it.
     */
    @Override
    public void wrapup() throws IllegalActionException {

        super.wrapup();

        _tokenMap.remove(getFullName());
    }

    /** The data to be written. */
    public TypedIOPort in;

    /** After data has been written, this port outputs the path. */
    public TypedIOPort out;

    /** If true, merge multiple output files into a single file. */
    public Parameter mergeMultiPartOutputs;


    ///////////////////////////////////////////////////////////////////
    ////                      protected methods                    ////

    /** Update the key and value types. The in port is constrained to be
     *  at most the key-value array type built from the current key and
     *  value types, and the out port is set to string.
     */
    @Override
    protected void _updateKeyValueTypes() {
        super._updateKeyValueTypes();
        Type type = Types.createKeyValueArrayType(_keyType, _valueType);
        in.typeConstraints().clear();
        in.setTypeAtMost(type);
        out.typeConstraints().clear();
        out.setTypeEquals(BaseType.STRING);
    }

    /** Set the key and value types from the types in the configuration property.
     *
     *  @param formatProperty The configuration property of the selected
     *   format, or null if none was found, in which case the types are
     *   taken from the keyValueTypes parameter.
     *  @exception IllegalActionException If formatProperty is null and
     *   keyValueTypes is empty, or if the types cannot be set.
     */
    @Override
    protected void _setTypesFromConfiguration(ConfigurationProperty formatProperty)
            throws IllegalActionException {

        // there is no formatProperty found for the key/value type, try to use it directly
        if (formatProperty == null) {
            String typesStr = keyValueTypes.stringValue();
            if(typesStr.isEmpty()) {
                throw new IllegalActionException(this,
                        "Parameter keyValueTypes has to be set if third party class\n" +
                        "is set for parameter formatType.");
            } else {
                in.typeConstraints().clear();
                in.setTypeAtMost(Types.getKeyValueType(keyValueTypes, typesStr));
                out.typeConstraints().clear();
                out.setTypeEquals(BaseType.STRING);
            }
        } else if(formatProperty.getProperty("Name").getValue().equals("TokenOutputFormat")) {
            in.setTypeEquals(BaseType.UNKNOWN);
            in.typeConstraints().clear();
            // FIXME want to set to at least unsized array of records with key and value
            in.setTypeAtLeast(ArrayType.ARRAY_UNSIZED_BOTTOM);
            // the output carries the same tokens that arrive on the input
            out.setTypeEquals(BaseType.UNKNOWN);
            out.typeConstraints().clear();
            out.setTypeAtLeast(in);
        } else if(formatProperty.getProperty("Name").getValue().equals("NullOutputFormat")) {
            in.setTypeEquals(BaseType.UNKNOWN);
            in.typeConstraints().clear();
            // FIXME want to set to at least unsized array of records with key and value
            in.setTypeAtLeast(ArrayType.ARRAY_UNSIZED_BOTTOM);
            out.typeConstraints().clear();
            out.setTypeEquals(BaseType.STRING);
        } else {
            super._setTypesFromConfiguration(formatProperty);
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                      private methods                      ////

    /** Merge a directory containing multiple output files into a single file.
     *  This method deletes the directory when finished, and the merged file
     *  takes the directory's place at the same path. If the path is not a
     *  directory, nothing is done.
     *
     *  TODO move to parent class?
     *
     *  @param pathStr The output path.
     *  @exception IllegalActionException If the path cannot be accessed, or
     *   if the merge or the final rename fails.
     */
    private void _mergeMultiPartOutputs(String pathStr) throws IllegalActionException {

        Configuration configuration = new Configuration();

        Path srcPath = new Path(pathStr);

        FileSystem srcPathFileSystem;
        boolean isDirectory;
        try {
            srcPathFileSystem = srcPath.getFileSystem(configuration);
            isDirectory = srcPathFileSystem.isDirectory(srcPath);
        } catch(IOException e) {
            throw new IllegalActionException(this, e, "Error accessing output file " + srcPath);
        }

        // only merge if the output is a directory.
        if(!isDirectory) {
            return;
        }

        // merge into a temporary sibling first; the final name is still
        // occupied by the directory until copyMerge() deletes it.
        Path destPath = new Path(pathStr + _MERGE_TEMP_SUFFIX);

        try {
            // TODO if there is only one part-r-nnnnnn file, copyMerge() will still
            // copy it instead of simply renaming it.
            if(!FileUtil.copyMerge(srcPathFileSystem, srcPath,
                    destPath.getFileSystem(configuration), destPath,
                    true, configuration, "")) {
                throw new IllegalActionException(this, "Unable to merge output files in " + srcPath + "/.");
            }
        } catch (IOException e) {
            throw new IllegalActionException(this, e, "Error merging multi-part output files in " + srcPath + "/.");
        }

        try {
            if(!destPath.getFileSystem(configuration).rename(destPath, srcPath)) {
                throw new IllegalActionException(this, "Unable to rename " + destPath + " to " + srcPath);
            }
        } catch (IOException e) {
            // a failed rename leaves the merged data under the temporary
            // name, so it must be reported rather than ignored.
            throw new IllegalActionException(this, e, "Error renaming " + destPath + " to " + srcPath);
        }
    }

    /** Suffix appended to the output path to name the temporary merged file. */
    private static final String _MERGE_TEMP_SUFFIX = "-TMP1234";

    /** A mapping of DDPDataSink actor name to tokens. Compound operations
     *  on the map and on the lists it holds synchronize on the map itself.
     */
    private static final java.util.Map<String,List<Token>> _tokenMap =
            Collections.synchronizedMap(new HashMap<String,List<Token>>());

}