001/* CalendarQueue, an implementation of a priority queue.
002
003 Copyright (c) 1998-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.actor.util;
029
030import java.util.Iterator;
031import java.util.LinkedList;
032
033import ptolemy.kernel.util.DebugListener;
034import ptolemy.kernel.util.Debuggable;
035import ptolemy.kernel.util.InternalErrorException;
036import ptolemy.kernel.util.InvalidStateException;
037
038///////////////////////////////////////////////////////////////////
039//// CalendarQueue
040
041/**
042
043 This class implements a fast priority queue. Entries are sorted with
044 the help of a comparator provided to the constructor.  A dequeue
045 operation will remove the smallest entry according to this sort.
046 Entries can be any instance of Object that are acceptable to the
047 specified comparator.
048
049 <p>Entries are enqueued using the put() method, and dequeued using the
050 take() method. The get() method returns the smallest entry in the
051 queue, but without removing it from the queue.  The toArray() methods
052 can be used to examine the contents of the queue.
053
054 <p>This class operates like a 'bag' or multiset collection.  This simply
055 means that an entry can be added into the queue even if it already
056 exists in the queue.  If a 'set' behavior is desired, one can subclass
057 CalendarQueue and override the put() method.
058
059 <p>The queue works as follows.  Entries are conceptually stored in an
060 infinite set of virtual bins (or buckets). The instance of
061 CQComparator is consulted to determine which virtual bin should be
062 used for an entry (by calling its getVirtualBinNumber() method).  Each
063 virtual bin has a width, which can be altered by calling the
064 setBinWidth() method of the CQComparator.  Within each virtual bin,
065 entries are sorted.
066
067 <p>Having an infinite number of bins, however, is not practical.  Thus,
068 the virtual bins are mapped into physical bins (or buckets) by a
069 modulo operation.  If there are <i>n</i> physical bins, then virtual
070 bin <i>i</i> maps into physical bin <i>i</i> mod <i>n</i>.
071
072 <p>This is analogous to a calendar showing 12 months.  Here, <i>n</i> =
073 12.  An event that happens in January of any year is placed in the
074 first month (bin) of this calendar.  Its virtual bin number might be
075 <i>year</i>*12 + <i>month</i>.  Its physical bin number is just
076 <i>month</i>.
077
078 <p>The performance of a calendar queue is very sensitive to the number of
079 bins, the width of the bins, and the relationship of these quantities
080 to the entries that are observed.  Thus, this implementation may
081 frequently change the number of bins.  When it does change the number
082 of bins, it changes them by a specifiable <i>bin count factor</i>.
083 This defaults to 2, but can be specified as a constructor argument.
084 Suppose the bin count factor is <i>binCountFactor</i> and the current
085 number of buckets is <i>n</i> (by default, this starts at 2, but can
086 be specified by a constructor argument, <i>minNumBuckets</i>).  The
087 number of bins will be multiplied by <i>binCountFactor</i> if the
088 queue size exceeds <i>n * binCountFactor</i>.  The number of bins will
089 be divided by <i>binCountFactor</i> if the queue size falls below
090 <i>n/binCountFactor</i>.  Thus, the queue attempts to keep the number
091 of bins close to the size of the queue.  Each time it changes the
092 number of bins, it uses recently dequeued entries to calculate a
093 reasonable bin width (actually, it defers to the associated
094 CQComparator for this calculation).
095
096 <p>
097 For efficiency, this implementation constrains <i>minNumBuckets</i>
098 and <i>binCountFactor</i> to be powers of two. If something other
099 than a power is two is given, the next power of two larger than
100 the specified number is used.  This has the effect of ensuring
101 that the number of bins is always a power of two, and hence
102 the modulo operation used to map virtual bin numbers onto actual
103 bin numbers is a simple masking operation.
104
105 <p>Changing the number of bins is a relatively expensive operation, so it
106 may be worthwhile to increase <i>binCountFactor</i> to reduce the
107 frequency of change operations. Working counter to this, however, is
108 that the queue is most efficient when there is on average one event
109 per bin.  Thus, the queue becomes less efficient if change operations
110 are less frequent.  Change operations can be entirely disabled by
111 calling setAdaptive() with argument <i>false</i>.
112
113 <p>This implementation is not synchronized, so if multiple threads
114 depend on it, the caller must be.
115
116 This implementation is based on:
117 <ul>
118 <li>Randy Brown, <i>CalendarQueues:A Fast Priority Queue Implementation for
119 the Simulation Event Set Problem</i>, Communications of the ACM, October 1988,
120 Volume 31, Number 10.
121 <li>A. Banerjea and E. W. Knightly, <i>Ptolemy Classic implementation of
122 CalendarQueue class.</i>
123 </ul>
124
125 @author Lukito Muliadi and Edward A. Lee
126 @version $Id$
127 @since Ptolemy II 0.2
128 @Pt.ProposedRating Green (eal)
129 @Pt.AcceptedRating Yellow (liuj)
130 @see ptolemy.actor.util.CQComparator
131 */
132public class CalendarQueue implements Debuggable {
133    /** Construct an empty queue with a given comparator, which
134     *  is used to sort the entries.  The bin count factor and the
135     *  initial and minimum number of bins are set to 2.
136     *  @param comparator The comparator used to sort entries.
137     */
138    public CalendarQueue(CQComparator comparator) {
139        _cqComparator = comparator;
140    }
141
142    /** Construct an empty queue with the specified comparator,
143     *  which is used to sort the entries, the specified
144     *  minimum number of buckets, and the specified bin count factor.
145     *  The bin count factor multiplies or divides the number of bins
146     *  when the number of bins is changed.
147     *  The specified minimum number of buckets is also the initial
148     *  number of buckets.
149     *  @param comparator The comparator used to sort entries.
150     *  @param minNumBuckets The minimum number of buckets.
151     *  @param binCountFactor The bin count factor.
152     */
153    public CalendarQueue(CQComparator comparator, int minNumBuckets,
154            int binCountFactor) {
155        this(comparator);
156        _logMinNumBuckets = log(minNumBuckets);
157        _logQueueBinCountFactor = log(binCountFactor);
158
159        // For debugging.
160        // addDebugListener(new StreamListener());
161    }
162
163    ///////////////////////////////////////////////////////////////////
164    ////                         public methods                    ////
165
166    /** Append a listener to the current set of debug listeners.
167     *  If the listener is already in the set, do not add it again.
168     *  @param listener The listener to which to send debug messages.
169     *  @see #removeDebugListener
170     */
171    @Override
172    public void addDebugListener(DebugListener listener) {
173        if (_debugListeners == null) {
174            _debugListeners = new LinkedList();
175        } else {
176            if (_debugListeners.contains(listener)) {
177                return;
178            }
179        }
180
181        _debugListeners.add(listener);
182        _debugging = true;
183    }
184
185    /** Empty the queue, discarding all current information.
186     *  On the next put(), the queue will be reinitialized, including
187     *  setting the bin width and zero reference of the comparator.
188     *  @see CQComparator#setZeroReference
189     */
190    public void clear() {
191        _initialized = false;
192        _queueSize = 0;
193        _indexOfMinimumBucketValid = false;
194        _cachedMinimumBucket = null;
195    }
196
197    /** Return entry that is at the head of the
198     *  queue (i.e. the one that will be obtained by the next take()).
199     *  If the queue is empty, then throw an InvalidStateException,
200     *  since the emptiness can be tested.
201     *
202     *  @return Object The smallest entry in the queue.
203     *  @exception InvalidStateException If the queue is empty.
204     */
205    public final Object get() throws InvalidStateException {
206        if (_indexOfMinimumBucketValid) {
207            // If the queue has not been changed, return the cached bucket.
208            return _cachedMinimumBucket;
209        } else {
210            int indexOfMinimum = _getIndexOfMinimumBucket();
211            Object result = _getFromBucket(indexOfMinimum);
212            _collect(result);
213            if (_debugging) {
214                _debug("--- getting from queue: " + result);
215            }
216            _cachedMinimumBucket = result;
217            return result;
218        }
219    }
220
221    /** Return true if the specified entry is in the queue.
222     *  This is checked using the equals() method.
223     *  @param entry The entry.
224     *  @return True if the specified entry is in the queue.
225     */
226    public boolean includes(Object entry) {
227        if (_queueSize == 0) {
228            return false;
229        }
230
231        return _bucket[_getBinIndex(entry)].includes(entry);
232    }
233
234    /** Return true if the queue is empty, and false otherwise.
235     *  @return True if empty, false otherwise.
236     */
237    public final boolean isEmpty() {
238        return _queueSize == 0;
239    }
240
241    /** Return the power of two belonging to the least integer
242     *  power of two larger than or equal to the specified integer.
243     *  The return value is always between 0 and 31, inclusive.
244     *  @param value The value for which to find the next power of 2.
245     *  @return A number <i>n</i> such that 2<sup><i>n</i></sup>
246     *   is greater than or equal to <i>value</i>.
247     *  @exception ArithmeticException If the specified value is
248     *   not positive.
249     */
250    public static int log(int value) {
251        if (value <= 0) {
252            throw new ArithmeticException(
253                    "CalendarQueue: Cannot take the log of a non-positive number: "
254                            + value);
255        }
256
257        if (value == 1) {
258            return 0;
259        }
260
261        int result = 1;
262
263        while (1 << result < value && result < 32) {
264            result = result << 1;
265        }
266
267        return result;
268    }
269
270    /** Add an entry to the queue.
271     *  The first time this is called after queue creation or after
272     *  calling clear() results in the comparator having its zero reference
273     *  set to the argument and its bin width set to the default.
274     *  Also, the number of buckets is set to the minimum (which has
275     *  been given to the constructor).
276     *
277     *  If the specified entry cannot be compared by the comparator
278     *  associated with this queue, then a ClassCastException will
279     *  be thrown.
280     *
281     *  This method always returns true, but the return value may be
282     *  used in derived classes to indicate that the entry is
283     *  already on the queue
284     *  and is not added again.  In this class, the entry is always added,
285     *  even if an identical entry already exists on the queue.
286     *
287     *  @param entry The entry to be added to the queue.
288     *  @return True.
289     *  @exception ClassCastException If the specified entry cannot
290     *   be compared by the associated comparator.
291     */
292    public boolean put(Object entry) {
293        if (_debugging) {
294            _debug("+++ putting in queue: " + entry);
295        }
296
297        // If this is the first put since the queue creation,
298        // then do initialization.
299        if (!_initialized) {
300            _cqComparator.setZeroReference(entry);
301            _cqComparator.setBinWidth(null);
302            _queueSize = 0;
303            _localInit(_logMinNumBuckets, entry);
304
305            // Indicate that we do not have enough samples redo width.
306            _sampleValid = false;
307        }
308
309        // Get the bin number.
310        int binNumber = _getBinIndex(entry);
311
312        // If _minimumEntry is equal to null (which happens before any entries
313        // are put in the queue), or the queue size is zero,
314        // or if the new entry is less than the current
315        // smallest entry) then update the minimum entry of the queue.
316        if (_minimumEntry == null || _queueSize == 0
317                || _cqComparator.compare(entry, _minimumEntry) < 0) {
318            _minimumEntry = entry;
319            _minVirtualBucket = _cqComparator.getVirtualBinNumber(entry);
320            _minBucket = _getBinIndex(entry);
321        }
322
323        // Insert the entry into bucket binNumber, which has a sorted list.
324        _bucket[binNumber].insert(entry);
325
326        // Indicate increased size.
327        ++_queueSize;
328
329        // Change the calendar size if needed.
330        _resize(true);
331
332        return true;
333    }
334
335    /** Remove the specified entry and return true if the entry
336     *  is found and successfully removed, and false
337     *  if it is not found.
338     *  Equality is tested using the equals() method of the entry.
339     *  If there are multiple entries in the queue that match, then
340     *  only the first one is removed. The first one always
341     *  corresponds to the one enqueued first among those multiple entries.
342     *  Therefore, the queue has FIFO behavior.
343     *  @param entry The entry to remove.
344     *  @return True If a match is found and the entry is removed.
345     */
346    public boolean remove(Object entry) {
347        // If the queue is empty then return false.
348        if (_queueSize == 0) {
349            return false;
350        }
351
352        CQLinkedList bucket = _bucket[_getBinIndex(entry)];
353        if (bucket == null) {
354            return false;
355        }
356        boolean result = bucket.remove(entry);
357
358        if (result) {
359            _queueSize--;
360            _resize(false);
361        }
362
363        return result;
364    }
365
366    /** Unregister a debug listener.  If the specified listener has not
367     *  been previously registered, then do nothing.
368     *  @param listener The listener to remove from the list of listeners
369     *  to which debug messages are sent.
370     *  @see #addDebugListener
371     */
372    @Override
373    public void removeDebugListener(DebugListener listener) {
374        if (_debugListeners == null) {
375            return;
376        }
377
378        _debugListeners.remove(listener);
379
380        if (_debugListeners.size() == 0) {
381            _debugListeners = null;
382            _debugging = false;
383        }
384
385        return;
386    }
387
388    /** Enable or disable changing the number of bins (or buckets)
389     *  in the queue.  These change operations are fairly expensive,
390     *  so in some circumstances you may wish to simply set the
391     *  number of bins (using the constructor) and leave it fixed.
392     *  If however the queue size becomes much larger or much smaller
393     *  than the number of bins, the queue will become much less
394     *  efficient.
395     *  @param flag If false, disable changing the number of bins.
396     */
397    public void setAdaptive(boolean flag) {
398        _resizeEnabled = flag;
399    }
400
401    /** Return the queue size, which is the number of entries currently
402     *  in the queue.
403     *  @return The queue size.
404     */
405    public final int size() {
406        return _queueSize;
407    }
408
409    /** Remove the smallest entry and return it.
410     *  If there are multiple smallest entries, then return the
411     *  first one that was put in the queue (FIFO behavior).
412     *  If the queue is empty, then through an InvalidStateException,
413     *  since the emptiness can be tested.
414     *
415     *  @return The entry that is smallest, per the comparator.
416     *  @exception InvalidStateException If the queue is empty.
417     */
418    public final Object take() throws InvalidStateException {
419        int indexOfMinimum = _getIndexOfMinimumBucket();
420        Object result = _takeFromBucket(indexOfMinimum);
421        _collect(result);
422        if (_debugging) {
423            _debug("--- taking from queue: " + result);
424        }
425        return result;
426    }
427
428    /** Return the entries currently in the queue as an array.
429     *  @return The entries currently in the queue.
430     */
431    public final Object[] toArray() {
432        return toArray(Integer.MAX_VALUE);
433    }
434
435    /** Return the entries currently in the queue,
436     *  but no more of them than the number given as an
437     *  argument.
438     *  To get all the entries in the queue, call this method
439     *  with argument Integer.MAX_VALUE.
440     *  @param limit The maximum number of entries desired.
441     *  @return The entries currently in the queue.
442     */
443    public final Object[] toArray(int limit) {
444        if (limit > _queueSize) {
445            limit = _queueSize;
446        }
447
448        Object[] result = new Object[limit];
449
450        if (_queueSize == 0) {
451            return result;
452        }
453
454        int index = 0;
455
456        // Iterate through the buckets collecting entries in order.
457        // In each bucket, accept all entries in the current year
458        // (with the right virtual bucket number).
459        int currentBucket = _minBucket;
460        long virtualBucket = _minVirtualBucket;
461        long minimumNextVirtualBucket = Long.MAX_VALUE;
462
463        // If we have not found a value, then foundValue is false
464        // We used to check minimumNextVirtualBucket to see if
465        // it was equal to Long.MAX_VALUE, but this causes problems
466        // because it means CQComparator.getVirtualBinNumber can no longer
467        // return ong.MAX_VALUE.
468        boolean foundValue = false;
469
470        int nextStartBucket = _minBucket;
471
472        // Keep track of where we are in each bucket.
473        CQCell[] bucketHead = new CQCell[_bucket.length];
474
475        for (int i = 0; i < _bucket.length; i++) {
476            bucketHead[i] = _bucket[i].head;
477        }
478
479        while (true) {
480            // At each bucket, we first determine whether there are more
481            // entries to look at in the bucket.
482            // If so, then we check whether the next entry in the
483            // bucket is in the current year. This is done simply by
484            // comparing the virtual bucket number to a running count of
485            // the year.
486            // If an entry is in the current year, then add it to the result.
487            // If no entry is in the current year, then we cycle
488            // through all buckets and find the minimum entry.
489            if (bucketHead[currentBucket] != null) {
490                // There are more entries in the bucket.
491                Object nextInBucket = bucketHead[currentBucket].contents;
492
493                while (_cqComparator
494                        .getVirtualBinNumber(nextInBucket) == virtualBucket) {
495                    // The entry is in the current year. Return it.
496                    result[index] = nextInBucket;
497                    index++;
498
499                    if (index == limit) {
500                        return result;
501                    }
502
503                    bucketHead[currentBucket] = bucketHead[currentBucket].next;
504
505                    if (bucketHead[currentBucket] == null) {
506                        break;
507                    }
508
509                    nextInBucket = bucketHead[currentBucket].contents;
510                }
511
512                long nextVirtualBucket = _cqComparator
513                        .getVirtualBinNumber(nextInBucket);
514
515                // Note that getVirtualBinNumber can return Long.MAX_VALUE
516                // which means the we should probably check to see if
517                // nextVirtualBucket <= minimumNextVirtualBucket.  However,
518                // this will cause problems if the getVirtualBinNumber()
519                // returns something other than Long.MAX_VALUE that
520                // is the same as minimumNextVirtualBucket.
521                if (nextVirtualBucket < minimumNextVirtualBucket
522                        || nextVirtualBucket == Long.MAX_VALUE) {
523                    foundValue = true;
524                    minimumNextVirtualBucket = nextVirtualBucket;
525                    nextStartBucket = currentBucket;
526                }
527            }
528
529            // Prepare to check the next bucket
530            ++currentBucket;
531            ++virtualBucket;
532
533            if (currentBucket == _numberOfBuckets) {
534                currentBucket = 0;
535            }
536
537            // If one round of search has elapsed,
538            // then increment the virtual bucket.
539            if (currentBucket == nextStartBucket) {
540                if (!foundValue) {
541                    throw new InternalErrorException(
542                            "Queue is empty, but size() is not zero! It is: "
543                                    + _queueSize);
544                }
545
546                virtualBucket = minimumNextVirtualBucket;
547                foundValue = false;
548                minimumNextVirtualBucket = Long.MAX_VALUE;
549            }
550        }
551    }
552
553    ///////////////////////////////////////////////////////////////////
554    ////                         private methods                   ////
555    // Collect an entry for later use to recalculate bin widths.
556    // The entry is collected only if it is strictly greater than the
557    // previously collected entry.
558    private void _collect(Object entry) {
559        if (_previousTakenEntry == null
560                || _cqComparator.compare(entry, _previousTakenEntry) > 0) {
561            _sampleEntries[_sampleEntryIndex++] = entry;
562
563            if (_sampleEntryIndex == _SAMPLE_SIZE) {
564                _sampleEntryIndex = 0;
565                _sampleValid = true;
566            }
567
568            _previousTakenEntry = entry;
569        }
570    }
571
572    /** Send a debug message to all debug listeners that have registered.
573     *  By convention, messages should not include a newline at the end.
574     *  The newline will be added by the listener, if appropriate.
575     *  @param message The message.
576     */
577    private final void _debug(String message) {
578        if (_debugListeners == null || !_debugging) {
579            return;
580        } else {
581            Iterator listeners = _debugListeners.iterator();
582
583            while (listeners.hasNext()) {
584                ((DebugListener) listeners.next()).message(message);
585            }
586        }
587    }
588
589    // Get the virtual bin index of the entry, and map it into
590    // a physical bin index.
591    private int _getBinIndex(Object entry) {
592        long i = _cqComparator.getVirtualBinNumber(entry);
593        i = i & _numberOfBucketsMask;
594        return (int) i;
595    }
596
597    /** Get the first entry from the specified bucket and return
598     *  its contents.
599     */
600    private Object _getFromBucket(int index) {
601        return _bucket[index].head.contents;
602    }
603
604    /** Get the index of the minimum bucket. The index is cached. When
605     *  this method is invoked, this method first checks whether the queue
606     *  has changed, if so, find a new index, otherwise, return the cached
607     *  index.
608     *  @return The index of the minimum bucket.
609     */
610    private int _getIndexOfMinimumBucket() {
611        // First check whether the queue is empty.
612        if (_queueSize == 0) {
613            throw new InvalidStateException("Queue is empty.");
614        }
615
616        // If the queue has not been changed, do nothing.
617        // Note that this has a strong dependency on the implementation
618        // of methods that modify the queue, such as the put(), remove(),
619        // and take() methods.
620        if (_indexOfMinimumBucketValid) {
621            return _indexOfMinimumBucket;
622        }
623
624        // Search the buckets starting from the index given by _minBucket.
625        // This is analogous to starting from the current page(month)
626        // of the calendar.
627        int i = _minBucket;
628
629        // Search the buckets starting from the index given by _minBucket.
630        // This is analogous to starting from the current page(month)
631        // of the calendar.
632        int j = 0;
633        int indexOfMinimum = i;
634        Object minSoFar = null;
635
636        while (true) {
637            // At each bucket, we first determine whether the bucket is empty.
638            // If not, then we check whether the first entry in the
639            // bucket is in the current year. This is done simply by
640            // comparing the virtual bucket number to _minVirtualBucket.
641            // If an entry is in the current year, then the entry is the
642            // minimum one and record its index.
643            // If no entry is in the current year, then we cycle
644            // through all buckets and find the minimum entry.
645            if (!_bucket[i].isEmpty()) {
646                // The bucket is not empty.
647                Object minimumInBucket = _bucket[i].head.contents;
648
649                if (_cqComparator.getVirtualBinNumber(
650                        minimumInBucket) == _minVirtualBucket + j) {
651                    // The entry is in the current year. Record its index.
652                    _indexOfMinimumBucket = i;
653                    break;
654                } else {
655                    // The entry is not in the current year.
656                    // Compare to the minimum found so far.
657                    if (minSoFar == null) {
658                        minSoFar = minimumInBucket;
659                        indexOfMinimum = i;
660                    } else if (_cqComparator.compare(minimumInBucket,
661                            minSoFar) < 0) {
662                        minSoFar = minimumInBucket;
663                        indexOfMinimum = i;
664                    }
665                }
666            }
667
668            // Prepare to check the next bucket
669            ++i;
670            ++j;
671
672            if (i == _numberOfBuckets) {
673                i = 0;
674            }
675
676            // If one round of search has already elapsed,
677            // then use the index of minimum one we have found so far
678            // as the index for the minimum bucket.
679            if (i == _minBucket) {
680                if (minSoFar == null) {
681                    throw new InternalErrorException(
682                            "Queue is empty, but size() is not zero!");
683                }
684                _indexOfMinimumBucket = indexOfMinimum;
685                break;
686            }
687        }
688
689        _indexOfMinimumBucketValid = true;
690        return _indexOfMinimumBucket;
691    }
692
693    // Initialize the bucket array to the specified number of buckets
694    // with the specified width.
695    //
696    // @param logNumberOfBuckets The number of buckets, given as a power of 2.
697    // @param firstEntry: First entry of the new queue.
698    private void _localInit(int logNumberOfBuckets, Object firstEntry) {
699        _logNumberOfBuckets = logNumberOfBuckets;
700        _numberOfBuckets = 1 << logNumberOfBuckets;
701        _numberOfBucketsMask = _numberOfBuckets - 1;
702
703        int numberOfBuckets = 1 << _logNumberOfBuckets;
704        _bucket = new CQLinkedList[numberOfBuckets];
705
706        for (int i = 0; i < numberOfBuckets; ++i) {
707            // Initialize each bucket with an empty CQLinkedList
708            // that uses _cqComparator for sorting.
709            _bucket[i] = new CQLinkedList();
710        }
711
712        // Set the initial position in queue.
713        _minimumEntry = firstEntry;
714        _minVirtualBucket = _cqComparator.getVirtualBinNumber(firstEntry);
715        _minBucket = _getBinIndex(firstEntry);
716
717        // Set the queue size change thresholds.
718        _bottomThreshold = _numberOfBuckets >> _logQueueBinCountFactor;
719        _topThreshold = _numberOfBuckets << _logQueueBinCountFactor;
720        _queueSizeOverThreshold = _queueSizeUnderThreshold = 0;
721        _initialized = true;
722    }
723
724    // Check to see whether the queue needs resizing, and if it does, do it.
725    // Copy the queue into a new queue with the specified number of buckets.
726    // Unlike Randy Brown's realization, a new queue is reallocated
727    // each time the number of buckets changes.
728    // The resize methods follows these steps:
729    // 1. A new bin width is computed as a function of the entry statistics.
730    // 2. Create the new queue and initialize it.
731    // 3. Transfer all elements from the old queue into the new queue.
732    // The argument indicates whether the queue just increased or decreased
733    // in size.
734    private void _resize(boolean increasing) {
735        // This method is called whenever the queue is modified. So, we
736        // set the flat _queueChanged to true.
737        _indexOfMinimumBucketValid = false;
738
739        if (!_resizeEnabled) {
740            return;
741        }
742
743        int logNewSize = _logNumberOfBuckets;
744        boolean resize = false;
745
746        if (increasing) {
747            if (_queueSize > _topThreshold) {
748                _queueSizeOverThreshold++;
749            }
750
751            if (_queueSizeOverThreshold > _RESIZE_LAG) {
752                resize = true;
753                _queueSizeOverThreshold = 0;
754                logNewSize = _logNumberOfBuckets + _logQueueBinCountFactor;
755
756                if (_debugging) {
757                    _debug(">>>>>> increasing number of buckets to: "
758                            + (1 << logNewSize));
759                }
760            }
761        } else {
762            // Queue has just decreased in size.
763            if (_queueSize < _bottomThreshold) {
764                _queueSizeUnderThreshold++;
765            }
766
767            if (_queueSizeUnderThreshold > _RESIZE_LAG) {
768                resize = true;
769                _queueSizeUnderThreshold = 0;
770
771                // If it is already minimum or close, do nothing.
772                int tempLogNewSize = _logNumberOfBuckets
773                        - _logQueueBinCountFactor;
774
775                if (tempLogNewSize > _logMinNumBuckets) {
776                    logNewSize = tempLogNewSize;
777
778                    if (_debugging) {
779                        _debug(">>>>>> decreasing number of buckets to: "
780                                + (1 << logNewSize));
781                    }
782                }
783            }
784        }
785
786        if (!resize) {
787            return;
788        }
789
790        // Find new bucket width, if appropriate.
791        if (_sampleValid) {
792            // Have to copy samples into a new array to ensure that they
793            // increasing...
794            Object[] sampleCopy = new Object[_SAMPLE_SIZE];
795
796            for (int i = 0; i < _SAMPLE_SIZE; i++) {
797                sampleCopy[i] = _sampleEntries[_sampleEntryIndex++];
798
799                if (_sampleEntryIndex == _SAMPLE_SIZE) {
800                    _sampleEntryIndex = 0;
801                }
802            }
803
804            _cqComparator.setBinWidth(sampleCopy);
805
806            if (_debugging) {
807                _debug(">>> changing bin width.");
808            }
809        }
810
811        // Save location and size of the old calendar.
812        CQLinkedList[] old_bucket = _bucket;
813        int old_numberOfBuckets = _numberOfBuckets;
814
815        // Initialize new calendar.
816        _localInit(logNewSize, _minimumEntry);
817        _queueSize = 0;
818
819        // Go through each of the old buckets and add its elements
820        // to the new queue. Disable debugging to not report these puts.
821        boolean saveDebugging = _debugging;
822        _debugging = false;
823
824        boolean saveResizeEnabled = _resizeEnabled;
825        _resizeEnabled = false;
826
827        for (int i = 0; i < old_numberOfBuckets; i++) {
828            while (!old_bucket[i].isEmpty()) {
829                put(old_bucket[i].take());
830            }
831        }
832
833        _debugging = saveDebugging;
834        _resizeEnabled = saveResizeEnabled;
835    }
836
837    // Take the first entry from the specified bucket and return
838    // its value.
839    private Object _takeFromBucket(int index) {
840        Object minEntry = _bucket[index].take();
841
842        // Update the position on the calendar.
843        _minBucket = index;
844        _minimumEntry = minEntry;
845        _minVirtualBucket = _cqComparator.getVirtualBinNumber(minEntry);
846        --_queueSize;
847
848        // Reduce the calendar size if needed.
849        _resize(false);
850
851        // Return the item found.
852        return minEntry;
853    }
854
855    ///////////////////////////////////////////////////////////////////
856    ////                         private inner class               ////
857    // Specialized sorted list of objects.  This class is used
858    // instead of Java's built-in collections in order to provide the
859    // following features:
860    //   - multiset behavior.  An entry can appear more than once.
861    //   - monitoring of buckets in order to trigger reallocation.
862    //   - monitoring of time stamps in order to help set bin width.
863    // The class is very specialized, with many features of generic
864    // collections missing.
865    //
866    private class CQLinkedList {
867        // Construct an empty list.
868        public CQLinkedList() {
869            head = null;
870            tail = null;
871        }
872
873        public final boolean includes(Object object) {
874            if (isEmpty()) {
875                return false;
876            }
877
878            return head.find(object) != null;
879        }
880
881        public final boolean isEmpty() {
882            return head == null;
883        }
884
885        // Insert the specified entry into the list, in sorted position.
886        // If there are already objects of the same key, then this one
887        // is positioned after those objects.
888        public final void insert(Object object) {
889            // Special case: linked list is empty.
890            if (head == null) {
891                head = new CQCell(object, null);
892                tail = head;
893                return;
894            }
895
896            // LinkedList is not empty.
897            // I assert that by construction, when head != null,
898            // then tail != null as well.
899            // Special case: Check if object is greater than or equal to tail.
900            if (_cqComparator.compare(object, tail.contents) >= 0) {
901                // object becomes new tail.
902                CQCell newTail = new CQCell(object, null);
903                tail.next = newTail;
904                tail = newTail;
905                return;
906            }
907
908            // Check if head is strictly greater than object.
909            if (_cqComparator.compare(head.contents, object) > 0) {
910                // object becomes the new head
911                head = new CQCell(object, head);
912                return;
913            }
914
915            // No more special cases.
916            // Iterate from head of queue.
917            CQCell previousCell = head;
918            CQCell currentCell = previousCell.next;
919
920            // Note that this loop will always terminate via the return
921            // statement. This is because tail is assured of being strictly
922            // greater than object.
923            do {
924                // check if currentCell is strictly greater than object
925                if (_cqComparator.compare(currentCell.contents, object) > 0) {
926                    // insert object between previousCell and currentCell
927                    CQCell newCell = new CQCell(object, currentCell);
928                    previousCell.next = newCell;
929                    return;
930                }
931
932                previousCell = currentCell;
933                currentCell = previousCell.next;
934            } while (currentCell != null);
935        }
936
937        // Remove the specified element from the queue, where equals() is used
938        // to determine a match.  Only the first matching element that is found
939        // is removed. Return true if a matching element is found and removed,
940        // and false otherwise.
941        public final boolean remove(Object object) {
942            // two special cases:
943            // Case 1: list is empty: always return false.
944            if (head == null) {
945                return false;
946            }
947
948            // Case 2: The element I want is at head of the list.
949            if (head.contents.equals(object)) {
950                if (head != tail) {
951                    // Linked list has at least two cells.
952                    head = head.next;
953                } else {
954                    // Linked list contains only one cell
955                    head = null;
956                    tail = null;
957                }
958
959                return true;
960            }
961
962            // Non-special case that requires looping:
963            CQCell previousCell = head;
964            CQCell currentCell = previousCell.next;
965            if (currentCell == null) {
966                return false;
967            }
968
969            do {
970                if (currentCell.contents != null
971                        && currentCell.contents.equals(object)) {
972                    // Found a match.
973                    if (tail == currentCell) {
974                        // Removing the tail. Need to update.
975                        tail = previousCell;
976                    }
977
978                    previousCell.next = currentCell.next;
979                    return true;
980                }
981
982                previousCell = currentCell;
983                currentCell = currentCell.next;
984            } while (currentCell != null);
985
986            // No matching entry was found.
987            return false;
988        }
989
990        // Remove and return the first element in the list.
991        public final Object take() {
992            // remove the head
993            CQCell oldHead = head;
994            head = head.next;
995
996            if (head == null) {
997                tail = null;
998            }
999
1000            return oldHead.contents;
1001        }
1002
1003        // The head of the list.
1004        public CQCell head;
1005
1006        // The tail of the list.
1007        public CQCell tail;
1008    }
1009
1010    /** Simplified and specialized linked list cell.  This is based on
1011     * Doug Lea's implementation in collections, but with most of the
1012     * functionality stripped out.
1013     */
1014    private static class CQCell {
1015        // FindBugs suggests making this class static so as to decrease
1016        // the size of instances and avoid dangling references.
1017
1018        /** Construct a cell with the specified contents, pointing to the
1019         * specified next cell.
1020         * @param contents The contents.
1021         * @param next The next cell, or null if this is the end of the list.
1022         */
1023        public CQCell(Object contents, CQCell next) {
1024            this.contents = contents;
1025            this.next = next;
1026        }
1027
1028        /** Search the list for the specified element (using equals() to
1029         * identify a match).  Note that this does a linear search
1030         * starting at the beginning of the list.
1031         * @param element Element to look for.
1032         * @return The cell containing the element, or null if there is none.
1033         */
1034        public final CQCell find(Object element) {
1035            for (CQCell p = this; p != null; p = p.next) {
1036                if (p.contents.equals(element)) {
1037                    return p;
1038                }
1039            }
1040
1041            return null;
1042        }
1043
1044        /** The contents of the cell. */
1045        public Object contents;
1046
1047        /** The next cell in the list, or null if there is none. */
1048        public CQCell next;
1049    }
1050
1051    ///////////////////////////////////////////////////////////////////
1052    ////                         private variables                 ////
1053
1054    /** @serial The list of DebugListeners registered with this object. */
1055    private LinkedList _debugListeners = null;
1056
1057    /** @serial A flag indicating whether there are debug listeners. */
1058    private boolean _debugging;
1059
1060    // The number of times that the queue has exceeded the current
1061    // threshold.
1062    private int _queueSizeOverThreshold = 0;
1063
1064    // The number of times that the queue has dropped below the current
1065    // threshold.
1066    private int _queueSizeUnderThreshold = 0;
1067
1068    // The smallest queue size before number of buckets gets decreased.
1069    private int _bottomThreshold;
1070
1071    // The array of bins or buckets.
1072    private CQLinkedList[] _bucket;
1073
1074    // Cached minimum bucket.
1075    private Object _cachedMinimumBucket;
1076
1077    // Comparator to determine how to order entries.
1078    private CQComparator _cqComparator;
1079
1080    /** The index of the minmum bucket. */
1081    private int _indexOfMinimumBucket = 0;
1082
1083    /** Flag indicating whether the index of the minimum bucket is valid.
1084     */
1085    private boolean _indexOfMinimumBucketValid = false;
1086
1087    // An indicator of whether the queue has been initialized.
1088    private boolean _initialized = false;
1089
1090    // The log base 2 of the number of buckets to start with and the lower bound
1091    // on the number of buckets. That is, the number of buckets is 2
1092    // raised to this power.
1093    private int _logMinNumBuckets = 1;
1094
1095    // The number of buckets in the queue as a power of two.
1096    private int _logNumberOfBuckets;
1097
1098    // The factor by which to multiply (or divide)
1099    // the number of bins to when resizing, given as a power of two.
1100    private int _logQueueBinCountFactor = 1;
1101
1102    // The minimum entry in the queue.
1103    // All entries in the queue have greater keys (or equal).
1104    private Object _minimumEntry = null;
1105
1106    // The quantized value of _minimumEntry (the virtual bin number).
1107    private long _minVirtualBucket;
1108
1109    // The positive modulo of _minimumEntry (the physical bin number).
1110    private int _minBucket;
1111
1112    // The number of buckets currently in the queue (see _logNumberOfBuckets).
1113    private int _numberOfBuckets;
1114
1115    // The mask to use for modulo division by the number
1116    // of buckets currently in the queue (see _logNumberOfBuckets).
1117    private int _numberOfBucketsMask;
1118
1119    // The most recently collected entry.
1120    private Object _previousTakenEntry = null;
1121
1122    // The current number of elements in the queue.
1123    private int _queueSize = 0;
1124
1125    // Flag for enable/disable changing the number of bins.
1126    private boolean _resizeEnabled = true;
1127
1128    // The number of times a threshold must be exceeded to trigger resizing.
1129    private final static int _RESIZE_LAG = 32;
1130
1131    // Number of entries to calculate bin width.
1132    private final static int _SAMPLE_SIZE = 8;
1133
1134    // Sample entries to use to calculate bin width.
1135    private Object[] _sampleEntries = new Object[_SAMPLE_SIZE];
1136
1137    // Index into the sample entries array.
1138    private int _sampleEntryIndex = 0;
1139
1140    // Indicator of whether there are enough sample entries to calculate width.
1141    private boolean _sampleValid = false;
1142
1143    // The largest queue size before number of buckets gets increased.
1144    private int _topThreshold;
1145}