Auto-repaired connections with RabbitMQ

Here is a short article about making RabbitMQ’s Java clients automatically repair their connections. Indeed, sometimes, network issues may result in broken connections.

Since version 3.3.0, default Java clients for RabbitMQ have options for this situation. Almost everything is in the factory that creates the channel and consumers.

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername( messageServerUsername );
factory.setPassword( messageServerPassword );

// Timeout for connection establishment: 5s
factory.setConnectionTimeout( 5000 );

// Configure automatic reconnections
factory.setAutomaticRecoveryEnabled( true );

// Recovery interval: 10s
factory.setNetworkRecoveryInterval( 10000 );

// Exchanges and so on should be redeclared if necessary
factory.setTopologyRecoveryEnabled( true );

When an established connection gets broken, the client will automatically try to recover everything, including queues, exchanges and bindings. Please, refer to the user guide for more details. From my experience, the only part that can be a problem is about consumers.

Indeed, in my code, I used to rely on QueueingConsumers. This class is perfectly working, except it is deprecated and that it breaks connection recovery. So, if you are using it and you want connection recovery, you MUST replace it by your own consumer.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;

import net.roboconf.core.utils.Utils;
import net.roboconf.messaging.api.messages.Message;
import net.roboconf.messaging.api.utils.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

 * Notice: QueueingConsumer is deprecated, hence this implementation that supports recovery.
public class MyConsumer extends DefaultConsumer implements Consumer {

	private final Logger logger = Logger.getLogger( getClass().getName());

	 * Constructor.
	 * @param channel
	public MyConsumer( Channel channel ) {
		super( channel );

	public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body )
	throws IOException {

		// Do what you have to do with your message.
		// Prefer a short processing...

	public void handleShutdownSignal( String consumerTag, ShutdownSignalException sig ) {

		if( sig.isInitiatedByApplication()) {
			this.logger.fine( "The connection to the messaging server was shut down." + id( consumerTag ));

		} else if( sig.getReference() instanceof Channel ) {
			int nb = ((Channel) sig.getReference()).getChannelNumber();
			this.logger.fine( "A RabbitMQ consumer was shut down. Channel #" + nb + ", " + id( consumerTag ));

		} else {
			this.logger.fine( "A RabbitMQ consumer was shut down." + id( consumerTag ));

	public void handleCancelOk( String consumerTag ) {
		this.logger.fine( "A RabbitMQ consumer stops listening to new messages." + id( consumerTag ));

	public void handleCancel( String consumerTag ) throws IOException {
		this.logger.fine( "A RabbitMQ consumer UNEXPECTABLY stops listening to new messages." + id( consumerTag ));

	 * @param consumerTag a consumer tag
	 * @return a readable ID of this consumer
	private String id( String consumerTag ) {

		StringBuilder sb = new StringBuilder();
		sb.append( " Consumer tag = " );
		sb.append( consumerTag );
		sb.append( ")" );

		return sb.toString();

Before testing for real, and to be able to debug efficiently such code, you should add a recovery listener on your connections.

Channel channel = factory.newConnection().createChannel();

// Add a recoverable listener (when broken connections are recovered).
// Given the way the RabbitMQ factory is configured, the channel should be "recoverable".
((Recoverable) new MyRecoveryListener());

And a logging listener…

import java.util.logging.Logger;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;

public class MyRecoveryListener implements RecoveryListener {

	private final Logger logger = Logger.getLogger( getClass().getName());

	public void handleRecovery( Recoverable recoverable ) {

		if( recoverable instanceof Channel ) {
			int channelNumber = ((Channel) recoverable).getChannelNumber();
			this.logger.fine( "Connection to channel #" + channelNumber + " was recovered." );

Now, you can test recovery for real.
Start your RabbitMQ server and then your client. Verify the connection is established. Then, turn off RabbitMQ and check your logs, the connection should appear as broken. Turn the server on. Wait few seconds and verify the connection was recovered.

If you do not want to turn off RabbitMQ, you can also play with your firewall (e.g. iptables) and disabled temporarily connections to the server. Both cases work with the code above.

Notice that connection recovery only works when a connection was successfully established. If you start the RabbitMQ server AFTER your client, recovery will not work. You may then consider using the Lyra project.

5 thoughts on “Auto-repaired connections with RabbitMQ

  1. Very nice, it worked well for me!

    I have a major problem with the lack of documentation around this recovery process. I’m going to have to dig around to figure out how long RabbitMQ will attempt to recover (or set a value if it never stops) and make some appropriate actions when this time limit is reached.

    Thanks again!

  2. I see there is some “drift” in the API, some of the method signatures change.
    Also shouldn’t there be a mention of the actual construction of the Consumer somewhere… eg.
    consumer = new MyConsumer(channel);

    1. Indeed, I did not explain how to create the consumer.
      I assumed people would already know before wondering how to make their connections more robust. I can update the post. šŸ™‚

    1. I have to admit I have no idea.
      If there is no built-in mechanism, you might want to check what happens when a recover attempt fails. I think the recovery listener is not invoked. But maybe the shutdown one is. Anyway, I think this is complicated. Recovery mechanisms aim at making your client more robust, with the idea that it will be back working at some moment (when the network is back, when the server is back…).

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )


Connecting to %s