Reconnecting JMS listener

Problem

In some projects we may still need to reconnect to our JMS provider manually because, for whatever reason, the framework or the container cannot do the job for us. In that case we have to ensure that a JMS listener, once registered, re-registers itself if the connection breaks.

Note: Check first if your container can handle this for you.

Solution

Some JmsConnectionFactory implementations already support reconnecting, but they are not always reliable. For instance, the IBM driver supports a reconnect in theory, but it is a bit buggy.

So what is the straightforward solution? The easiest way is to register an exception listener on the connection and rebuild the connection from there:

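 // connection, session and consumer are assumed to be fields of the enclosing class
 connection.setExceptionListener((e) -> {
   try {
     connection = cf.createConnection();
     session = connection.createSession();
     consumer = session.createConsumer(session.createQueue("FANCY.QUEUE.NAME"));
     consumer.setMessageListener((m) -> {
       System.out.println("onMessage: " + m);
     });
     connection.start();
   } catch (JMSException ex) {
     // onException must not throw checked exceptions; no retry happens here yet
     System.err.println("Reconnect failed: " + ex.getMessage());
   }
 });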

Not clean, but in theory this is all we have to do. In practice we also have to manage the state, close the old connection and, even worse, think about a retry if the connection cannot be re-created immediately. That last part is the hard one, because we also need to be able to stop the connection and must avoid race conditions while doing so.

ReconnectingListener

Let us build a ReconnectingListener step by step. As we have seen above, we need a way to record the desired connection state, and we need separate methods for connect and disconnect, because we will call them more than once whenever a reconnect has to be retried.

Connecting and Disconnecting

Let us first solve the connect and disconnect problem. The class should:

  1. Have a separate connect method
  2. Store the user's wish whether we should be connected or not
  3. Have a separate disconnect method
  4. Have a way to clear the resources without resetting the user's wish
  5. Have a way to check if the connection is up and running

@RequiredArgsConstructor
static class ReconnectingListener implements MessageListener, ExceptionListener {
    final JmsConnectionFactory cf;
    final String destinationQueue;

    // the user's wish: true means we should be connected (java.util.concurrent.atomic.AtomicBoolean)
    private final AtomicBoolean shouldRun = new AtomicBoolean(false);
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    // LOG is assumed to be a logger declared elsewhere; onMessage(Message) is omitted here

    // 1. Have a separate connect method
    public synchronized void connect() throws JMSException {
        // 2. Store the user's wish whether we should be connected or not
        shouldRun.set(true);
        if (!isConnected()) {
            connection = cf.createConnection();
            session = connection.createSession();
            consumer = session.createConsumer(session.createQueue(destinationQueue));
            consumer.setMessageListener(this); // register us
            connection.setExceptionListener(this); // react on errors
            connection.start();
        }
    }
    // 3. Have a separate disconnect method
    public synchronized void disconnect() {
        // 2. Store the user's wish whether we should be connected or not
        shouldRun.set(false);
        clearConnection();
    }
    // 4. Have a way to clear the resources without resetting the user's wish
    private synchronized void clearConnection() {
        JmsUtil.close(consumer);
        JmsUtil.close(session);
        JmsUtil.close(connection);
        consumer = null;
        session = null;
        connection = null;
        LOG.debug("Connection cleared and stopped");
    }
    // 5. Have a way to check if the connection is up and running
    public synchronized boolean isConnected() {
        return connection != null && consumer != null && session != null;
    }
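
The JmsUtil helper used in clearConnection is not shown in this post. As a minimal sketch of what such a helper might look like, the class below closes a resource and swallows any error, so that one failing close does not stop the remaining resources from being released. The name QuietCloser is hypothetical, and the sketch assumes JMS 2.0, where Connection, Session and MessageConsumer implement AutoCloseable.

```java
// Hypothetical stand-in for the JmsUtil helper used above; not the author's code.
final class QuietCloser {
    private QuietCloser() {
    }

    /**
     * Closes the resource if it is not null and swallows any exception.
     * Returns true if there was nothing to close or close() completed normally.
     */
    static boolean close(AutoCloseable resource) {
        if (resource == null) {
            return true; // nothing to do
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            // A broken connection often throws on close; report it and carry on.
            System.err.println("Ignoring error on close: " + e.getMessage());
            return false;
        }
    }
}
```

Because errors are swallowed, the three close calls in clearConnection always run, even if the consumer or the session is already in a broken state.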

Reconnect "onException"

Having solved this, we have to reconnect to the JMS broker if we lose the connection. For this we implement the ExceptionListener interface, as declared above, and now add its onException method. The goals are:

  1. Clear the old connection to avoid a double registration
  2. Schedule a delayed retry to connect again
  3. Check if we still should be connected
  4. Retry again, if we fail in our retry to connect to the broker

// we need an executor to delay a retry attempt
private final ScheduledExecutorService reconnectScheduler = Executors.newScheduledThreadPool(1);

@Override
public void onException(JMSException exception) {
    // 1. Clear the old connection to avoid a double registration
    clearConnection();
    // 2. Schedule a delayed retry to connect again
    reconnect(5);
}

private void reconnect(final int delay) {
    reconnectScheduler.schedule(() -> {
        // 3. Check if we still should be connected
        if (shouldRun.get()) {
            try {
                // here we use our connect method from above
                connect();
            } catch (Exception e) {
                // 4. Retry again, if we fail in our retry to connect to the broker
                LOG.info("Reconnect failed, will retry in {}s. {}", delay, e.getMessage());
                clearConnection(); // just for safety
                reconnect(delay);
            }
        }
    }, delay, TimeUnit.SECONDS);
}
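
To try the retry loop without a broker, the following JMS-free sketch models the same pattern: a scheduled attempt that re-schedules itself with the same delay until it succeeds or the user no longer wants a connection. RetryingConnector and ConnectAction are hypothetical names introduced only for this illustration; ConnectAction stands in for the connect method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, JMS-free model of the retry loop; all names are illustrative.
final class RetryingConnector {
    /** Stand-in for connect(): may throw while the broker is unreachable. */
    interface ConnectAction {
        void connect() throws Exception;
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean shouldRun = new AtomicBoolean(true);
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch connected = new CountDownLatch(1);
    private final ConnectAction action;

    RetryingConnector(ConnectAction action) {
        this.action = action;
    }

    /** Schedules one attempt; a failed attempt schedules the next one with the same delay. */
    void reconnect(long delayMillis) {
        scheduler.schedule(() -> {
            if (!shouldRun.get()) {
                return; // the user asked us to stop, so give up silently
            }
            attempts.incrementAndGet();
            try {
                action.connect();
                connected.countDown(); // signal success to anyone waiting
            } catch (Exception e) {
                if (shouldRun.get()) {
                    reconnect(delayMillis);
                }
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits until an attempt succeeded; returns false on timeout or interrupt. */
    boolean awaitConnected(long timeoutMillis) {
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int attempts() {
        return attempts.get();
    }

    /** Records that no connection is wanted any more and cancels pending attempts. */
    void stop() {
        shouldRun.set(false);
        scheduler.shutdownNow();
    }
}
```

Passing an action that fails twice and then succeeds leads to exactly three attempts. A fixed delay keeps the sketch close to the code above; an increasing delay would be a possible variation that the original does not use.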

That’s it. We have covered all the basic requirements.

Going further

We should of course provide the ability to set a subscription filter, and maybe we should implement the Closeable interface to provide a more modern API. We have also hardcoded that we always subscribe to a queue, so a way to tell the code whether the destination is a topic or a queue would not be bad either. Maybe we should also consider using the JMS 2.x API like:

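// assumes the fields are declared as JMSContext context and JMSConsumer consumer,
// and that isConnected() checks context instead of connection and session
public synchronized void connectJms2() throws JMSException {
    shouldRun.set(true);
    if (!isConnected()) {
        context = cf.createContext();
        context.setExceptionListener(this);
        context.setAutoStart(true); // start delivery as soon as a consumer is created
        consumer = context.createConsumer(context.createQueue(destinationQueue));
        consumer.setMessageListener(this); // register us
    }
}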

Last but not least, we should also shut down the executor once the listener is no longer needed. Looking forward to a pull request :-).
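
As a rough sketch of how Closeable and the executor shutdown could fit together, the skeleton below models only the shutdown-related parts of the listener. ClosableListenerSketch is a hypothetical name, and disconnect is reduced to a placeholder for the method shown earlier.

```java
import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical skeleton: only the shutdown-related parts of the listener are modelled.
class ClosableListenerSketch implements Closeable {
    private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean shouldRun = new AtomicBoolean(true);

    /** Placeholder for the disconnect() shown earlier. */
    public synchronized void disconnect() {
        shouldRun.set(false);
        // the real class would call clearConnection() here
    }

    /** Disconnects and stops the scheduler; safe to call more than once. */
    @Override
    public void close() {
        disconnect();
        reconnectScheduler.shutdownNow(); // cancels reconnect attempts that are still pending
        try {
            reconnectScheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    boolean isShutdown() {
        return reconnectScheduler.isShutdown();
    }

    boolean shouldRun() {
        return shouldRun.get();
    }
}
```

Since close is declared without a checked exception, the class can be used in a try-with-resources block without a catch clause. One consequence of this design is that a closed instance cannot schedule reconnects any more, so a new instance is needed to connect again.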

Paul Sterl has written 52 articles
