001/*
002 * Copyright (c) 2009-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-08-24 22:47:39 +0000 (Mon, 24 Aug 2015) $' 
007 * '$Revision: 33633 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.srb;
031
032import java.io.OutputStream;
033import java.net.URI;
034import java.util.Vector;
035
036import edu.sdsc.grid.io.FileFactory;
037import edu.sdsc.grid.io.GeneralFile;
038import edu.sdsc.grid.io.GeneralFileSystem;
039import edu.sdsc.grid.io.GeneralMetaData;
040import edu.sdsc.grid.io.GeneralRandomAccessFile;
041import edu.sdsc.grid.io.MetaDataCondition;
042import edu.sdsc.grid.io.MetaDataSelect;
043import edu.sdsc.grid.io.MetaDataSet;
044import ptolemy.actor.TypedAtomicActor;
045import ptolemy.actor.TypedIOPort;
046import ptolemy.data.ArrayToken;
047import ptolemy.data.BooleanToken;
048import ptolemy.data.StringToken;
049import ptolemy.data.Token;
050import ptolemy.data.expr.Parameter;
051import ptolemy.data.type.ArrayType;
052import ptolemy.data.type.BaseType;
053import ptolemy.kernel.CompositeEntity;
054import ptolemy.kernel.util.Attribute;
055import ptolemy.kernel.util.IllegalActionException;
056import ptolemy.kernel.util.NameDuplicationException;
057import ptolemy.util.MessageHandler;
058
059//////////////////////////////////////////////////////////////////////////
060//// DataGridTransfer
061/** <p>DataGridTransfer is a Kepler Actor which has a functionality similar to 
062 *  the SRB/IRODS commands, namely Sget, Sput, iget, and iput. 
063 *  DataGridTransfer copies one or more objects to/from a remote filesystem into 
064 *  the local file system.
 *  This actor expects as input references to local or remote file systems supported by the Jargon API.
066 *  This reference connection is created from the source and destination URL values, 
067 *  Currently available filesystem URLs are, file:///myDir/myfile.txt,
068 *  irods://username:password@myhost.org:1247/myDir/myfile.txt, 
069 *  srb://username.domain:password@myhost.org:5544/myDir/myfile.txt, or ftp and http urls.
070 *  </p><p>
071 *  Currently, the source and destination filesystems can not be changed once the workflow is running. 
072 *  The filepaths can be changed.
073 *  </p><p>
074 *  The file reference system is created with a unique user account and 
075 *  with this connection reference as input the DataGridTransfer actor is 
076 *  able to gain access to various files on the file systems.
077 *  Once an alive DataGridTransfer file connection system has been 
078 *  established the actor gets the destination directory and the source files 
079 *  to establish the DataGridTransfer file path. 
 *  If the destination directory doesn't exist, a new directory
 *  is created. Once the file paths are determined,
 *  the files are copied from the source file system to the
 *  destination directory. If this parallel copy fails,
 *  a streaming copy is carried out with a random access stream,
 *  where the file is transferred as a sequence of byte arrays.
086 *  </p><p>
087 *  There is a parameter to overwrite existing files.
088 *  </p><p>
 *  <B>Actor Input:</B> Accepts an array of source file URLs
 *  and the URL of the destination directory.
091 *  </p><p>
092 *  <B>Actor Output:</B> Outputs the destination file paths and an exit status. The exit status gives
093 *  a message of "success" or appropriate error to indicate the status of file get process.
094 *
095 * </p><p>The following actor uses the Dice Research Jargon API provided. 
096 * <A href="http://www.sdsc.edu/srb"><I>Further information on SRB</I> </A>
097 * <A href="http://www.irods.org"><I>Further information on IRODS</I> </A>
098
099   @author Lucas Gilbert, Efrat Jaeger
100   @category.name datagrid
101   @category.name transfer
102*/
103
104public class DataGridTransfer extends TypedAtomicActor {
105
106        /** Construct a constant source with the given container and name.
107         *  Create the <i>value</i> parameter, initialize its value to
108         *  the default value of an IntToken with value 1.
109         *  @param container The container.
110         *  @param name The name of this actor.
111         *  @exception IllegalActionException If the entity cannot be contained
112         *   by the proposed container.
113         *  @exception NameDuplicationException If the container already has an
114         *   actor with this name.
115         */
116        public DataGridTransfer( CompositeEntity container, String name )
117                throws NameDuplicationException, IllegalActionException 
118    {
119                super(container, name);
120
121            source = new TypedIOPort(this, "sourceURL", true, false);
122            source.setTypeEquals(new ArrayType(BaseType.STRING));
123            new Attribute(source, "_showName");
124
125            fetchedFiles = new TypedIOPort(this, "fetchedFiles", false, true);
126            fetchedFiles.setTypeEquals(new ArrayType(BaseType.STRING));
127            new Attribute(fetchedFiles, "_showName");
128
129            exitCode = new TypedIOPort(this, "exitCode", false, true);
130            exitCode.setTypeEquals(BaseType.STRING);
131            new Attribute(exitCode, "_showName");
132
133        destination  = new TypedIOPort(this, "destinationDirectoryURL", true, false);
134        destination.setTypeEquals(BaseType.STRING);
135        new Attribute(destination, "_showName");
136
137//        destinationURLParameter = new Parameter(this, "destinationURLParameter");
138//        destinationURLParameter.setTypeEquals(BaseType.STRING);
139
140            overwriteParameter = new Parameter(this, "overwrite");
141        overwriteParameter.setTypeEquals(BaseType.BOOLEAN);
142        overwriteParameter.setToken(BooleanToken.FALSE);
143
144            _attachText("_iconDescription", "<svg>\n"
145            + "<rect x=\"-25\" y=\"-20\" "
146            + "width=\"50\" height=\"40\" "
147            + "style=\"fill:white\"/>\n"
148            + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
149            + " 15,-10 15,10, -15,10\" "
150            + "style=\"fill:red\"/>\n"
151            + "<text x=\"-13.5\" y=\"7\" "
152            + "style=\"font-size:14\">\n"
153            + "DataGrid \n"
154            + "</text>\n"
155            + "<text x=\"-12\" y=\"19\""
156            + "style=\"font-size:11; fill:black; font-family:SansSerif\">"
157            + "Transfer</text>\n"
158            + "</svg>\n");
159
160        }
161
162   /** Paths to the files to fetch
163    */
164        public TypedIOPort source;
165
166
167    /** Paths to the destination
168     */
169        public TypedIOPort fetchedFiles;
170
171    /** Exit status of the operation.
172     */
173        public TypedIOPort exitCode;
174
175    /** Where to put the files on the destination drive.
176     */
177        public TypedIOPort destination;
178
179    /** Where to put the files on the destination drive.
180     */
181//      public Parameter destinationURLParameter;
182
183    /** Overwrite when file is copied
184     */
185        public Parameter overwriteParameter;
186
187
188
189        ///////////////////////////////////////////////////////////////////
190        ////                         public methods                    ////
191        /** Transfer the file or directory, 
192         *  from the source URL to the destination URL. 
193         *  @exception IllegalActionException If it is thrown if the
194         *  file cannot be accessed.
195         */
196    public void fire() throws IllegalActionException 
197    {
198        GeneralFile sourceFile;
199        GeneralFile destinationFile;
200        String _exitCode = "";
201        String destinationURL = null;
202        MetaDataCondition conditions[] = {
203          MetaDataSet.newCondition( GeneralMetaData.FILE_NAME,
204          MetaDataCondition.EQUAL, "fake")
205        };
206        MetaDataSelect selects[] = {
207            MetaDataSet.newSelection( GeneralMetaData.FILE_NAME )
208        };  
209
210
211        try {     
212            //TODO not sure why it is done this way
213                if (destination.getWidth() > 0) {  
214                    destinationURL = ((StringToken)destination.get(0)).stringValue();
215//                      destinationURLParameter.setExpression(destinationURL);
216//                      destinationURL = ((StringToken)destinationURLParameter.getToken()).stringValue();
217                }
218        
219            //TODO a bit excessive?
220            // make sure there is an alive connection. Use the old ones if possible.
221                try {
222                    if (destinationDir == null)
223                            destinationDir = FileFactory.newFile(new URI(destinationURL));
224                    else
225                            destinationDir.query(selects);
226                } catch (Exception ex) { // connection did not exist or was closed.
227                    // try to get a new connection in case the previous one has terminated.
228                destinationDir = FileFactory.newFile(new URI(destinationURL));
229                }
230                destinationDir.mkdirs();
231
232                // Getting the source files list token and copying each file to the destination drive.
233                ArrayToken sourceFilesTokenArr = null;
234                try {
235                        sourceFilesTokenArr = (ArrayToken) source.get(0);
236                } catch (Exception ex) {
237                        _debug("filesToGet port is null.");
238            }
239                if (sourceFilesTokenArr != null) {
240                        Token[] sourceFilesToken = sourceFilesTokenArr.arrayValue();
241                        Vector fetchedFilesVec = new Vector();
242                        
243                        String fileStr = ((StringToken) sourceFilesToken[0]).stringValue();
244                //TODO a bit excessive?
245                // make sure there is an alive connection. Use the old ones if possible.
246                try {
247                        if (sourceFileSystem == null)
248                                sourceFileSystem = FileFactory.newFileSystem(new URI(fileStr));                     
249                        else
250                                sourceFileSystem.query(conditions, selects, 1);
251                } catch (Exception ex) { // connection did not exist or was closed.
252                    // try to get a new connection in case the previous one has terminated.
253                    sourceFile = FileFactory.newFile(new URI(fileStr));
254                    sourceFileSystem = sourceFile.getFileSystem();
255                }
256                
257                        for (int i=0; i<sourceFilesToken.length; i++) 
258                        {
259                            // source file path.
260                                fileStr = ((StringToken) sourceFilesToken[i]).stringValue();
261                                _debug("<FILE_TO_GET>" + fileStr + "<FILE_TO_GET>");
262
263                            sourceFile = FileFactory.newFile(sourceFileSystem, new URI(fileStr).getPath());
264                            if (sourceFile.exists()) {
265                                    _debug("<LOCAL_FILE_PATH>" + destinationDir + "/" + sourceFile.getName() + "</LOCAL_FILE_PATH>");
266
267                                    // copying the source file to the destination drive.
268                        destinationFile = FileFactory.newFile(destinationDir, sourceFile.getName());
269                            boolean overwrite = ((BooleanToken)overwriteParameter.getToken()).booleanValue();
270                            // Don't ask for confirmation in append mode, since there
271                            // will be no loss of data.
272                            if (destinationFile.exists() && !overwrite) {
273                                    // Query for overwrite.
274                                    // FIXME: This should be called in the event thread!
275                                    // There is a chance of deadlock since it is not.
276                                    if (!MessageHandler.yesNoQuestion(
277                                                "OK to overwrite " + destinationFile + "?")) {
278                                        throw new IllegalActionException(this,
279                                                "Please select another file name.");
280                                    }
281                            }
282                                try {
283                                    //overwrite should be checked above
284                                        sourceFile.copyTo( destinationFile, true );
285                                        fetchedFilesVec.add(new StringToken(destinationFile.getAbsolutePath()));
286                                } catch (Exception ex) {
287                                        // If the paralel copy fails try to do a stream copy.
288                                        System.out.println("Paralel get failed due to " + ex.getMessage());
289                                        System.out.println("Trying Stream get.");
290                                        try {
291                                if (destinationFile.exists()) {
292                                    destinationFile.delete();
293                                }
294                                _streamGet(sourceFile, destinationFile);
295                                            // adding successfully fetched files output path to the fetched files array.
296                                        fetchedFilesVec.add(new StringToken(destinationFile.getAbsolutePath()));
297
298                                        } catch (Exception stex) {
299                                              stex.printStackTrace();
300                                        // even if there is an execption output the successfully fetched files.
301                                                System.out.println("failed to copy file " + fileStr +
302                                                                " to " + destinationDir + "/" + sourceFile.getName() + ".");
303                                                _exitCode += "unable to fetch file " + fileStr;
304                                                _exitCode += " to " + destinationDir + "/" + sourceFile.getName() + ".\n";
305                                        }
306                                }
307                                } else {
308                                        System.out.println("file " + fileStr + " does not exist.");
309                                        _exitCode += "file " + fileStr + " does not exist.\n";
310                                }
311                        }
312
313                        Token[] fetchedFilesArr = new StringToken[fetchedFilesVec.size()];
314                        fetchedFilesVec.toArray(fetchedFilesArr);
315                        // broadcast the array only if it's non-empty
316                        if (fetchedFilesArr.length > 0) {
317                                fetchedFiles.broadcast(new ArrayToken(fetchedFilesArr));
318                        } else {
319                                _exitCode = "no files were fetched.";
320                        }
321                        if (_exitCode.equals("")) {
322                                _exitCode = "success";
323                        }
324                        exitCode.broadcast(new StringToken(_exitCode));
325                } else {
326                        // no more files to get.
327                        _refire = false;
328                }
329        } catch (Exception ex) {
330                ex.printStackTrace();
331                sourceFile = null;
332//              fileSystem = SRBUtil.closeConnection(fileSystem);
333                throw new IllegalActionException(this, ex.getMessage());
334        }
335
336    }
337
338        /** Initialize the source file system to null.
339        */
340        public void initialize() throws IllegalActionException {
341            super.initialize();
342            sourceFileSystem = null;
343            destinationDir = null;        
344        }
345
346        /** Post fire the actor. Return false to indicated that the
347         *  process has finished. If it returns true, the process will
348         *  continue indefinitely.
349         */
350        public boolean postfire() throws IllegalActionException {
351                if (_refire) {
352                        return super.postfire();
353                } else return _refire;
354        }
355
356        /** Reset the _refire variable and disconnect
357         */
358        public void wrapup() {
359                _refire = true;
360//              fileSystem = SRBUtil.closeConnection(fileSystem);
361        }
362
363        ///////////////////////////////////////////////////////////////////
364        ////                         private methods                   ////
365
366        /** Stream read the file. Use in case the parallel get fails.
367         */
368        private void _streamGet(GeneralFile sourceFile, GeneralFile destinationFile) throws Exception {
369
370                GeneralRandomAccessFile randomAccessFile= null;
371                byte[] bytesRead = new byte[20000];;
372                int nBytesRead;
373                OutputStream out = FileFactory.newFileOutputStream( destinationFile );
374
375                randomAccessFile = FileFactory.newRandomAccessFile( sourceFile, "r" );
376                nBytesRead = randomAccessFile.read(bytesRead);
377                while (nBytesRead > 0) {
378                        out.write(bytesRead);
379                        nBytesRead = randomAccessFile.read(bytesRead);
380                }
381                out.close();
382        }
383
384        ///////////////////////////////////////////////////////////////////
385        ////                         private members                   ////
386
387        /** Indicator whether the actor should fire again
388         */
389        private boolean _refire = true;
390
391        /** Source file system reference.
392         */
393        private GeneralFileSystem sourceFileSystem = null;
394
395        /** Source file system reference.
396         */
397        private GeneralFile destinationDir = null;
398}