Auto-repaired connections with RabbitMQ

Here is a short article about making RabbitMQ’s Java clients automatically repair their connections. Indeed, sometimes, network issues may result in broken connections.

Since version 3.3.0, default Java clients for RabbitMQ have options for this situation. Almost everything is in the factory that creates the channel and consumers.

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername( messageServerUsername );
factory.setPassword( messageServerPassword );

// Timeout for connection establishment: 5s
factory.setConnectionTimeout( 5000 );

// Configure automatic reconnections
factory.setAutomaticRecoveryEnabled( true );

// Recovery interval: 10s
factory.setNetworkRecoveryInterval( 10000 );

// Exchanges and so on should be redeclared if necessary
factory.setTopologyRecoveryEnabled( true );

When an established connection gets broken, the client will automatically try to recover everything, including queues, exchanges and bindings. Please, refer to the user guide for more details. From my experience, the only part that can be a problem is about consumers.

Indeed, in my code, I used to rely on QueueingConsumers. This class is perfectly working, except it is deprecated and that it breaks connection recovery. So, if you are using it and you want connection recovery, you MUST replace it by your own consumer.

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;

import net.roboconf.core.utils.Utils;
import net.roboconf.messaging.api.messages.Message;
import net.roboconf.messaging.api.utils.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Notice: QueueingConsumer is deprecated, hence this implementation that supports recovery.
 */
public class MyConsumer extends DefaultConsumer implements Consumer {

	private final Logger logger = Logger.getLogger( getClass().getName());


	/**
	 * Constructor.
	 * @param channel
	 */
	public MyConsumer( Channel channel ) {
		super( channel );
	}


	@Override
	public void handleDelivery( String consumerTag, Envelope envelope, BasicProperties properties, byte[] body )
	throws IOException {

		// Do what you have to do with your message.
		// Prefer a short processing...
	}


	@Override
	public void handleShutdownSignal( String consumerTag, ShutdownSignalException sig ) {

		if( sig.isInitiatedByApplication()) {
			this.logger.fine( "The connection to the messaging server was shut down." + id( consumerTag ));

		} else if( sig.getReference() instanceof Channel ) {
			int nb = ((Channel) sig.getReference()).getChannelNumber();
			this.logger.fine( "A RabbitMQ consumer was shut down. Channel #" + nb + ", " + id( consumerTag ));

		} else {
			this.logger.fine( "A RabbitMQ consumer was shut down." + id( consumerTag ));
		}
	}


	@Override
	public void handleCancelOk( String consumerTag ) {
		this.logger.fine( "A RabbitMQ consumer stops listening to new messages." + id( consumerTag ));
	}


	@Override
	public void handleCancel( String consumerTag ) throws IOException {
		this.logger.fine( "A RabbitMQ consumer UNEXPECTABLY stops listening to new messages." + id( consumerTag ));
	}


	/**
	 * @param consumerTag a consumer tag
	 * @return a readable ID of this consumer
	 */
	private String id( String consumerTag ) {

		StringBuilder sb = new StringBuilder();
		sb.append( " Consumer tag = " );
		sb.append( consumerTag );
		sb.append( ")" );

		return sb.toString();
	}
}

Before testing for real, and to be able to debug efficiently such code, you should add a recovery listener on your connections.

Channel channel = factory.newConnection().createChannel();

// Add a recoverable listener (when broken connections are recovered).
// Given the way the RabbitMQ factory is configured, the channel should be "recoverable".
((Recoverable) this.channel).addRecoveryListener( new MyRecoveryListener());

And a logging listener…

import java.util.logging.Logger;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;

public class MyRecoveryListener implements RecoveryListener {

	private final Logger logger = Logger.getLogger( getClass().getName());


	@Override
	public void handleRecovery( Recoverable recoverable ) {

		if( recoverable instanceof Channel ) {
			int channelNumber = ((Channel) recoverable).getChannelNumber();
			this.logger.fine( "Connection to channel #" + channelNumber + " was recovered." );
		}
	}
}

Now, you can test recovery for real.
Start your RabbitMQ server and then your client. Verify the connection is established. Then, turn off RabbitMQ and check your logs, the connection should appear as broken. Turn the server on. Wait few seconds and verify the connection was recovered.

If you do not want to turn off RabbitMQ, you can also play with your firewall (e.g. iptables) and disabled temporarily connections to the server. Both cases work with the code above.

Notice that connection recovery only works when a connection was successfully established. If you start the RabbitMQ server AFTER your client, recovery will not work. You may then consider using the Lyra project.


About this entry