001/* Data output for DDP.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-11-12 22:42:30 +0000 (Wed, 12 Nov 2014) $' 
008 * '$Revision: 33062 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern;
031
032import java.io.IOException;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.HashMap;
036import java.util.List;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.FileUtil;
041import org.apache.hadoop.fs.Path;
042import org.kepler.configuration.ConfigurationProperty;
043
044import ptolemy.actor.TypedIOPort;
045import ptolemy.data.ArrayToken;
046import ptolemy.data.BooleanToken;
047import ptolemy.data.StringToken;
048import ptolemy.data.Token;
049import ptolemy.data.expr.Parameter;
050import ptolemy.data.type.ArrayType;
051import ptolemy.data.type.BaseType;
052import ptolemy.data.type.Type;
053import ptolemy.kernel.CompositeEntity;
054import ptolemy.kernel.util.IllegalActionException;
055import ptolemy.kernel.util.NameDuplicationException;
056
057/** This actor writes data to the storage system for a DDP workflow. 
058 *  The actor reads a set of key-value pairs and combines them based
059 *  on the data format specified in formatType.
060 * 
061 *  @author Daniel Crawl
062 *  @version $Id: DDPDataSink.java 33062 2014-11-12 22:42:30Z crawl $
063 */
064public class DDPDataSink extends AtomicPathActor {
065
066    /** Construct a new FileDataSink in a container with a given name. */
067    public DDPDataSink(CompositeEntity container, String name)
068            throws IllegalActionException, NameDuplicationException {
069        super(container, name);
070
071        in = new TypedIOPort(this, "in", true, false);
072        
073        // set the types for the in port
074        _keyType = BaseType.GENERAL;
075        _valueType = BaseType.GENERAL;
076
077        out = new TypedIOPort(this, "out", false, true);
078        out.setMultiport(true);
079        
080        // add the output formats in the config file as choices 
081        _addFormats("Output");
082
083        _FORMAT_TYPE_CATEGORY = "OutputFormats.Format";
084
085        // set the default format
086        _setFormat("LineOutputFormat");        
087        
088        mergeMultiPartOutputs = new Parameter(this, "mergeMultiPartOutputs");
089        mergeMultiPartOutputs.setTypeEquals(BaseType.BOOLEAN);
090        mergeMultiPartOutputs.setToken(BooleanToken.TRUE);
091    }
092
093    /** Set a list of tokens for a specific DDPDataSink actor. */
094    public static void addTokens(String sinkActorName, List<Token> tokenList) throws IllegalActionException {
095        List<Token> tokens = _tokenMap.get(sinkActorName);
096        if(tokens == null) {
097            tokens = new ArrayList<Token>();
098            _tokenMap.put(sinkActorName, tokens);
099        }
100        synchronized(tokens) {
101            tokens.addAll(tokenList);
102        }
103    }
104
105    /** Update the path parameter if connected. */
106    @Override
107    public boolean prefire() throws IllegalActionException {
108        
109        boolean rc = super.prefire();
110        
111        // make sure it's not empty
112        if(!_formatTypeStr.equals("TokenOutputFormat") &&
113                !_formatTypeStr.equals("NullOutputFormat") &&
114                ((StringToken)path.getToken()).stringValue().trim().isEmpty()) {
115            throw new IllegalActionException(this, "Path must not be empty.");
116        }
117        
118        return rc;
119    }
120    
121    /** Write the token in the path parameter to the out port. */
122    @Override
123    public void fire() throws IllegalActionException {
124                     
125        if(_formatTypeStr.equals("TokenOutputFormat")) {
126            // remove the tokens from the map so that these tokens
127            // are not present in the next fire().
128            List<Token> tokens = _tokenMap.remove(getFullName());
129            if(tokens == null || tokens.isEmpty()) {
130                throw new IllegalActionException(this, "No tokens were written.");
131            }
132            Token[] array = tokens.toArray(new Token[tokens.size()]);
133            out.broadcast(new ArrayToken(array));
134        } else if(_formatTypeStr.equals("NullOutputFormat")) {
135            out.broadcast(new StringToken("done"));
136        } else {
137                        
138            if(((BooleanToken)mergeMultiPartOutputs.getToken()).booleanValue()) {
139                _mergeMultiPartOutputs(((StringToken)path.getToken()).stringValue());
140            }
141            
142            out.broadcast(path.getToken());
143        }
144        
145    }    
146    
147    /** Make sure output is either data or file, but not both. */
148    @Override
149    public void preinitialize() throws IllegalActionException {
150        
151        super.preinitialize();
152        
153        boolean pathIsConnected = (path.getPort().numberOfSources() > 0);
154        
155        Token pathToken = path.getToken();
156        
157        if((_formatTypeStr.equals("TokenOutputFormat") ||
158                _formatTypeStr.equals("NullOutputFormat")) &&
159                (pathIsConnected || 
160                (pathToken != null && !((StringToken)pathToken).stringValue().isEmpty()))) {
161            throw new IllegalActionException(this, 
162                    "TokenOutputFormat or NullOutputFormat and the path port/parameter cannot be used at the same time.\n" +
163                    "Either change the output format, or disconnect the path port and clear the\n" +
164                    "path parameter.");
165        }
166        
167        /*
168        if(!pathIsConnected && 
169                (pathToken == null || ((StringToken)pathToken).stringValue().isEmpty())) {
170            formatType.setToken("TokenOutputFormat");
171        }
172        */
173        
174        
175    }
176
177    /** Remove any tokens stored for this actor. */
178    @Override
179    public void wrapup() throws IllegalActionException {
180        
181        super.wrapup();
182        
183        _tokenMap.remove(getFullName());
184    }
185
186    /** The data to be written. */
187    public TypedIOPort in;
188
189    /** After data has been written, this port outputs the path. */
190    public TypedIOPort out;
191    
192    /** If true, merge multiple output files into a single file. */
193    public Parameter mergeMultiPartOutputs;
194    
195    
196    ///////////////////////////////////////////////////////////////////
197    ////                         protected methods                 ////
198
199    /** Update the key and value types. */
200    @Override
201    protected void _updateKeyValueTypes() {
202        super._updateKeyValueTypes();
203        Type type = Types.createKeyValueArrayType(_keyType, _valueType);
204        in.typeConstraints().clear();
205        in.setTypeAtMost(type);
206        out.typeConstraints().clear();
207        out.setTypeEquals(BaseType.STRING);
208    }
209    
210    /** Set the key and value types from the types in the configuration property. */
211    @Override
212    protected void _setTypesFromConfiguration(ConfigurationProperty formatProperty)
213            throws IllegalActionException {
214        
215        // there is no formatProperty found for the key/value type, try to use it directly
216        if (formatProperty == null) {
217            String typesStr = keyValueTypes.stringValue();
218            if(typesStr.isEmpty()) {
219                throw new IllegalActionException(this,
220                        "Parameter keyValueTypes has to be set if third party class\n" +
221                        "is set for parameter formatType.");
222            } else {
223                in.typeConstraints().clear();
224                in.setTypeAtMost(Types.getKeyValueType(keyValueTypes, typesStr));
225                out.typeConstraints().clear();
226                out.setTypeEquals(BaseType.STRING);
227            }
228        } else if(formatProperty.getProperty("Name").getValue().equals("TokenOutputFormat")) {
229            in.setTypeEquals(BaseType.UNKNOWN);
230            in.typeConstraints().clear();
231            // FIXME want to set to at least unsized array of records with key and value
232            in.setTypeAtLeast(ArrayType.ARRAY_UNSIZED_BOTTOM);
233            out.setTypeEquals(BaseType.UNKNOWN);
234            out.typeConstraints().clear();
235            out.setTypeAtLeast(in);
236        } else if(formatProperty.getProperty("Name").getValue().equals("NullOutputFormat")) {
237            in.setTypeEquals(BaseType.UNKNOWN);
238            in.typeConstraints().clear();
239            // FIXME want to set to at least unsized array of records with key and value
240            in.setTypeAtLeast(ArrayType.ARRAY_UNSIZED_BOTTOM);
241            out.typeConstraints().clear();
242            out.setTypeEquals(BaseType.STRING);
243        } else {
244                super._setTypesFromConfiguration(formatProperty);
245        }
246    }
247    
248    ///////////////////////////////////////////////////////////////////
249    ////                         private methods                 ////
250    
251    /** Merge a directory containing multiple output files into a single file.
252     *  This method deletes the directory when finished.
253     * 
254     * TODO move to parent class?
255     */
256    private void _mergeMultiPartOutputs(String pathStr) throws IllegalActionException {
257        
258        Configuration configuration = new Configuration();
259
260        Path srcPath = new Path(pathStr);
261        
262        try {
263            FileSystem srcPathFileSystem = srcPath.getFileSystem(configuration);
264            // only merge if the output is a directory.
265            if(srcPathFileSystem.isDirectory(srcPath)) {
266            
267                Path destPath = new Path(pathStr + "-TMP1234");        
268                
269                try {
270                    // TODO if there is only one part-r-nnnnnn file, copyMerge() will still
271                    // copy it instead of simply renaming it. 
272                    if(!FileUtil.copyMerge(srcPath.getFileSystem(configuration), srcPath,
273                            destPath.getFileSystem(configuration), destPath,
274                            true, configuration, "")) {
275                        throw new IllegalActionException(this, "Unable to merge output files in " + srcPath + "/.");
276                    }
277                } catch (IOException e) {
278                    throw new IllegalActionException(this, e, "Error merging multi-part output files in " + srcPath + "/.");
279                }
280                
281                try {
282                    if(!destPath.getFileSystem(configuration).rename(destPath, srcPath)) {
283                        throw new IllegalActionException(this, "Unable to rename " + destPath + " to " + srcPath);
284                    }
285                } catch (IOException e) {
286                    // TODO Auto-generated catch block
287                    e.printStackTrace();
288                }
289            }
290        } catch(IOException e) {
291            throw new IllegalActionException(this, e, "Error accessing output file " + srcPath);
292        }
293    }
294
295    /** A mapping of DDPDataSink actor name to tokens. */
296    private static final java.util.Map<String,List<Token>> _tokenMap =
297            Collections.synchronizedMap(new HashMap<String,List<Token>>());
298            
299}