001/* A server with a fixed or variable service time. 002 003 Copyright (c) 1998-2018 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 */ 028package ptolemy.domains.de.lib; 029 030import java.util.Comparator; 031import java.util.Map; 032import java.util.PriorityQueue; 033import java.util.TreeMap; 034 035import ptolemy.actor.TypedIOPort; 036import ptolemy.actor.parameters.PortParameter; 037import ptolemy.actor.util.Time; 038import ptolemy.data.DoubleToken; 039import ptolemy.data.IntToken; 040import ptolemy.data.Token; 041import ptolemy.data.expr.Parameter; 042import ptolemy.data.type.BaseType; 043import ptolemy.kernel.CompositeEntity; 044import ptolemy.kernel.util.Attribute; 045import ptolemy.kernel.util.IllegalActionException; 046import ptolemy.kernel.util.NameDuplicationException; 047import ptolemy.kernel.util.StringAttribute; 048import ptolemy.kernel.util.Workspace; 049 050/////////////////////////////////////////////////////////////////// 051//// Server 052 053/** 054 This actor models a preemptive server with a fixed or variable service time. 055 A server is either busy (serving a customer) or not busy at any given time. 056 If an input arrives when the server is not busy, then the input token is 057 produced on the output with a delay given by the <i>serviceTime</i> 058 parameter. 059 If an input arrives while the server is busy, then that input is 060 queued until the server becomes free, at which point it is produced 061 on the output with a delay given by the <i>serviceTime</i> parameter 062 value at the time that the input arrived. 063 If several inputs arrive while the server is busy, then they are 064 served in an order determined by the value of the <i>priority</i> 065 parameter at the time of arrival of the input, and for inputs with 066 identical priorities, on a first-come, first-served basis. 067 On every firing, produce an output indicating the final queue size. 068 <p> 069 The service time and priority used for a job are the most recently arrived 070 values prior to or simultaneous with the <i>arrival</i> of the job 071 (not with the time at which service begins). Thus, if you want 072 each job to have an independent service time or priority, you should 073 provide each parameter as an input synchronized with each new job arrival. 074 075 @see ptolemy.actor.lib.TimeDelay 076 077 @author Lukito Muliadi, Edward A. Lee, Haiyang Zheng 078 @version $Id$ 079 @since Ptolemy II 0.3 080 @Pt.ProposedRating Yellow (hyzheng) 081 @Pt.AcceptedRating Yellow (hyzheng) 082 */ 083public class Server extends DETransformer { 084 /** Construct an actor with the specified container and name. 085 * @param container The composite entity to contain this one. 086 * @param name The name of this actor. 087 * @exception IllegalActionException If the entity cannot be contained 088 * by the proposed container. 089 * @exception NameDuplicationException If the container already has an 090 * actor with this name. 091 */ 092 public Server(CompositeEntity container, String name) 093 throws NameDuplicationException, IllegalActionException { 094 super(container, name); 095 096 output.setTypeSameAs(input); 097 098 // FIXME: Put in a name change from newServiceTime to serviceTime in MoML filters. 099 serviceTime = new PortParameter(this, "serviceTime"); 100 serviceTime.setExpression("1.0"); 101 serviceTime.setTypeEquals(BaseType.DOUBLE); 102 // Put the delay port at the bottom of the icon by default. 103 StringAttribute cardinality = new StringAttribute(serviceTime.getPort(), 104 "_cardinal"); 105 cardinality.setExpression("SOUTH"); 106 107 priority = new PortParameter(this, "priority"); 108 priority.setExpression("0.0"); 109 priority.setTypeEquals(BaseType.DOUBLE); 110 // Put the priority port at the bottom of the icon by default. 111 cardinality = new StringAttribute(priority.getPort(), "_cardinal"); 112 cardinality.setExpression("SOUTH"); 113 114 _queues = new TreeMap<Double, PriorityQueue<Job>>( 115 new PriorityComparator()); 116 117 _queueCounter = 0; 118 _queueSize = 0; 119 120 size = new TypedIOPort(this, "size", false, true); 121 size.setTypeEquals(BaseType.INT); 122 // Put it at the bottom of the icon by default. 123 cardinality = new StringAttribute(size, "_cardinal"); 124 cardinality.setExpression("SOUTH"); 125 126 capacity = new Parameter(this, "capacity"); 127 capacity.setTypeEquals(BaseType.INT); 128 capacity.setExpression("0"); 129 } 130 131 /////////////////////////////////////////////////////////////////// 132 //// ports and parameters //// 133 134 /** The capacity of the queue. If the value is positive, then 135 * it specifies the capacity of the queue. If it is negative 136 * or 0, then it specifies that the capacity is infinite. 137 * This is an integer with default 0. 138 */ 139 public Parameter capacity; 140 141 /** The current size of the queue. This port produces an output 142 * whenever the size changes. It has type int. 143 */ 144 public TypedIOPort size; 145 146 /** The service time. This is a double with default 1.0. 147 * It is required to be non-negative. 148 */ 149 public PortParameter serviceTime; 150 151 /** The priority. This is a double with default 0.0. 152 * A higher priority implies the task has precedence over tasks 153 * with lower priority values. 154 */ 155 public PortParameter priority; 156 157 /////////////////////////////////////////////////////////////////// 158 //// public methods //// 159 160 /** If the attribute is <i>serviceTime</i>, then ensure that the value 161 * is non-negative, and if the attribute is 162 * <i>capacity</i>, then change the capacity of the queue. 163 * If the size of the queue currently exceeds the specified 164 * capacity, then throw an exception. 165 * @param attribute The attribute that changed. 166 * @exception IllegalActionException If the service time is negative. 167 */ 168 @Override 169 public void attributeChanged(Attribute attribute) 170 throws IllegalActionException { 171 if (attribute == serviceTime) { 172 double value = ((DoubleToken) serviceTime.getToken()).doubleValue(); 173 174 if (value < 0.0 || Double.isNaN(value)) { 175 throw new IllegalActionException(this, 176 "Cannot have negative or NaN serviceTime: " + value); 177 } 178 } else if (attribute == priority) { 179 double value = ((DoubleToken) priority.getToken()).doubleValue(); 180 181 if (Double.isNaN(value)) { 182 throw new IllegalActionException(this, 183 "Cannot have NaN priority: " + value); 184 } 185 } else if (attribute == capacity) { 186 int newCapacity = ((IntToken) capacity.getToken()).intValue(); 187 if (newCapacity > 0 && queueSize() > newCapacity) { 188 throw new IllegalActionException(this, 189 "Queue size (" + queueSize() 190 + ") exceed requested capacity " + newCapacity 191 + ")."); 192 } 193 } else { 194 super.attributeChanged(attribute); 195 } 196 } 197 198 /** Clone the actor into the specified workspace. Set a type 199 * constraint that the output type is the same as the that of input. 200 * @param workspace The workspace for the new object. 201 * @return A new actor. 202 * @exception CloneNotSupportedException If a derived class has 203 * has an attribute that cannot be cloned. 204 */ 205 @Override 206 public Object clone(Workspace workspace) throws CloneNotSupportedException { 207 Server newObject = (Server) super.clone(workspace); 208 newObject.output.setTypeSameAs(newObject.input); 209 newObject._queues = new TreeMap<Double, PriorityQueue<Job>>( 210 new PriorityComparator()); 211 return newObject; 212 } 213 214 /** Declare that the <i>output</i> does not depend on 215 * the <i>input</i>, <i>serviceTime</i>, and <i>priority</i>, in a firing. 216 * @exception IllegalActionException If the causality interface 217 * cannot be computed. 218 * @see #getCausalityInterface() 219 */ 220 @Override 221 public void declareDelayDependency() throws IllegalActionException { 222 // Declare that output does not immediately depend on the input, 223 // though there is no lower bound on the time delay. 224 _declareDelayDependency(input, output, 0.0); 225 _declareDelayDependency(serviceTime.getPort(), output, 0.0); 226 _declareDelayDependency(priority.getPort(), output, 0.0); 227 } 228 229 /** If there is input, read it and put it in the queue. 230 * If the service time has expired for a token currently 231 * in the queue, then send that token on the output. 232 * Produce an output indicating the current queue size. 233 * @exception IllegalActionException If the serviceTime 234 * or priority is invalid, or if an error occurs sending 235 * the output token. 236 */ 237 @Override 238 public void fire() throws IllegalActionException { 239 super.fire(); 240 Time currentTime = getDirector().getModelTime(); 241 242 serviceTime.update(); 243 priority.update(); 244 245 Job currentJob = this.peekQueue(); 246 if (currentJob != null) { 247 currentJob.serviceTimeRemaining = _nextTimeFree 248 .subtractToDouble(currentTime); 249 } 250 251 long nextQueueCounter = _queueCounter; 252 // Consume the input. 253 if (input.hasToken(0)) { 254 double serviceTimeValue = ((DoubleToken) serviceTime.getToken()) 255 .doubleValue(); 256 double priorityValue = ((DoubleToken) priority.getToken()) 257 .doubleValue(); 258 Token token = input.get(0); 259 this.enqueue(new Job(token, serviceTimeValue, priorityValue, 260 currentTime)); 261 if (_debugging) { 262 _debug("Read input with value " + token 263 + ", and put into queue, which now has size" 264 + queueSize() + " at time " + currentTime 265 + ". Event will be processes with service time " 266 + serviceTimeValue); 267 } 268 } 269 270 // If appropriate, produce output. 271 Job job = this.peekQueue(); 272 if (job != null && currentTime.compareTo(_nextTimeFree) == 0 && 273 /* We check the queueCounter to ensure at least a microstep delay */ 274 job.queueCounter < nextQueueCounter) { 275 job = this.dequeue(); 276 Token outputToken = job.payload; 277 output.send(0, outputToken); 278 // Indicate that the server is free. 279 if (_debugging) { 280 _debug("Produced output " + outputToken 281 + ", so queue now has size " + queueSize() + " at time " 282 + currentTime); 283 } 284 } 285 size.send(0, new IntToken(queueSize())); 286 } 287 288 protected int queueSize() { 289 return _queueSize; 290 } 291 292 private Map.Entry<Double, PriorityQueue<Job>> getHighestPriorityQueue() { 293 return _queues.firstEntry(); 294 } 295 296 private Job peekQueue() { 297 Map.Entry<Double, PriorityQueue<Job>> entry = getHighestPriorityQueue(); 298 Job result = entry != null ? entry.getValue().peek() : null; 299 return result; 300 } 301 302 private void enqueue(Job newJob) throws IllegalActionException { 303 int currentCapacity = ((IntToken) capacity.getToken()).intValue(); 304 if (currentCapacity > 0 && queueSize() >= currentCapacity) { 305 throw new IllegalActionException(this, 306 "Queue size (" + queueSize() 307 + ") is already at maximum capacity " 308 + currentCapacity + ")."); 309 } 310 newJob.queueCounter = _queueCounter; 311 PriorityQueue<Job> queue = _queues.get(newJob.priority); 312 if (queue == null) { 313 queue = new PriorityQueue<Job>(); 314 _queues.put(newJob.priority, queue); 315 } 316 queue.add(newJob); 317 ++_queueSize; 318 ++_queueCounter; 319 updateNextTimeFree(); 320 } 321 322 private void updateNextTimeFree() { 323 Job job = this.peekQueue(); 324 if (job != null) { 325 Time now = getDirector().getModelTime(); 326 _nextTimeFree = now.add(job.serviceTimeRemaining); 327 } 328 } 329 330 private Job dequeue() throws IllegalActionException { 331 Job job = null; 332 while (job == null && !_queues.isEmpty()) { 333 Map.Entry<Double, PriorityQueue<Job>> entry = getHighestPriorityQueue(); 334 PriorityQueue<Job> queue = entry.getValue(); 335 if (!queue.isEmpty()) { 336 job = queue.poll(); 337 } 338 if (queue.isEmpty()) { 339 _queues.remove(entry.getKey()); 340 } 341 } 342 assert job != null : "No job found (was queue not empty?)"; 343 if (job != null) { 344 --_queueSize; 345 } 346 updateNextTimeFree(); 347 return job; 348 } 349 350 /** Reset the states of the server to indicate that the server is ready 351 * to serve. 352 * @exception IllegalActionException If the base class throws it. 353 */ 354 @Override 355 public void initialize() throws IllegalActionException { 356 super.initialize(); 357 _nextTimeFree = Time.NEGATIVE_INFINITY; 358 _queues.clear(); 359 _queueCounter = 0; 360 _queueSize = 0; 361 } 362 363 /** If the server is free and there is at least one token in the queue, 364 * request a firing at the current time plus the service time. 365 * @exception IllegalActionException If there is no director. 366 * @return Whatever the superclass returns. 367 */ 368 @Override 369 public boolean postfire() throws IllegalActionException { 370 updateNextTimeFree(); 371 if (!_nextTimeFree.equals(Time.NEGATIVE_INFINITY) && queueSize() > 0) { 372 if (_debugging) { 373 _debug("In postfire, requesting a refiring at time " 374 + _nextTimeFree); 375 } 376 _fireAt(_nextTimeFree); 377 } 378 return super.postfire(); 379 } 380 381 /** Clear the queue so that the capacity can be changed. 382 * @exception IllegalActionException If the superclass throws it. 383 */ 384 @Override 385 public void wrapup() throws IllegalActionException { 386 super.wrapup(); 387 _queues.clear(); 388 _queueCounter = 0; 389 _queueSize = 0; 390 } 391 392 /////////////////////////////////////////////////////////////////// 393 //// private variables //// 394 395 /** Next time the server becomes free. */ 396 protected Time _nextTimeFree; 397 398 protected TreeMap<Double /* the dynamic (possibly modified) priority of the queue */, PriorityQueue<Job>> _queues; 399 400 protected int _queueSize; 401 402 /** The counter for tie-breaking the queue by insertion order. */ 403 protected long _queueCounter; 404 405 /////////////////////////////////////////////////////////////////// 406 //// inner classes //// 407 408 private static class PriorityComparator implements Comparator<Double> { 409 public static int compare(double a, double b) { 410 return Double.compare(b, a); // Note the swapped order (higher priorities come first) 411 } 412 413 @Override 414 public int compare(Double a, Double b) { 415 return compare((double) a, (double) b); 416 } 417 } 418 419 /** A data structure containing a token and a service time. */ 420 private static class Job implements Comparable<Job> { 421 public Job(Token payload, double serviceTime, double priority, 422 Time creationTime) { 423 this.payload = payload; 424 this.serviceTimeRemaining = serviceTime; 425 this.priority = priority; 426 this.creationTime = creationTime; 427 } 428 429 public Token payload; 430 public double serviceTimeRemaining; 431 public double priority; // priority=1 > priority=0 432 public Time creationTime; // creationTime=1 < creationTime=0 433 public long queueCounter; // queueCounter=1 < queueCounter=0 434 435 @Override 436 public int compareTo(Job other) { 437 int result = 0; 438 if (result == 0) { 439 result = PriorityComparator.compare(this.priority, 440 other.priority); 441 } 442 if (result == 0) { 443 result = this.creationTime.compareTo(other.creationTime); 444 } 445 if (result == 0) { 446 result = Long.compare(this.queueCounter, other.queueCounter); 447 } 448 return result; 449 } 450 } 451}