001/* 002 * Copyright (c) 2003-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: welker $' 006 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $' 007 * '$Revision: 24234 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.objectmanager.cache; 031 032import ptolemy.actor.TypedAtomicActor; 033import ptolemy.actor.TypedIOPort; 034import ptolemy.data.BooleanToken; 035import ptolemy.data.IntToken; 036import ptolemy.data.StringToken; 037import ptolemy.data.expr.Parameter; 038import ptolemy.data.type.BaseType; 039import ptolemy.kernel.CompositeEntity; 040import ptolemy.kernel.Port; 041import ptolemy.kernel.util.IllegalActionException; 042import ptolemy.kernel.util.InternalErrorException; 043import ptolemy.kernel.util.NameDuplicationException; 044 045/** 046 * 047 */ 048public class DataCacheGetActor extends TypedAtomicActor { 049 public TypedIOPort inputObjectName = new TypedIOPort(this, 050 "inputObjectName", true, false); 051 public TypedIOPort inputObjectType = new TypedIOPort(this, 052 "inputObjectType", true, false); 053 public TypedIOPort trigger = new TypedIOPort(this, "trigger", true, false); 054 public TypedIOPort outputCacheID = new TypedIOPort(this, "outputCacheID", 055 false, true); 056 public TypedIOPort foundInCache = new TypedIOPort(this, "foundInCache", 057 false, true); 058 public Parameter outputCacheIDTokenProductionRate; 059 private DataCacheManager cacheManager; 060 061 /** 062 * 063 */ 064 public DataCacheGetActor(CompositeEntity container, String name) 065 throws NameDuplicationException, IllegalActionException { 066 super(container, name); 067 inputObjectName.setTypeEquals(BaseType.STRING); 068 inputObjectName.setMultiport(true); 069 inputObjectType.setTypeEquals(BaseType.STRING); 070 inputObjectType.setMultiport(true); 071 outputCacheID.setTypeEquals(BaseType.STRING); 072 foundInCache.setTypeEquals(BaseType.BOOLEAN); 073 outputCacheIDTokenProductionRate = new Parameter(outputCacheID, 074 "tokenProductionRate"); 075 outputCacheIDTokenProductionRate.setExpression("0"); 076 cacheManager = DataCacheManager.getInstance(); 077 } 078 079 /** 080 * Notify this entity that the links to the specified port have been 081 * altered. This sets the production rate of the output port and notifies 082 * the director that the schedule is invalid, if there is a director. 083 * 084 * @param port 085 */ 086 public void connectionsChanged(Port port) { 087 super.connectionsChanged(port); 088 if (port == inputObjectName) { 089 try { 090 outputCacheIDTokenProductionRate.setToken(new IntToken( 091 inputObjectName.getWidth())); 092 // NOTE: schedule is invalidated automatically already 093 // by the changed connections. 094 } catch (IllegalActionException ex) { 095 throw new InternalErrorException(this, ex, 096 "Failed to set the token production rate for the outputCacheID port."); 097 } 098 } 099 } 100 101 /** 102 * 103 */ 104 public void initialize() throws IllegalActionException { 105 } 106 107 /** 108 * 109 */ 110 public boolean prefire() throws IllegalActionException { 111 return super.prefire(); 112 } 113 114 /** 115 * 116 */ 117 public void fire() throws IllegalActionException { 118 super.fire(); 119 if (trigger.getWidth() > 0 && !trigger.hasToken(0)) { // make sure we're 120 // ready to fire 121 return; 122 } 123 124 // find out how many files we're getting from the cache 125 int inputObjectNameWidth = inputObjectName.getWidth(); 126 127 for (int i = 0; i < inputObjectNameWidth; i++) { // loop through each 128 // input on the port 129 // get the input file name 130 StringToken inputObjectNameToken = (StringToken) inputObjectName 131 .get(i); 132 // get the resource type of the file 133 StringToken inputObjectTypeToken = (StringToken) inputObjectType 134 .get(i); 135 136 try { // try to get the file 137 DataCacheFileObject fileItem = cacheManager.getFile( 138 inputObjectNameToken.stringValue(), 139 inputObjectTypeToken.stringValue()); 140 outputCacheID.broadcast(new StringToken(fileItem.getFile() 141 .getAbsolutePath())); 142 foundInCache.broadcast(new BooleanToken(true)); 143 } catch (NullPointerException npe) { // if it is null, return 0 to 144 // show that the cache 145 // doesn't include the current file 146 outputCacheID.broadcast(new StringToken("0")); 147 foundInCache.broadcast(new BooleanToken(false)); 148 } catch (Exception e) { 149 throw new IllegalActionException( 150 "Error getting item from cache: " + e.getMessage()); 151 } 152 } 153 } 154}