001/* 002 * Copyright (c) 2002-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-08-24 22:47:39 +0000 (Mon, 24 Aug 2015) $' 007 * '$Revision: 33633 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.srb; 031 032import edu.sdsc.grid.io.srb.SRBFile; 033import edu.sdsc.grid.io.srb.SRBFileSystem; 034import edu.sdsc.grid.io.srb.SRBRandomAccessFile; 035import ptolemy.actor.TypedAtomicActor; 036import ptolemy.actor.TypedIOPort; 037import ptolemy.data.ArrayToken; 038import ptolemy.data.ObjectToken; 039import ptolemy.data.StringToken; 040import ptolemy.data.Token; 041import ptolemy.data.UnsignedByteToken; 042import ptolemy.data.type.ArrayType; 043import ptolemy.data.type.BaseType; 044import ptolemy.kernel.CompositeEntity; 045import ptolemy.kernel.util.IllegalActionException; 046import ptolemy.kernel.util.NameDuplicationException; 047 048////////////////////////////////////////////////////////////////////////// 049//// SRBWriter 050/** 051 * 052 * <p> 053 * SRBWriter/StreamPut is a Kepler Actor which has a functionality similar to 054 * the SRB command namely "Sput".However SRBWriter actor uploads data to the SRB 055 * with a streaming process by writing a byte of arrays to the SRB remote file 056 * instead of a parallel upload. The following actor expects as input a 057 * reference to the SRB file system. This reference connection is obtained via 058 * the SRBConnect Actor in Kepler. <i>See SRBConnect and its documentation.</i> 059 * </p> 060 * <p> 061 * The file reference system is created with a unique SRB user account and with 062 * this connection reference as input the SRBWriter actor is able to gain access 063 * to the SRB file space. Once an alive SRB file connection system has been 064 * established the actor gets the remode SRB file path and creates a 065 * SRBRandomAccessFile stream. The bytes of array taken in as input are further 066 * written to the stream in a loop. 067 * </p> 068 * <p> 069 * <B>Actor Input:</B> Accepts a reference to the SRB files system, an SRB 070 * remote file name and a sequence of bytes array as input. 071 * </p> 072 * <p> 073 * <B>Actor Output:</B> The SRBStreamPut actor sends a trigger once its done 074 * writing the byte arrays to the remote file on the SRB. 075 * 076 * 077 * </p> 078 * <p> 079 * The following actor accesses SRB file reference system and SRB file space 080 * with the SRB Jargon API provided. The JARGON is a pure API for developing 081 * programs with a data grid interface and I/O for SRB file systems. 082 * </p> 083 * <A href="http://www.sdsc.edu/srb"><I>Further information on SRB</I> </A> 084 * 085 * @author Bing Zhu and Efrat Jaeger 086 * @version $Id: SRBWriter.java 33633 2015-08-24 22:47:39Z crawl $ 087 * @since Ptolemy II 3.0.2 088 */ 089public class SRBWriter extends TypedAtomicActor { 090 091 /** 092 * Construct an actor with the given container and name. 093 * 094 * @param container 095 * The container. 096 * @param name 097 * The name of this actor. 098 * @exception IllegalActionException 099 * If the actor cannot be contained by the proposed 100 * container. 101 * @exception NameDuplicationException 102 * If the container already has an actor with this name. 103 */ 104 public SRBWriter(CompositeEntity container, String name) 105 throws NameDuplicationException, IllegalActionException { 106 107 super(container, name); 108 109 SRBFileSystem = new TypedIOPort(this, "SRBFileSystem", true, false); 110 input = new TypedIOPort(this, "input", true, false); 111 remoteFileName = new TypedIOPort(this, "remoteFileName", true, false); 112 trigger = new TypedIOPort(this, "trigger", false, true); 113 114 // Set the type constraint. 115 SRBFileSystem.setTypeEquals(BaseType.GENERAL); 116 input.setTypeEquals(new ArrayType(BaseType.UNSIGNED_BYTE)); 117 remoteFileName.setTypeEquals(BaseType.STRING); 118 trigger.setTypeEquals(BaseType.GENERAL); 119 120 _attachText("_iconDescription", "<svg>\n" 121 + "<rect x=\"-25\" y=\"-20\" " + "width=\"68\" height=\"40\" " 122 + "style=\"fill:white\"/>\n" 123 + "<polygon points=\"-15,-10 -7,-10 -3,-14 4,-14 8,-10" 124 + " 33,-10 33,10, -15,10\" " + "style=\"fill:red\"/>\n" 125 + "<text x=\"-5\" y=\"7\" " + "style=\"font-size:14\">\n" 126 + "SRB \n" + "</text>\n" + "<text x=\"-22\" y=\"19\"" 127 + "style=\"font-size:11; fill:black; font-family:SansSerif\">" 128 + "stream write</text>\n" + "</svg>\n"); 129 130 } 131 132 // ///////////////////////////////////////////////////////////////// 133 // // ports and parameters //// 134 135 /** 136 * Connection reference 137 */ 138 public TypedIOPort SRBFileSystem; 139 140 /** 141 * The SRB file to be written 142 */ 143 public TypedIOPort remoteFileName; 144 145 /** 146 * Input. Array of bytes. 147 */ 148 public TypedIOPort input; 149 150 /** 151 * The trigger port. 152 */ 153 public TypedIOPort trigger; 154 155 // ///////////////////////////////////////////////////////////////// 156 // // public methods //// 157 158 /** 159 * Accepts an SRBFileSystem reference and a file name and outputs the file 160 * content (Array of Bytes). 161 */ 162 public void fire() throws IllegalActionException { 163 SRBFile srbFile; 164 SRBRandomAccessFile srbRandomAccessFile = null; 165 byte[] bytesRead = new byte[20000]; 166 int nBytesRead; 167 168 try { 169 srbFileSystem = (SRBFileSystem) ((ObjectToken) SRBFileSystem.get(0)) 170 .getValue(); 171 String _srbFileName = ((StringToken) remoteFileName.get(0)) 172 .stringValue(); 173 srbFile = new SRBFile(srbFileSystem, _srbFileName); 174 srbRandomAccessFile = new SRBRandomAccessFile(srbFile, "rw"); 175 } catch (Exception ex) { 176 System.out.println("EXCEPTION"); 177 srbFile = null; 178 srbRandomAccessFile = null; 179 srbFileSystem = SRBUtil.closeConnection(srbFileSystem); 180 ; 181 ex.printStackTrace(); 182 throw new IllegalActionException(this, ex.getMessage() 183 + ". in actor " + this.getName()); 184 } 185 186 while (true) { 187 ArrayToken dataArrayToken = null; 188 try { 189 dataArrayToken = (ArrayToken) input.get(0); 190 } catch (Exception ex) { 191 } 192 if (dataArrayToken != null) { 193 Token[] dataTokenArr = dataArrayToken.arrayValue(); 194 byte[] dataBytes = new byte[dataTokenArr.length]; 195 for (int j = 0; j < dataTokenArr.length; j++) { 196 dataBytes[j] = (byte) ((UnsignedByteToken) dataTokenArr[j]) 197 .byteValue(); 198 } 199 System.out.println("looping... dataBytes array length = " 200 + dataBytes.length); 201 try { 202 srbRandomAccessFile.write(dataBytes, 0, dataBytes.length); 203 } catch (Throwable e) { 204 System.out 205 .println("expception captured. disconnecting from server ..."); 206 srbFile = null; 207 srbRandomAccessFile = null; 208 srbFileSystem = SRBUtil.closeConnection(srbFileSystem); 209 e.printStackTrace(); 210 throw new IllegalActionException(this, 211 "Failed stream writing to file" + srbFile.getName() 212 + ". in actor " + this.getName() + ": " 213 + e.getMessage() + "."); 214 } 215 216 } else { 217 break; 218 } 219 } 220 srbFile = null; 221 srbRandomAccessFile = null; 222 } 223 224 /** 225 * Post fire the actor. Return false to indicate that the process has 226 * finished. 227 */ 228 public boolean postfire() throws IllegalActionException { 229 trigger.broadcast(new ObjectToken()); 230 return false; // FIX ME 231 } 232 233 /** 234 * Disconnect from SRB. 235 */ 236 public void wrapup() { 237 srbFileSystem = SRBUtil.closeConnection(srbFileSystem); 238 } 239 240 private SRBFileSystem srbFileSystem = null; 241 242}