001/* A server with a fixed or variable service time.
002
003 Copyright (c) 1998-2018 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.domains.de.lib;
029
030import java.util.Comparator;
031import java.util.Map;
032import java.util.PriorityQueue;
033import java.util.TreeMap;
034
035import ptolemy.actor.TypedIOPort;
036import ptolemy.actor.parameters.PortParameter;
037import ptolemy.actor.util.Time;
038import ptolemy.data.DoubleToken;
039import ptolemy.data.IntToken;
040import ptolemy.data.Token;
041import ptolemy.data.expr.Parameter;
042import ptolemy.data.type.BaseType;
043import ptolemy.kernel.CompositeEntity;
044import ptolemy.kernel.util.Attribute;
045import ptolemy.kernel.util.IllegalActionException;
046import ptolemy.kernel.util.NameDuplicationException;
047import ptolemy.kernel.util.StringAttribute;
048import ptolemy.kernel.util.Workspace;
049
050///////////////////////////////////////////////////////////////////
051//// Server
052
053/**
054 This actor models a preemptive server with a fixed or variable service time.
055 A server is either busy (serving a customer) or not busy at any given time.
056 If an input arrives when the server is not busy, then the input token is
057 produced on the output with a delay given by the <i>serviceTime</i>
058 parameter.
059 If an input arrives while the server is busy, then that input is
060 queued until the server becomes free, at which point it is produced
061 on the output with a delay given by the <i>serviceTime</i> parameter
062 value at the time that the input arrived.
063 If several inputs arrive while the server is busy, then they are
064 served in an order determined by the value of the <i>priority</i>
065 parameter at the time of arrival of the input, and for inputs with
066 identical priorities, on a first-come, first-served basis.
067 On every firing, produce an output indicating the final queue size.
068 <p>
069 The service time and priority used for a job are the most recently arrived
070 values prior to or simultaneous with the <i>arrival</i> of the job
071 (not with the time at which service begins). Thus, if you want
072 each job to have an independent service time or priority, you should
073 provide each parameter as an input synchronized with each new job arrival.
074
075 @see ptolemy.actor.lib.TimeDelay
076
077 @author Lukito Muliadi, Edward A. Lee, Haiyang Zheng
078 @version $Id$
079 @since Ptolemy II 0.3
080 @Pt.ProposedRating Yellow (hyzheng)
081 @Pt.AcceptedRating Yellow (hyzheng)
082 */
083public class Server extends DETransformer {
084    /** Construct an actor with the specified container and name.
085     *  @param container The composite entity to contain this one.
086     *  @param name The name of this actor.
087     *  @exception IllegalActionException If the entity cannot be contained
088     *   by the proposed container.
089     *  @exception NameDuplicationException If the container already has an
090     *   actor with this name.
091     */
092    public Server(CompositeEntity container, String name)
093            throws NameDuplicationException, IllegalActionException {
094        super(container, name);
095
096        output.setTypeSameAs(input);
097
098        // FIXME: Put in a name change from newServiceTime to serviceTime in MoML filters.
099        serviceTime = new PortParameter(this, "serviceTime");
100        serviceTime.setExpression("1.0");
101        serviceTime.setTypeEquals(BaseType.DOUBLE);
102        // Put the delay port at the bottom of the icon by default.
103        StringAttribute cardinality = new StringAttribute(serviceTime.getPort(),
104                "_cardinal");
105        cardinality.setExpression("SOUTH");
106
107        priority = new PortParameter(this, "priority");
108        priority.setExpression("0.0");
109        priority.setTypeEquals(BaseType.DOUBLE);
110        // Put the priority port at the bottom of the icon by default.
111        cardinality = new StringAttribute(priority.getPort(), "_cardinal");
112        cardinality.setExpression("SOUTH");
113
114        _queues = new TreeMap<Double, PriorityQueue<Job>>(
115                new PriorityComparator());
116
117        _queueCounter = 0;
118        _queueSize = 0;
119
120        size = new TypedIOPort(this, "size", false, true);
121        size.setTypeEquals(BaseType.INT);
122        // Put it at the bottom of the icon by default.
123        cardinality = new StringAttribute(size, "_cardinal");
124        cardinality.setExpression("SOUTH");
125
126        capacity = new Parameter(this, "capacity");
127        capacity.setTypeEquals(BaseType.INT);
128        capacity.setExpression("0");
129    }
130
131    ///////////////////////////////////////////////////////////////////
132    ////                       ports and parameters                ////
133
134    /** The capacity of the queue. If the value is positive, then
135     *  it specifies the capacity of the queue. If it is negative
136     *  or 0, then it specifies that the capacity is infinite.
137     *  This is an integer with default 0.
138     */
139    public Parameter capacity;
140
141    /** The current size of the queue. This port produces an output
142     *  whenever the size changes. It has type int.
143     */
144    public TypedIOPort size;
145
146    /** The service time. This is a double with default 1.0.
147     *  It is required to be non-negative.
148     */
149    public PortParameter serviceTime;
150
151    /** The priority. This is a double with default 0.0.
152     *  A higher priority implies the task has precedence over tasks
153     *  with lower priority values.
154     */
155    public PortParameter priority;
156
157    ///////////////////////////////////////////////////////////////////
158    ////                         public methods                    ////
159
160    /** If the attribute is <i>serviceTime</i>, then ensure that the value
161     *  is non-negative, and if the attribute is
162     *  <i>capacity</i>, then change the capacity of the queue.
163     *  If the size of the queue currently exceeds the specified
164     *  capacity, then throw an exception.
165     *  @param attribute The attribute that changed.
166     *  @exception IllegalActionException If the service time is negative.
167     */
168    @Override
169    public void attributeChanged(Attribute attribute)
170            throws IllegalActionException {
171        if (attribute == serviceTime) {
172            double value = ((DoubleToken) serviceTime.getToken()).doubleValue();
173
174            if (value < 0.0 || Double.isNaN(value)) {
175                throw new IllegalActionException(this,
176                        "Cannot have negative or NaN serviceTime: " + value);
177            }
178        } else if (attribute == priority) {
179            double value = ((DoubleToken) priority.getToken()).doubleValue();
180
181            if (Double.isNaN(value)) {
182                throw new IllegalActionException(this,
183                        "Cannot have NaN priority: " + value);
184            }
185        } else if (attribute == capacity) {
186            int newCapacity = ((IntToken) capacity.getToken()).intValue();
187            if (newCapacity > 0 && queueSize() > newCapacity) {
188                throw new IllegalActionException(this,
189                        "Queue size (" + queueSize()
190                                + ") exceed requested capacity " + newCapacity
191                                + ").");
192            }
193        } else {
194            super.attributeChanged(attribute);
195        }
196    }
197
198    /** Clone the actor into the specified workspace. Set a type
199     *  constraint that the output type is the same as the that of input.
200     *  @param workspace The workspace for the new object.
201     *  @return A new actor.
202     *  @exception CloneNotSupportedException If a derived class has
203     *   has an attribute that cannot be cloned.
204     */
205    @Override
206    public Object clone(Workspace workspace) throws CloneNotSupportedException {
207        Server newObject = (Server) super.clone(workspace);
208        newObject.output.setTypeSameAs(newObject.input);
209        newObject._queues = new TreeMap<Double, PriorityQueue<Job>>(
210                new PriorityComparator());
211        return newObject;
212    }
213
214    /** Declare that the <i>output</i> does not depend on
215     *  the <i>input</i>, <i>serviceTime</i>, and <i>priority</i>, in a firing.
216     *  @exception IllegalActionException If the causality interface
217     *  cannot be computed.
218     *  @see #getCausalityInterface()
219     */
220    @Override
221    public void declareDelayDependency() throws IllegalActionException {
222        // Declare that output does not immediately depend on the input,
223        // though there is no lower bound on the time delay.
224        _declareDelayDependency(input, output, 0.0);
225        _declareDelayDependency(serviceTime.getPort(), output, 0.0);
226        _declareDelayDependency(priority.getPort(), output, 0.0);
227    }
228
229    /** If there is input, read it and put it in the queue.
230     *  If the service time has expired for a token currently
231     *  in the queue, then send that token on the output.
232     *  Produce an output indicating the current queue size.
233     *  @exception IllegalActionException If the serviceTime
234     *  or priority is invalid, or if an error occurs sending
235     *  the output token.
236     */
237    @Override
238    public void fire() throws IllegalActionException {
239        super.fire();
240        Time currentTime = getDirector().getModelTime();
241
242        serviceTime.update();
243        priority.update();
244
245        Job currentJob = this.peekQueue();
246        if (currentJob != null) {
247            currentJob.serviceTimeRemaining = _nextTimeFree
248                    .subtractToDouble(currentTime);
249        }
250
251        long nextQueueCounter = _queueCounter;
252        // Consume the input.
253        if (input.hasToken(0)) {
254            double serviceTimeValue = ((DoubleToken) serviceTime.getToken())
255                    .doubleValue();
256            double priorityValue = ((DoubleToken) priority.getToken())
257                    .doubleValue();
258            Token token = input.get(0);
259            this.enqueue(new Job(token, serviceTimeValue, priorityValue,
260                    currentTime));
261            if (_debugging) {
262                _debug("Read input with value " + token
263                        + ", and put into queue, which now has size"
264                        + queueSize() + " at time " + currentTime
265                        + ". Event will be processes with service time "
266                        + serviceTimeValue);
267            }
268        }
269
270        // If appropriate, produce output.
271        Job job = this.peekQueue();
272        if (job != null && currentTime.compareTo(_nextTimeFree) == 0 &&
273        /* We check the queueCounter to ensure at least a microstep delay */
274                job.queueCounter < nextQueueCounter) {
275            job = this.dequeue();
276            Token outputToken = job.payload;
277            output.send(0, outputToken);
278            // Indicate that the server is free.
279            if (_debugging) {
280                _debug("Produced output " + outputToken
281                        + ", so queue now has size " + queueSize() + " at time "
282                        + currentTime);
283            }
284        }
285        size.send(0, new IntToken(queueSize()));
286    }
287
288    protected int queueSize() {
289        return _queueSize;
290    }
291
292    private Map.Entry<Double, PriorityQueue<Job>> getHighestPriorityQueue() {
293        return _queues.firstEntry();
294    }
295
296    private Job peekQueue() {
297        Map.Entry<Double, PriorityQueue<Job>> entry = getHighestPriorityQueue();
298        Job result = entry != null ? entry.getValue().peek() : null;
299        return result;
300    }
301
302    private void enqueue(Job newJob) throws IllegalActionException {
303        int currentCapacity = ((IntToken) capacity.getToken()).intValue();
304        if (currentCapacity > 0 && queueSize() >= currentCapacity) {
305            throw new IllegalActionException(this,
306                    "Queue size (" + queueSize()
307                            + ") is already at maximum capacity "
308                            + currentCapacity + ").");
309        }
310        newJob.queueCounter = _queueCounter;
311        PriorityQueue<Job> queue = _queues.get(newJob.priority);
312        if (queue == null) {
313            queue = new PriorityQueue<Job>();
314            _queues.put(newJob.priority, queue);
315        }
316        queue.add(newJob);
317        ++_queueSize;
318        ++_queueCounter;
319        updateNextTimeFree();
320    }
321
322    private void updateNextTimeFree() {
323        Job job = this.peekQueue();
324        if (job != null) {
325            Time now = getDirector().getModelTime();
326            _nextTimeFree = now.add(job.serviceTimeRemaining);
327        }
328    }
329
330    private Job dequeue() throws IllegalActionException {
331        Job job = null;
332        while (job == null && !_queues.isEmpty()) {
333            Map.Entry<Double, PriorityQueue<Job>> entry = getHighestPriorityQueue();
334            PriorityQueue<Job> queue = entry.getValue();
335            if (!queue.isEmpty()) {
336                job = queue.poll();
337            }
338            if (queue.isEmpty()) {
339                _queues.remove(entry.getKey());
340            }
341        }
342        assert job != null : "No job found (was queue not empty?)";
343        if (job != null) {
344            --_queueSize;
345        }
346        updateNextTimeFree();
347        return job;
348    }
349
350    /** Reset the states of the server to indicate that the server is ready
351     *  to serve.
352     *  @exception IllegalActionException If the base class throws it.
353     */
354    @Override
355    public void initialize() throws IllegalActionException {
356        super.initialize();
357        _nextTimeFree = Time.NEGATIVE_INFINITY;
358        _queues.clear();
359        _queueCounter = 0;
360        _queueSize = 0;
361    }
362
363    /** If the server is free and there is at least one token in the queue,
364     *  request a firing at the current time plus the service time.
365     *  @exception IllegalActionException If there is no director.
366     *  @return Whatever the superclass returns.
367     */
368    @Override
369    public boolean postfire() throws IllegalActionException {
370        updateNextTimeFree();
371        if (!_nextTimeFree.equals(Time.NEGATIVE_INFINITY) && queueSize() > 0) {
372            if (_debugging) {
373                _debug("In postfire, requesting a refiring at time "
374                        + _nextTimeFree);
375            }
376            _fireAt(_nextTimeFree);
377        }
378        return super.postfire();
379    }
380
381    /** Clear the queue so that the capacity can be changed.
382     *  @exception IllegalActionException If the superclass throws it.
383     */
384    @Override
385    public void wrapup() throws IllegalActionException {
386        super.wrapup();
387        _queues.clear();
388        _queueCounter = 0;
389        _queueSize = 0;
390    }
391
392    ///////////////////////////////////////////////////////////////////
393    ////                         private variables                 ////
394
395    /** Next time the server becomes free. */
396    protected Time _nextTimeFree;
397
398    protected TreeMap<Double /* the dynamic (possibly modified) priority of the queue */, PriorityQueue<Job>> _queues;
399
400    protected int _queueSize;
401
402    /** The counter for tie-breaking the queue by insertion order. */
403    protected long _queueCounter;
404
405    ///////////////////////////////////////////////////////////////////
406    ////                         inner classes                     ////
407
408    private static class PriorityComparator implements Comparator<Double> {
409        public static int compare(double a, double b) {
410            return Double.compare(b, a); // Note the swapped order (higher priorities come first)
411        }
412
413        @Override
414        public int compare(Double a, Double b) {
415            return compare((double) a, (double) b);
416        }
417    }
418
419    /** A data structure containing a token and a service time. */
420    private static class Job implements Comparable<Job> {
421        public Job(Token payload, double serviceTime, double priority,
422                Time creationTime) {
423            this.payload = payload;
424            this.serviceTimeRemaining = serviceTime;
425            this.priority = priority;
426            this.creationTime = creationTime;
427        }
428
429        public Token payload;
430        public double serviceTimeRemaining;
431        public double priority; //     priority=1 >     priority=0
432        public Time creationTime; // creationTime=1 < creationTime=0
433        public long queueCounter; // queueCounter=1 < queueCounter=0
434
435        @Override
436        public int compareTo(Job other) {
437            int result = 0;
438            if (result == 0) {
439                result = PriorityComparator.compare(this.priority,
440                        other.priority);
441            }
442            if (result == 0) {
443                result = this.creationTime.compareTo(other.creationTime);
444            }
445            if (result == 0) {
446                result = Long.compare(this.queueCounter, other.queueCounter);
447            }
448            return result;
449        }
450    }
451}