001/*
002 * Copyright (c) 2002-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-08-24 22:47:39 +0000 (Mon, 24 Aug 2015) $' 
007 * '$Revision: 33633 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.srb;
031
032import edu.sdsc.grid.io.srb.SRBFile;
033import edu.sdsc.grid.io.srb.SRBFileSystem;
034import edu.sdsc.grid.io.srb.SRBRandomAccessFile;
035import ptolemy.actor.TypedAtomicActor;
036import ptolemy.actor.TypedIOPort;
037import ptolemy.data.ArrayToken;
038import ptolemy.data.ObjectToken;
039import ptolemy.data.StringToken;
040import ptolemy.data.Token;
041import ptolemy.data.UnsignedByteToken;
042import ptolemy.data.type.ArrayType;
043import ptolemy.data.type.BaseType;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.util.IllegalActionException;
046import ptolemy.kernel.util.NameDuplicationException;
047
048//////////////////////////////////////////////////////////////////////////
049//// SRBWriter
050/**
051 * 
052 * <p>
053 * SRBWriter/StreamPut is a Kepler Actor which has a functionality similar to
054 * the SRB command namely "Sput".However SRBWriter actor uploads data to the SRB
055 * with a streaming process by writing a byte of arrays to the SRB remote file
056 * instead of a parallel upload. The following actor expects as input a
057 * reference to the SRB file system. This reference connection is obtained via
058 * the SRBConnect Actor in Kepler. <i>See SRBConnect and its documentation.</i>
059 * </p>
060 * <p>
061 * The file reference system is created with a unique SRB user account and with
062 * this connection reference as input the SRBWriter actor is able to gain access
063 * to the SRB file space. Once an alive SRB file connection system has been
064 * established the actor gets the remode SRB file path and creates a
065 * SRBRandomAccessFile stream. The bytes of array taken in as input are further
066 * written to the stream in a loop.
067 * </p>
068 * <p>
069 * <B>Actor Input:</B> Accepts a reference to the SRB files system, an SRB
070 * remote file name and a sequence of bytes array as input.
071 * </p>
072 * <p>
073 * <B>Actor Output:</B> The SRBStreamPut actor sends a trigger once its done
074 * writing the byte arrays to the remote file on the SRB.
075 * 
076 * 
077 * </p>
078 * <p>
079 * The following actor accesses SRB file reference system and SRB file space
080 * with the SRB Jargon API provided. The JARGON is a pure API for developing
081 * programs with a data grid interface and I/O for SRB file systems.
082 * </p>
083 * <A href="http://www.sdsc.edu/srb"><I>Further information on SRB</I> </A>
084 * 
085 * @author Bing Zhu and Efrat Jaeger
086 * @version $Id: SRBWriter.java 33633 2015-08-24 22:47:39Z crawl $
087 * @since Ptolemy II 3.0.2
088 */
089public class SRBWriter extends TypedAtomicActor {
090
091        /**
092         * Construct an actor with the given container and name.
093         * 
094         * @param container
095         *            The container.
096         * @param name
097         *            The name of this actor.
098         * @exception IllegalActionException
099         *                If the actor cannot be contained by the proposed
100         *                container.
101         * @exception NameDuplicationException
102         *                If the container already has an actor with this name.
103         */
104        public SRBWriter(CompositeEntity container, String name)
105                        throws NameDuplicationException, IllegalActionException {
106
107                super(container, name);
108
109                SRBFileSystem = new TypedIOPort(this, "SRBFileSystem", true, false);
110                input = new TypedIOPort(this, "input", true, false);
111                remoteFileName = new TypedIOPort(this, "remoteFileName", true, false);
112                trigger = new TypedIOPort(this, "trigger", false, true);
113
114                // Set the type constraint.
115                SRBFileSystem.setTypeEquals(BaseType.GENERAL);
116                input.setTypeEquals(new ArrayType(BaseType.UNSIGNED_BYTE));
117                remoteFileName.setTypeEquals(BaseType.STRING);
118                trigger.setTypeEquals(BaseType.GENERAL);
119
120                _attachText("_iconDescription", "<svg>\n"
121                                + "<rect x=\"-25\" y=\"-20\" " + "width=\"68\" height=\"40\" "
122                                + "style=\"fill:white\"/>\n"
123                                + "<polygon points=\"-15,-10 -7,-10 -3,-14 4,-14 8,-10"
124                                + " 33,-10 33,10, -15,10\" " + "style=\"fill:red\"/>\n"
125                                + "<text x=\"-5\" y=\"7\" " + "style=\"font-size:14\">\n"
126                                + "SRB \n" + "</text>\n" + "<text x=\"-22\" y=\"19\""
127                                + "style=\"font-size:11; fill:black; font-family:SansSerif\">"
128                                + "stream write</text>\n" + "</svg>\n");
129
130        }
131
132        // /////////////////////////////////////////////////////////////////
133        // // ports and parameters ////
134
135        /**
136         * Connection reference
137         */
138        public TypedIOPort SRBFileSystem;
139
140        /**
141         * The SRB file to be written
142         */
143        public TypedIOPort remoteFileName;
144
145        /**
146         * Input. Array of bytes.
147         */
148        public TypedIOPort input;
149
150        /**
151         * The trigger port.
152         */
153        public TypedIOPort trigger;
154
155        // /////////////////////////////////////////////////////////////////
156        // // public methods ////
157
158        /**
159         * Accepts an SRBFileSystem reference and a file name and outputs the file
160         * content (Array of Bytes).
161         */
162        public void fire() throws IllegalActionException {
163                SRBFile srbFile;
164                SRBRandomAccessFile srbRandomAccessFile = null;
165                byte[] bytesRead = new byte[20000];
166                int nBytesRead;
167
168                try {
169                        srbFileSystem = (SRBFileSystem) ((ObjectToken) SRBFileSystem.get(0))
170                                        .getValue();
171                        String _srbFileName = ((StringToken) remoteFileName.get(0))
172                                        .stringValue();
173                        srbFile = new SRBFile(srbFileSystem, _srbFileName);
174                        srbRandomAccessFile = new SRBRandomAccessFile(srbFile, "rw");
175                } catch (Exception ex) {
176                        System.out.println("EXCEPTION");
177                        srbFile = null;
178                        srbRandomAccessFile = null;
179                        srbFileSystem = SRBUtil.closeConnection(srbFileSystem);
180                        ;
181                        ex.printStackTrace();
182                        throw new IllegalActionException(this, ex.getMessage()
183                                        + ". in actor " + this.getName());
184                }
185
186                while (true) {
187                        ArrayToken dataArrayToken = null;
188                        try {
189                                dataArrayToken = (ArrayToken) input.get(0);
190                        } catch (Exception ex) {
191                        }
192                        if (dataArrayToken != null) {
193                                Token[] dataTokenArr = dataArrayToken.arrayValue();
194                                byte[] dataBytes = new byte[dataTokenArr.length];
195                                for (int j = 0; j < dataTokenArr.length; j++) {
196                                        dataBytes[j] = (byte) ((UnsignedByteToken) dataTokenArr[j])
197                                                        .byteValue();
198                                }
199                                System.out.println("looping... dataBytes array length = "
200                                                + dataBytes.length);
201                                try {
202                                        srbRandomAccessFile.write(dataBytes, 0, dataBytes.length);
203                                } catch (Throwable e) {
204                                        System.out
205                                                        .println("expception captured. disconnecting from server ...");
206                                        srbFile = null;
207                                        srbRandomAccessFile = null;
208                                        srbFileSystem = SRBUtil.closeConnection(srbFileSystem);
209                                        e.printStackTrace();
210                                        throw new IllegalActionException(this,
211                                                        "Failed stream writing to file" + srbFile.getName()
212                                                                        + ". in actor " + this.getName() + ": "
213                                                                        + e.getMessage() + ".");
214                                }
215
216                        } else {
217                                break;
218                        }
219                }
220                srbFile = null;
221                srbRandomAccessFile = null;
222        }
223
224        /**
225         * Post fire the actor. Return false to indicate that the process has
226         * finished.
227         */
228        public boolean postfire() throws IllegalActionException {
229                trigger.broadcast(new ObjectToken());
230                return false; // FIX ME
231        }
232
233        /**
234         * Disconnect from SRB.
235         */
236        public void wrapup() {
237                srbFileSystem = SRBUtil.closeConnection(srbFileSystem);
238        }
239
240        private SRBFileSystem srbFileSystem = null;
241
242}