Class LinkedTransferQueue<E>

java.lang.Object
java.util.AbstractCollection<E>
java.util.AbstractQueue<E>
org.glassfish.jersey.internal.util.collection.LinkedTransferQueue<E>
Type Parameters:
E - the type of elements held in this collection.
All Implemented Interfaces:
Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>, TransferQueue<E>

class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, Serializable
An unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is that element that has been on the queue the longest time for some producer. The tail of the queue is that element that has been on the queue the shortest time for some producer.

Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.

This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a LinkedTransferQueue happen-before actions subsequent to the access or removal of that element from the LinkedTransferQueue in another thread.
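The memory consistency guarantee above can be illustrated with a small hand-off. This is a minimal sketch, not part of the documented API: it uses the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in (an assumption that the Jersey-internal copy behaves the same way), and the Message holder and class name are hypothetical.

```java
import java.util.concurrent.LinkedTransferQueue;

final class HappensBeforeSketch {
    /** Hypothetical holder: the field is deliberately neither volatile nor synchronized. */
    static final class Message {
        int payload;
    }

    /** Returns the payload the consumer observes after taking the message. */
    static int publishAndRead() {
        LinkedTransferQueue<Message> queue = new LinkedTransferQueue<>();
        Thread producer = new Thread(() -> {
            Message m = new Message();
            m.payload = 42;   // this write happens-before the put below
            queue.put(m);
        });
        producer.start();
        try {
            Message received = queue.take(); // blocks until the producer has enqueued
            producer.join();
            return received.payload;         // the producer's write is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead());
    }
}
```

The queue itself provides the ordering, so the consumer needs no extra synchronization to read fields that were written before the element was enqueued.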

This class is a member of the Java Collections Framework. TODO: Do NOT remove this class. It's referenced from DataStructures.

  • Field Details

    • serialVersionUID

      private static final long serialVersionUID
    • MP

      private static final boolean MP
      True if running on a multiprocessor.
    • FRONT_SPINS

      private static final int FRONT_SPINS
      The number of times to spin (with randomly interspersed calls to Thread.yield) on multiprocessor before blocking when a node is apparently the first waiter in the queue. See above for explanation. Must be a power of two. The value is empirically derived -- it works pretty well across a variety of processors, numbers of CPUs, and OSes.
    • CHAINED_SPINS

      private static final int CHAINED_SPINS
      The number of times to spin before blocking when a node is preceded by another node that is apparently spinning. Also serves as an increment to FRONT_SPINS on phase changes, and as base average frequency for yielding during spins. Must be a power of two.
    • SWEEP_THRESHOLD

      static final int SWEEP_THRESHOLD
      The maximum number of estimated removal failures (sweepVotes) to tolerate before sweeping through the queue unlinking cancelled nodes that were not unlinked upon initial removal. See above for explanation. The value must be at least two to avoid useless sweeps when removing trailing nodes.
    • tail

      private transient volatile LinkedTransferQueue.Node tail
      Tail of the queue; null until the first append.
    • sweepVotes

      private transient volatile int sweepVotes
      The number of apparent failures to unsplice removed nodes.
    • NOW

      private static final int NOW
    • ASYNC

      private static final int ASYNC
    • SYNC

      private static final int SYNC
    • TIMED

      private static final int TIMED
    • UNSAFE

      private static final sun.misc.Unsafe UNSAFE
    • headOffset

      private static final long headOffset
    • tailOffset

      private static final long tailOffset
    • sweepVotesOffset

      private static final long sweepVotesOffset
  • Constructor Details

    • LinkedTransferQueue

      public LinkedTransferQueue()
      Creates an initially empty LinkedTransferQueue.
    • LinkedTransferQueue

      public LinkedTransferQueue(Collection<? extends E> c)
      Creates a LinkedTransferQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.
      Parameters:
      c - the collection of elements to initially contain
      Throws:
      NullPointerException - if the specified collection or any of its elements are null
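As a rough illustration of the collection constructor, the sketch below checks the traversal-order and null-rejection behaviour described above. It assumes the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

final class ConstructorSketch {
    /** Builds a queue from a list and polls everything back out, head first. */
    static List<String> pollAll() {
        LinkedTransferQueue<String> queue =
                new LinkedTransferQueue<String>(Arrays.asList("a", "b", "c"));
        List<String> out = new ArrayList<>();
        for (String s = queue.poll(); s != null; s = queue.poll()) {
            out.add(s); // elements come back in the source list's iteration order
        }
        return out;
    }

    /** Returns true if a null element in the source collection is rejected. */
    static boolean rejectsNullElement() {
        try {
            new LinkedTransferQueue<String>(Arrays.<String>asList("a", null));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollAll());
        System.out.println(rejectsNullElement());
    }
}
```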
  • Method Details

    • casTail

      private boolean casTail(LinkedTransferQueue.Node cmp, LinkedTransferQueue.Node val)
    • casHead

      private boolean casHead(LinkedTransferQueue.Node cmp, LinkedTransferQueue.Node val)
    • casSweepVotes

      private boolean casSweepVotes(int cmp, int val)
    • cast

      static <E> E cast(Object item)
    • xfer

      private E xfer(E e, boolean haveData, int how, long nanos)
      Implements all queuing methods. See above for explanation.
      Parameters:
      e - the item or null for take
      haveData - true if this is a put, else a take
      how - NOW, ASYNC, SYNC, or TIMED
      nanos - timeout in nanoseconds, used only if how is TIMED
      Returns:
      an item if matched, else e
      Throws:
      NullPointerException - if haveData mode but e is null
    • tryAppend

      private LinkedTransferQueue.Node tryAppend(LinkedTransferQueue.Node s, boolean haveData)
      Tries to append node s as tail.
      Parameters:
      s - the node to append
      haveData - true if appending in data mode
      Returns:
      null on failure due to losing race with append in different mode, else s's predecessor, or s itself if no predecessor
    • awaitMatch

      private E awaitMatch(LinkedTransferQueue.Node s, LinkedTransferQueue.Node pred, E e, boolean timed, long nanos)
      Spins/yields/blocks until node s is matched or caller gives up.
      Parameters:
      s - the waiting node
      pred - the predecessor of s, or s itself if it has no predecessor, or null if unknown (the null case does not occur in any current calls but may in possible future extensions)
      e - the comparison value for checking match
      timed - if true, wait only until timeout elapses
      nanos - timeout in nanoseconds, used only if timed is true
      Returns:
      matched item, or e if unmatched on interrupt or timeout
    • spinsFor

      private static int spinsFor(LinkedTransferQueue.Node pred, boolean haveData)
      Returns spin/yield value for a node with given predecessor and data mode. See above for explanation.
    • succ

      Returns the successor of p, or the head node if p.next has been linked to self, which will only be true if traversing with a stale pointer that is now off the list.
    • firstOfMode

      private LinkedTransferQueue.Node firstOfMode(boolean isData)
      Returns the first unmatched node of the given mode, or null if none. Used by methods isEmpty, hasWaitingConsumer.
    • firstDataItem

      private E firstDataItem()
      Returns the item in the first unmatched node with isData; or null if none. Used by peek.
    • countOfMode

      private int countOfMode(boolean data)
      Traverses and counts unmatched nodes of the given mode. Used by methods size and getWaitingConsumerCount.
    • unsplice

      final void unsplice(LinkedTransferQueue.Node pred, LinkedTransferQueue.Node s)
      Unsplices (now or later) the given deleted/cancelled node with the given predecessor.
      Parameters:
      pred - a node that was at one time known to be the predecessor of s, or null or s itself if s is/was at head
      s - the node to be unspliced
    • sweep

      private void sweep()
      Unlinks matched (typically cancelled) nodes encountered in a traversal from head.
    • findAndRemove

      private boolean findAndRemove(Object e)
      Main implementation of remove(Object).
    • put

      public void put(E e)
      Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never block.
      Specified by:
      put in interface BlockingQueue<E>
      Throws:
      NullPointerException - if the specified element is null
    • offer

      public boolean offer(E e, long timeout, TimeUnit unit)
      Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never block or return false.
      Specified by:
      offer in interface BlockingQueue<E>
      Returns:
      true (as specified by BlockingQueue.offer)
      Throws:
      NullPointerException - if the specified element is null
    • offer

      public boolean offer(E e)
      Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never return false.
      Specified by:
      offer in interface BlockingQueue<E>
      Specified by:
      offer in interface Queue<E>
      Returns:
      true (as specified by Queue.offer(E))
      Throws:
      NullPointerException - if the specified element is null
    • add

      public boolean add(E e)
      Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never throw IllegalStateException or return false.
      Specified by:
      add in interface BlockingQueue<E>
      Specified by:
      add in interface Collection<E>
      Specified by:
      add in interface Queue<E>
      Overrides:
      add in class AbstractQueue<E>
      Returns:
      true (as specified by Collection.add(E))
      Throws:
      NullPointerException - if the specified element is null
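The four insertion methods above (put, both offer overloads, and add) all append at the tail without blocking. The following is a minimal sketch of that behaviour, assuming the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

final class InsertSketch {
    /** Exercises every insertion method once and checks that none is refused. */
    static boolean allInsertsSucceed() {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
        queue.put(1);                                    // never blocks
        boolean offered = queue.offer(2);                // never returns false
        boolean timedOffer = queue.offer(3, 0, TimeUnit.NANOSECONDS); // timeout is irrelevant
        boolean added = queue.add(4);                    // never throws IllegalStateException
        return offered && timedOffer && added
                && queue.size() == 4
                && queue.peek() == 1;                    // head is the oldest element
    }

    /** Returns true if inserting null is rejected. */
    static boolean rejectsNull() {
        try {
            new LinkedTransferQueue<Integer>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(allInsertsSucceed());
        System.out.println(rejectsNull());
    }
}
```

Because the queue is unbounded, a producer that outpaces its consumers is limited only by available memory, so callers that need back-pressure have to provide it themselves.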
    • tryTransfer

      public boolean tryTransfer(E e)
      Transfers the element to a waiting consumer immediately, if possible.

      More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in take() or timed poll), otherwise returning false without enqueuing the element.

      Specified by:
      tryTransfer in interface TransferQueue<E>
      Parameters:
      e - the element to transfer
      Returns:
      true if the element was transferred, else false
      Throws:
      NullPointerException - if the specified element is null
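The two outcomes of the untimed tryTransfer can be sketched as follows. This assumes the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical, and the busy-wait on hasWaitingConsumer is only there to make the demonstration deterministic.

```java
import java.util.concurrent.LinkedTransferQueue;

final class TryTransferSketch {
    /** With no consumer waiting, the element is rejected and not enqueued. */
    static boolean failsWithoutConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        boolean transferred = queue.tryTransfer("x");
        return !transferred && queue.isEmpty();
    }

    /** With a consumer parked in take(), the element is handed over directly. */
    static String handsOffToWaitingConsumer() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        while (!queue.hasWaitingConsumer()) {
            Thread.yield(); // wait until the consumer is registered in the queue
        }
        boolean transferred = queue.tryTransfer("x");
        try {
            consumer.join(); // join makes the consumer's write to received[0] visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return transferred ? received[0] : null;
    }

    public static void main(String[] args) {
        System.out.println(failsWithoutConsumer());
        System.out.println(handsOffToWaitingConsumer());
    }
}
```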
    • transfer

      public void transfer(E e) throws InterruptedException
      Transfers the element to a consumer, waiting if necessary to do so.

      More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in take() or timed poll), else inserts the specified element at the tail of this queue and waits until the element is received by a consumer.

      Specified by:
      transfer in interface TransferQueue<E>
      Parameters:
      e - the element to transfer
      Throws:
      NullPointerException - if the specified element is null
      InterruptedException - if interrupted while waiting, in which case the element is not left enqueued
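To illustrate the blocking behaviour of transfer, the sketch below starts a producer that stays inside transfer until a consumer takes the element. It assumes the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical.

```java
import java.util.concurrent.LinkedTransferQueue;

final class TransferSketch {
    /** Returns true if the producer was still waiting before the element was taken. */
    static boolean producerBlocksUntilTaken() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.transfer("job"); // returns only after a consumer receives "job"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            while (queue.isEmpty()) {
                Thread.yield();        // wait until the element has been enqueued
            }
            boolean stillWaiting = producer.isAlive(); // nobody has taken it yet
            String taken = queue.take();               // releases the producer
            producer.join();
            return stillWaiting && "job".equals(taken);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(producerBlocksUntilTaken());
    }
}
```

This is the main difference from put: a successful return from transfer tells the producer that the element has actually been received, not merely queued.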
    • tryTransfer

      public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException
      Transfers the element to a consumer if it is possible to do so before the timeout elapses.

      More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in take() or timed poll), else inserts the specified element at the tail of this queue and waits until the element is received by a consumer, returning false if the specified wait time elapses before the element can be transferred.

      Specified by:
      tryTransfer in interface TransferQueue<E>
      Parameters:
      e - the element to transfer
      timeout - how long to wait before giving up, in units of unit
      unit - a TimeUnit determining how to interpret the timeout parameter
      Returns:
      true if successful, or false if the specified waiting time elapses before completion, in which case the element is not left enqueued
      Throws:
      NullPointerException - if the specified element is null
      InterruptedException - if interrupted while waiting, in which case the element is not left enqueued
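The timeout path of the timed tryTransfer might look like the following sketch: with no consumer present, the call gives up after the wait time and the element does not remain in the queue. It assumes the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class name, method name, and the 50 ms timeout are illustrative choices.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

final class TimedTryTransferSketch {
    /** Returns true if the transfer timed out and left nothing behind. */
    static boolean timesOutAndLeavesQueueEmpty() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        try {
            // No consumer ever arrives, so this waits about 50 ms and then gives up.
            boolean transferred = queue.tryTransfer("x", 50, TimeUnit.MILLISECONDS);
            return !transferred && queue.isEmpty() && queue.poll() == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOutAndLeavesQueueEmpty());
    }
}
```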
    • take

      public E take() throws InterruptedException
      Specified by:
      take in interface BlockingQueue<E>
      Throws:
      InterruptedException - if interrupted while waiting
    • poll

      public E poll(long timeout, TimeUnit unit) throws InterruptedException
      Specified by:
      poll in interface BlockingQueue<E>
      Throws:
      InterruptedException - if interrupted while waiting
    • poll

      public E poll()
      Specified by:
      poll in interface Queue<E>
    • drainTo

      public int drainTo(Collection<? super E> c)
      Specified by:
      drainTo in interface BlockingQueue<E>
    • drainTo

      public int drainTo(Collection<? super E> c, int maxElements)
      Specified by:
      drainTo in interface BlockingQueue<E>
    • iterator

      public Iterator<E> iterator()
      Returns an iterator over the elements in this queue in proper sequence. The elements will be returned in order from first (head) to last (tail).

      The returned iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException. It is guaranteed to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

      Specified by:
      iterator in interface Collection<E>
      Specified by:
      iterator in interface Iterable<E>
      Specified by:
      iterator in class AbstractCollection<E>
      Returns:
      an iterator over the elements in this queue in proper sequence
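A weakly consistent iterator tolerates modification during traversal. The sketch below adds an element mid-iteration; it assumes the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class, and the class and method names are hypothetical. Whether the late addition "d" shows up in the traversal is intentionally not relied upon.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

final class IteratorSketch {
    /** Iterates over the queue while appending to it; returns what was seen. */
    static List<String> iterateWhileAdding() {
        LinkedTransferQueue<String> queue =
                new LinkedTransferQueue<String>(Arrays.asList("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        for (String s : queue) {
            seen.add(s);
            if (s.equals("a")) {
                queue.add("d"); // no ConcurrentModificationException is thrown
            }
        }
        // "a", "b", "c" existed at construction and are always seen, in order;
        // "d" may or may not appear at the end.
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding());
    }
}
```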
    • peek

      public E peek()
      Specified by:
      peek in interface Queue<E>
    • isEmpty

      public boolean isEmpty()
      Returns true if this queue contains no elements.
      Specified by:
      isEmpty in interface Collection<E>
      Overrides:
      isEmpty in class AbstractCollection<E>
      Returns:
      true if this queue contains no elements
    • hasWaitingConsumer

      public boolean hasWaitingConsumer()
      Description copied from interface: TransferQueue
      Returns true if there is at least one consumer waiting to dequeue an element via take or poll. The return value represents a momentary state of affairs.
      Specified by:
      hasWaitingConsumer in interface TransferQueue<E>
      Returns:
      true if there is at least one waiting consumer
    • size

      public int size()
      Returns the number of elements in this queue. If this queue contains more than Integer.MAX_VALUE elements, returns Integer.MAX_VALUE.

      Beware that, unlike in most collections, this method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires an O(n) traversal.

      Specified by:
      size in interface Collection<E>
      Specified by:
      size in class AbstractCollection<E>
      Returns:
      the number of elements in this queue
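Since size walks the whole list, code that only needs to know whether work is pending is usually better served by isEmpty. A minimal sketch, assuming the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical.

```java
import java.util.concurrent.LinkedTransferQueue;

final class SizeSketch {
    /** Adds n elements and counts them; the count requires an O(n) traversal. */
    static int sizeAfterAdding(int n) {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>();
        for (int i = 0; i < n; i++) {
            queue.add(i);
        }
        return queue.size(); // exact here only because no other thread touches the queue
    }

    /** Cheaper emptiness check: stops at the first unmatched data node. */
    static boolean hasPendingWork(LinkedTransferQueue<?> queue) {
        return !queue.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAdding(1000));
        System.out.println(hasPendingWork(new LinkedTransferQueue<String>()));
    }
}
```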
    • getWaitingConsumerCount

      public int getWaitingConsumerCount()
      Description copied from interface: TransferQueue
      Returns an estimate of the number of consumers waiting to dequeue elements via take or poll. The return value is an approximation of a momentary state of affairs, that may be inaccurate if consumers have completed or given up waiting. The value may be useful for monitoring and heuristics, but not for synchronization control. Implementations of this method are likely to be noticeably slower than those for TransferQueue.hasWaitingConsumer().
      Specified by:
      getWaitingConsumerCount in interface TransferQueue<E>
      Returns:
      the number of consumers waiting to dequeue elements
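The consumer-monitoring methods can be exercised with a pair of blocked consumers, as in the sketch below. It assumes the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical. The polling loop is for demonstration only, in line with the advice above not to use these values for synchronization control.

```java
import java.util.concurrent.LinkedTransferQueue;

final class WaitingConsumerSketch {
    /** Parks two consumers in take() and returns the observed waiter count. */
    static int observeTwoWaiters() {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        Runnable consumer = () -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread first = new Thread(consumer);
        Thread second = new Thread(consumer);
        first.start();
        second.start();
        while (queue.getWaitingConsumerCount() < 2) {
            Thread.yield(); // both consumers will eventually block in take()
        }
        int observed = queue.getWaitingConsumerCount();
        queue.put("a"); // each put releases one waiting consumer
        queue.put("b");
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (queue.hasWaitingConsumer()) {
            throw new IllegalStateException("no consumer should be waiting any more");
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observeTwoWaiters());
    }
}
```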
    • remove

      public boolean remove(Object o)
      Removes a single instance of the specified element from this queue, if it is present. More formally, removes an element e such that o.equals(e), if this queue contains one or more such elements. Returns true if this queue contained the specified element (or equivalently, if this queue changed as a result of the call).
      Specified by:
      remove in interface BlockingQueue<E>
      Specified by:
      remove in interface Collection<E>
      Overrides:
      remove in class AbstractCollection<E>
      Parameters:
      o - element to be removed from this queue, if present
      Returns:
      true if this queue changed as a result of the call
    • contains

      public boolean contains(Object o)
      Returns true if this queue contains the specified element. More formally, returns true if and only if this queue contains at least one element e such that o.equals(e).
      Specified by:
      contains in interface BlockingQueue<E>
      Specified by:
      contains in interface Collection<E>
      Overrides:
      contains in class AbstractCollection<E>
      Parameters:
      o - object to be checked for containment in this queue
      Returns:
      true if this queue contains the specified element
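The equals-based semantics of remove and contains can be sketched as follows: only a single matching element is removed per call, and a second equal element stays in the queue. This assumes the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

final class RemoveSketch {
    /** Removes one "a" from [a, b, a] and returns the remaining elements in order. */
    static List<String> removeSingleInstance() {
        LinkedTransferQueue<String> queue =
                new LinkedTransferQueue<String>(Arrays.asList("a", "b", "a"));
        boolean removed = queue.remove("a"); // removes one matching element
        boolean absent = queue.remove("z");  // nothing to remove, queue unchanged
        if (!removed || absent || !queue.contains("a")) {
            throw new IllegalStateException("unexpected remove/contains result");
        }
        return new ArrayList<>(queue);       // snapshot from head to tail
    }

    public static void main(String[] args) {
        System.out.println(removeSingleInstance());
    }
}
```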
    • remainingCapacity

      public int remainingCapacity()
      Always returns Integer.MAX_VALUE because a LinkedTransferQueue is not capacity constrained.
      Specified by:
      remainingCapacity in interface BlockingQueue<E>
      Returns:
      Integer.MAX_VALUE (as specified by BlockingQueue.remainingCapacity)
    • writeObject

      private void writeObject(ObjectOutputStream s) throws IOException
      Saves the state to a stream (that is, serializes it).
      Parameters:
      s - the stream
      Throws:
      IOException
    • readObject

      private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException
      Reconstitutes the Queue instance from a stream (that is, deserializes it).
      Parameters:
      s - the stream
      Throws:
      IOException
      ClassNotFoundException
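writeObject and readObject are private hooks that the Java serialization machinery calls; application code goes through ObjectOutputStream and ObjectInputStream instead. The round trip below is a minimal sketch, assuming the JDK's java.util.concurrent.LinkedTransferQueue as a stand-in for this class; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedTransferQueue;

final class SerializationSketch {
    /** Serializes a queue to a byte array, reads it back, and returns its contents. */
    @SuppressWarnings("unchecked")
    static List<String> roundTrip() {
        LinkedTransferQueue<String> original =
                new LinkedTransferQueue<String>(Arrays.asList("a", "b"));
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original); // triggers the queue's private writeObject
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                Queue<String> copy = (Queue<String>) in.readObject();
                return new ArrayList<>(copy); // snapshot from head to tail
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```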
    • getUnsafe

      static sun.misc.Unsafe getUnsafe()
      Returns a sun.misc.Unsafe. Suitable for use in a third-party package. Replace with a simple call to Unsafe.getUnsafe when integrating into a JDK.
      Returns:
      a sun.misc.Unsafe