Class AMQConnection

All Implemented Interfaces:
Connection, NetworkConnection, ShutdownNotifier, Closeable, AutoCloseable
Direct Known Subclasses:
RecoveryAwareAMQConnection

public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection
Concrete class representing and managing an AMQP connection to a broker.

To create a broker connection, use ConnectionFactory. See Connection for an example.

  • Field Details

    • MAX_UNSIGNED_SHORT

      private static final int MAX_UNSIGNED_SHORT
      See Also:
    • LOGGER

      private static final org.slf4j.Logger LOGGER
    • CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER

      public static final double CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER
      See Also:
    • consumerWorkServiceExecutor

      private final ExecutorService consumerWorkServiceExecutor
    • heartbeatExecutor

      private final ScheduledExecutorService heartbeatExecutor
    • shutdownExecutor

      private final ExecutorService shutdownExecutor
    • mainLoopThread

      private Thread mainLoopThread
    • ioLoopThreadSet

      private final AtomicBoolean ioLoopThreadSet
    • ioLoopThread

      private volatile Thread ioLoopThread
    • threadFactory

      private ThreadFactory threadFactory
    • id

      private String id
    • recoveryCanBeginListeners

      private final List<RecoveryCanBeginListener> recoveryCanBeginListeners
    • errorOnWriteListener

      private final ErrorOnWriteListener errorOnWriteListener
    • workPoolTimeout

      private final int workPoolTimeout
    • finalShutdownStarted

      private final AtomicBoolean finalShutdownStarted
    • connectionInfo

      private volatile ObservationCollector.ConnectionInfo connectionInfo
    • clientVersion

      private static final Version clientVersion
    • _channel0

      private final AMQChannel _channel0
      The special channel 0 (not managed by the _channelManager)
    • _workService

      protected ConsumerWorkService _workService
    • _frameHandler

      private final FrameHandler _frameHandler
      Frame source/sink
    • _running

      private volatile boolean _running
      Flag controlling the main driver loop's termination
    • _exceptionHandler

      private final ExceptionHandler _exceptionHandler
      Handler for (uncaught) exceptions that crop up in the AMQConnection.MainLoop.
    • _appContinuation

      private final BlockingCell<Object> _appContinuation
      Object used for blocking main application thread when doing all the necessary connection shutdown operations
    • _brokerInitiatedShutdown

      private volatile boolean _brokerInitiatedShutdown
      Flag indicating whether the client received Connection.Close message from the broker
    • _inConnectionNegotiation

      private volatile boolean _inConnectionNegotiation
      Flag indicating we are still negotiating the connection in start
    • _heartbeatSender

      private HeartbeatSender _heartbeatSender
      Manages heart-beat sending for this connection
    • _virtualHost

      private final String _virtualHost
    • _clientProperties

      private final Map<String,Object> _clientProperties
    • saslConfig

      private final SaslConfig saslConfig
    • requestedHeartbeat

      private final int requestedHeartbeat
    • requestedChannelMax

      private final int requestedChannelMax
    • requestedFrameMax

      private final int requestedFrameMax
    • handshakeTimeout

      private final int handshakeTimeout
    • shutdownTimeout

      private final int shutdownTimeout
    • credentialsProvider

      private final CredentialsProvider credentialsProvider
    • blockedListeners

      private final Collection<BlockedListener> blockedListeners
    • metricsCollector

      protected final MetricsCollector metricsCollector
    • observationCollector

      protected final ObservationCollector observationCollector
    • channelRpcTimeout

      private final int channelRpcTimeout
    • channelShouldCheckRpcResponseType

      private final boolean channelShouldCheckRpcResponseType
    • trafficListener

      private final TrafficListener trafficListener
    • credentialsRefreshService

      private final CredentialsRefreshService credentialsRefreshService
    • _frameMax

      private volatile int _frameMax
      Maximum frame length, or zero if no limit is set
    • _missedHeartbeats

      private volatile int _missedHeartbeats
      Count of socket-timeouts that have happened without any incoming frames
    • _heartbeat

      private volatile int _heartbeat
      Currently-configured heart-beat interval, in seconds. 0 meaning none.
    • _channelManager

      private volatile ChannelManager _channelManager
      Object that manages a set of channels
    • _serverProperties

      private volatile Map<String,Object> _serverProperties
      Saved server properties field from connection.start
    • maxInboundMessageBodySize

      private final int maxInboundMessageBodySize
    • SOCKET_CLOSE_TIMEOUT

      private static long SOCKET_CLOSE_TIMEOUT
  • Constructor Details

  • Method Details

    • defaultClientProperties

      public static Map<String,Object> defaultClientProperties()
      Retrieve a copy of the default table of client properties that will be sent to the server during connection startup. This method is called when each new ConnectionFactory instance is constructed.
      Returns:
      a map of client properties
      See Also:
    • disconnectChannel

      public final void disconnectChannel(ChannelN channel)
      Protected API - respond, in the driver thread, to a ShutdownSignal.
      Parameters:
      channel - the channel to disconnect
    • ensureIsOpen

      private void ensureIsOpen() throws AlreadyClosedException
      Throws:
      AlreadyClosedException
    • getAddress

      public InetAddress getAddress()
      Retrieve the host.
      Specified by:
      getAddress in interface Connection
      Specified by:
      getAddress in interface NetworkConnection
      Returns:
      the hostname of the peer we're connected to.
    • getLocalAddress

      public InetAddress getLocalAddress()
      Description copied from interface: NetworkConnection
      Retrieve the local host.
      Specified by:
      getLocalAddress in interface NetworkConnection
      Returns:
      the client socket address.
    • getPort

      public int getPort()
      Retrieve the port number.
      Specified by:
      getPort in interface Connection
      Specified by:
      getPort in interface NetworkConnection
      Returns:
      the port number of the peer we're connected to.
    • getLocalPort

      public int getLocalPort()
      Description copied from interface: NetworkConnection
      Retrieve the local port number.
      Specified by:
      getLocalPort in interface NetworkConnection
      Returns:
      the client socket port number
    • getFrameHandler

      public FrameHandler getFrameHandler()
    • getServerProperties

      public Map<String,Object> getServerProperties()
      Retrieve the server properties.
      Specified by:
      getServerProperties in interface Connection
      Returns:
      a map of the server properties. This typically includes the product name and version of the server.
    • createChannel0

      AMQChannel createChannel0()
    • initializeConsumerWorkService

      private void initializeConsumerWorkService()
    • initializeHeartbeatSender

      private void initializeHeartbeatSender()
    • start

      public void start() throws IOException, TimeoutException
      Start up the connection, including the MainLoop thread. Sends the protocol version negotiation header, and runs through Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then calls Connection.Open and waits for the OpenOk. Sets heart-beat and frame max values after tuning has taken place.
      Throws:
      IOException - if an error is encountered either before, or during, protocol negotiation; sub-classes ProtocolVersionMismatchException and PossibleAuthenticationFailureException will be thrown in the corresponding circumstances. AuthenticationFailureException will be thrown if the broker closes the connection with ACCESS_REFUSED. If an exception is thrown, connection resources allocated can all be garbage collected when the connection object is no longer referenced.
      TimeoutException
    • instantiateChannelManager

      protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory)
    • configureChannelManager

      protected void configureChannelManager(ChannelManager channelManager)
    • startMainLoop

      public void startMainLoop()
      Package private API, allows for easier testing.
    • negotiateChannelMax

      protected int negotiateChannelMax(int requestedChannelMax, int serverMax)
      Private API, allows for easier simulation of bogus clients.
    • checkPreconditions

      private static void checkPreconditions()
      Private API - check required preconditions and protocol invariants
    • getChannelMax

      public int getChannelMax()
      Get the negotiated maximum channel number. Usable channel numbers range from 1 to this number, inclusive.
      Specified by:
      getChannelMax in interface Connection
      Returns:
      the maximum channel number permitted for this connection.
    • getFrameMax

      public int getFrameMax()
      Get the negotiated maximum frame size.
      Specified by:
      getFrameMax in interface Connection
      Returns:
      the maximum frame size, in octets; zero if unlimited
    • getHeartbeat

      public int getHeartbeat()
      Get the negotiated heartbeat interval.
      Specified by:
      getHeartbeat in interface Connection
      Returns:
      the heartbeat interval, in seconds; zero if none
    • setHeartbeat

      public void setHeartbeat(int heartbeat)
      Protected API - set the heartbeat timeout. Should only be called during tuning.
    • setThreadFactory

      public void setThreadFactory(ThreadFactory threadFactory)
      Makes it possible to override thread factory that is used to instantiate connection network I/O loop. Only necessary in the environments with restricted
      Parameters:
      threadFactory - thread factory to use
    • getThreadFactory

      public ThreadFactory getThreadFactory()
      Returns:
      Thread factory used by this connection.
    • getClientProperties

      public Map<String,Object> getClientProperties()
      Description copied from interface: Connection
      Get a copy of the map of client properties sent to the server
      Specified by:
      getClientProperties in interface Connection
      Returns:
      a copy of the map of client properties
    • getClientProvidedName

      public String getClientProvidedName()
      Description copied from interface: Connection
      Returns client-provided connection name, if any. Note that the value returned does not uniquely identify a connection and cannot be used as a connection identifier in HTTP API requests.
      Specified by:
      getClientProvidedName in interface Connection
      Returns:
      client-provided connection name, if any
      See Also:
    • getExceptionHandler

      public ExceptionHandler getExceptionHandler()
      Protected API - retrieve the current ExceptionHandler
      Specified by:
      getExceptionHandler in interface Connection
      See Also:
    • willShutDownConsumerExecutor

      public boolean willShutDownConsumerExecutor()
      Public API
      Returns:
      true if this work service instance uses its own consumerWorkServiceExecutor (as opposed to a shared one)
    • createChannel

      public Channel createChannel(int channelNumber) throws IOException
      Public API - Create a new channel, using the specified channel number if possible.

      Use Connection.openChannel(int) if you want to use an Optional to deal with a value.

      Specified by:
      createChannel in interface Connection
      Parameters:
      channelNumber - the channel number to allocate
      Returns:
      a new channel descriptor, or null if this channel number is already in use
      Throws:
      IOException - if an I/O problem is encountered
    • createChannel

      public Channel createChannel() throws IOException
      Public API - Create a new channel, using an internally allocated channel number. If automatic connection recovery is enabled, the channel returned by this method will be Recoverable.

      Use Connection.openChannel() if you want to use an Optional to deal with a value.

      Specified by:
      createChannel in interface Connection
      Returns:
      a new channel descriptor, or null if none is available
      Throws:
      IOException - if an I/O problem is encountered
    • writeFrame

      public void writeFrame(Frame f) throws IOException
      Public API - sends a frame directly to the broker.
      Throws:
      IOException
    • flush

      public void flush() throws IOException
      Public API - flush the output buffers
      Throws:
      IOException
    • negotiatedMaxValue

      private static int negotiatedMaxValue(int clientValue, int serverValue)
    • checkUnsignedShort

      private static boolean checkUnsignedShort(int value)
    • handleReadFrame

      public boolean handleReadFrame(Frame frame)
      private API
    • isRunning

      public boolean isRunning()
    • hasBrokerInitiatedShutdown

      public boolean hasBrokerInitiatedShutdown()
    • readFrame

      private void readFrame(Frame frame) throws IOException
      Throws:
      IOException
    • handleHeartbeatFailure

      public void handleHeartbeatFailure()
      private API
    • handleIoError

      public void handleIoError(Throwable ex)
      private API
    • handleFailure

      private void handleFailure(Throwable ex)
    • doFinalShutdown

      public void doFinalShutdown()
      private API
    • closeMainLoopThreadIfNecessary

      private void closeMainLoopThreadIfNecessary()
    • notInMainLoopThread

      private boolean notInMainLoopThread()
    • mainLoopReadThreadNotNull

      private boolean mainLoopReadThreadNotNull()
    • notifyRecoveryCanBeginListeners

      private void notifyRecoveryCanBeginListeners()
    • addRecoveryCanBeginListener

      public void addRecoveryCanBeginListener(RecoveryCanBeginListener fn)
    • removeRecoveryCanBeginListener

      public void removeRecoveryCanBeginListener(RecoveryCanBeginListener fn)
    • handleSocketTimeout

      private void handleSocketTimeout() throws SocketTimeoutException
      Called when a frame-read operation times out
      Throws:
      MissedHeartbeatException - if heart-beats have been missed
      SocketTimeoutException
    • processControlCommand

      public boolean processControlCommand(Command c) throws IOException
      Handles incoming control commands on channel zero.
      Throws:
      IOException
      See Also:
    • handleConnectionClose

      public void handleConnectionClose(Command closeCommand)
    • shutdown

      public ShutdownSignalException shutdown(Method reason, boolean initiatedByApplication, Throwable cause, boolean notifyRpc)
      Protected API - causes all attached channels to terminate (shutdown) with a ShutdownSignal built from the argument, and stops this connection from accepting further work from the application. ShutdownListeners for the connection are notified when the main loop terminates.
      Parameters:
      reason - description of reason for the exception
      initiatedByApplication - true if caused by a client command
      cause - trigger exception which caused shutdown
      notifyRpc - true if outstanding rpc should be informed of shutdown
      Returns:
      a shutdown signal built using the given arguments
    • startShutdown

      private ShutdownSignalException startShutdown(Method reason, boolean initiatedByApplication, Throwable cause, boolean notifyRpc)
    • finishShutdown

      private void finishShutdown(ShutdownSignalException sse)
    • close

      public void close() throws IOException
      Public API - Close this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. Waits for all the close operations to complete.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
      Specified by:
      close in interface Connection
      Throws:
      IOException - if an I/O problem is encountered
    • close

      public void close(int timeout) throws IOException
      Public API - Close this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. This method behaves in a similar way as Connection.close(), with the only difference that it waits with a provided timeout for all the close operations to complete. When timeout is reached the socket is forced to close.
      Specified by:
      close in interface Connection
      Parameters:
      timeout - timeout (in milliseconds) for completing all the close-related operations, use -1 for infinity
      Throws:
      IOException - if an I/O problem is encountered
    • close

      public void close(int closeCode, String closeMessage) throws IOException
      Public API - Close this connection and all its channels. Waits for all the close operations to complete.
      Specified by:
      close in interface Connection
      Parameters:
      closeCode - the close code (See under "Reply Codes" in the AMQP specification)
      closeMessage - a message indicating the reason for closing the connection
      Throws:
      IOException - if an I/O problem is encountered
    • close

      public void close(int closeCode, String closeMessage, int timeout) throws IOException
      Public API - Close this connection and all its channels. Waits with the given timeout for all the close operations to complete. When timeout is reached the socket is forced to close.
      Specified by:
      close in interface Connection
      Parameters:
      closeCode - the close code (See under "Reply Codes" in the AMQP specification)
      closeMessage - a message indicating the reason for closing the connection
      timeout - timeout (in milliseconds) for completing all the close-related operations, use -1 for infinity
      Throws:
      IOException - if an I/O problem is encountered
    • abort

      public void abort()
      Public API - Abort this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. Forces the connection to close. Any encountered exceptions in the close operations are silently discarded.
      Specified by:
      abort in interface Connection
    • abort

      public void abort(int closeCode, String closeMessage)
      Public API - Abort this connection and all its channels. Forces the connection to close and waits for all the close operations to complete. Any encountered exceptions in the close operations are silently discarded.
      Specified by:
      abort in interface Connection
      Parameters:
      closeCode - the close code (See under "Reply Codes" in the AMQP specification)
      closeMessage - a message indicating the reason for closing the connection
    • abort

      public void abort(int timeout)
      Public API - Abort this connection and all its channels with the AMQP.REPLY_SUCCESS close code and message 'OK'. This method behaves in a similar way as Connection.abort(), with the only difference that it waits with a provided timeout for all the close operations to complete. When timeout is reached the socket is forced to close.
      Specified by:
      abort in interface Connection
      Parameters:
      timeout - timeout (in milliseconds) for completing all the close-related operations, use -1 for infinity
    • abort

      public void abort(int closeCode, String closeMessage, int timeout)
      Public API - Abort this connection and all its channels. Forces the connection to close and waits with the given timeout for all the close operations to complete. When timeout is reached the socket is forced to close. Any encountered exceptions in the close operations are silently discarded.
      Specified by:
      abort in interface Connection
      Parameters:
      closeCode - the close code (See under "Reply Codes" in the AMQP specification)
      closeMessage - a message indicating the reason for closing the connection
      timeout - timeout (in milliseconds) for completing all the close-related operations, use -1 for infinity
    • close

      public void close(int closeCode, String closeMessage, boolean initiatedByApplication, Throwable cause) throws IOException
      Protected API - Delegates to the six-argument close method, passing -1 for the timeout, and false for the abort flag.
      Throws:
      IOException
    • close

      public void close(int closeCode, String closeMessage, boolean initiatedByApplication, Throwable cause, int timeout, boolean abort) throws IOException
      Protected API - Close this connection with the given code, message, source and timeout value for all the close operations to complete. Specifies if any encountered exceptions should be ignored.
      Throws:
      IOException
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • getHostAddress

      private String getHostAddress()
    • addBlockedListener

      public void addBlockedListener(BlockedListener listener)
      Description copied from interface: Connection
      Specified by:
      addBlockedListener in interface Connection
      Parameters:
      listener - the listener to add
    • addBlockedListener

      public BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback)
      Description copied from interface: Connection
      Add a lambda-based BlockedListener.
      Specified by:
      addBlockedListener in interface Connection
      Parameters:
      blockedCallback - the callback when the connection is blocked
      unblockedCallback - the callback when the connection is unblocked
      Returns:
      the listener that wraps the callback
      See Also:
    • removeBlockedListener

      public boolean removeBlockedListener(BlockedListener listener)
      Description copied from interface: Connection
      Remove a BlockedListener.
      Specified by:
      removeBlockedListener in interface Connection
      Parameters:
      listener - the listener to remove
      Returns:
      true if the listener was found and removed, false otherwise
    • clearBlockedListeners

      public void clearBlockedListeners()
      Description copied from interface: Connection
      Remove all BlockedListeners.
      Specified by:
      clearBlockedListeners in interface Connection
    • getId

      public String getId()
      Public API - Returns a unique ID for this connection. This ID must be unique, otherwise some services like the metrics collector won't work properly. This ID doesn't have to be provided by the client, services that require it will be assigned automatically if not set.
      Specified by:
      getId in interface Connection
      Returns:
      unique ID for this connection.
    • setId

      public void setId(String id)
      Public API - Sets a unique ID for this connection. This ID must be unique, otherwise some services like the metrics collector won't work properly. This ID doesn't have to be provided by the client, services that require it will be assigned automatically if not set.
      Specified by:
      setId in interface Connection
    • ioLoopThread

      public void ioLoopThread(Thread thread)
    • getChannelRpcTimeout

      public int getChannelRpcTimeout()
    • willCheckRpcResponseType

      public boolean willCheckRpcResponseType()
    • getTrafficListener

      public TrafficListener getTrafficListener()
    • getMaxInboundMessageBodySize

      int getMaxInboundMessageBodySize()
    • connectionInfo