Problem
In some projects we may still need to reconnect to our JMS provider manually. For whatever reason, the framework or the container cannot do the job for us. So we have to ensure that a JMS listener, once registered, re-registers itself if something bad happens.
Solution
Some JmsConnectionFactory implementations already support reconnecting, but they are not always very reliable. For instance, the IBM driver supports a reconnect in theory, but it is a bit buggy.
So what is the straightforward solution? The easiest way is to register an exception listener with the connection and rebuild the connection from there:
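// connection, session and consumer are fields, so the lambda may reassign them
connection.setExceptionListener((e) -> {
    try {
        connection = cf.createConnection();
        session = connection.createSession();
        consumer = session.createConsumer(session.createQueue("FANCY.QUEUE.NAME"));
        consumer.setMessageListener((m) -> {
            System.out.println("onMessage: " + m);
        });
        connection.start();
    } catch (JMSException ex) {
        // onException() cannot propagate the checked JMSException,
        // so a failed rebuild has to be handled right here
        ex.printStackTrace();
    }
});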
Not clean, but in theory this is all we have to do. Of course, we have to handle the state and the old connection and, even worse, think about a retry if the connection cannot be re-created immediately. That last part is the hard one, given that we also have to be able to stop the connection and avoid race conditions.
ReconnectingListener
Let us build a ReconnectingListener step by step. As we have seen above, we need a way to track the desired connection state, and we have to provide separate connect and disconnect methods, because we will call them repeatedly whenever we have to retry the reconnect.
Connecting and Disconnecting
Let us first solve the connect and disconnect problem. The listener should:
- Have a separate connect method
- Store the user's intent: should we be connected or not
- Have a separate disconnect method
- Have a way to clear the resources without resetting the user's intent
- Have a way to check if the connection is up and running
@RequiredArgsConstructor
static class ReconnectingListener implements MessageListener, ExceptionListener {

    // assumption: SLF4J is used for logging; the original does not show this declaration
    private static final Logger LOG = LoggerFactory.getLogger(ReconnectingListener.class);

    final JmsConnectionFactory cf;
    final String destinationQueue;

    // the user's intent: should we be connected or not
    private final AtomicBoolean shouldRun = new AtomicBoolean(false);

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    // 1. Have a separate connect method
    public synchronized void connect() throws JMSException {
        // 2. Store the user's intent: should we be connected or not
        shouldRun.set(true);
        if (!isConnected()) {
            connection = cf.createConnection();
            session = connection.createSession();
            consumer = session.createConsumer(session.createQueue(destinationQueue));
            consumer.setMessageListener(this);     // register us
            connection.setExceptionListener(this); // react on errors
            connection.start();
        }
    }

    // 3. Have a separate disconnect method
    public synchronized void disconnect() {
        // 2. Store the user's intent: should we be connected or not
        shouldRun.set(false);
        clearConnection();
    }

    // 4. Have a way to clear the resources without resetting the user's intent
    private synchronized void clearConnection() {
        JmsUtil.close(consumer);
        JmsUtil.close(session);
        JmsUtil.close(connection);
        consumer = null;
        session = null;
        connection = null;
        LOG.debug("Connection cleared and stopped");
    }

    // 5. Have a way to check if the connection is up and running
    //    (synchronized so that we see the fields written by connect/clearConnection)
    public synchronized boolean isConnected() {
        return connection != null && consumer != null && session != null;
    }

    // required by MessageListener; same handler as in the first snippet
    @Override
    public void onMessage(Message message) {
        System.out.println("onMessage: " + message);
    }

    // the class continues with onException in the next section
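The JmsUtil helper used in clearConnection is not shown in this post. A minimal sketch of what it could look like follows, assuming the JMS 2.0 API, where Connection, Session and MessageConsumer all extend AutoCloseable (with JMS 1.1 you would need one overload per type instead). The class body and the boolean return value are assumptions made for illustration, not the original helper.

```java
// Hypothetical stand-in for the JmsUtil helper used above; not the author's code.
final class JmsUtil {

    private JmsUtil() {
        // static helper, no instances
    }

    /**
     * Closes the resource if it is non-null and swallows any failure.
     * Returns true only if close() was called and completed without throwing.
     */
    static boolean close(AutoCloseable resource) {
        if (resource == null) {
            return false; // nothing to close, e.g. connect() never got that far
        }
        try {
            resource.close();
            return true;
        } catch (Exception e) {
            // The connection is usually already broken at this point,
            // so there is nothing useful left to do with the error.
            return false;
        }
    }
}
```

Swallowing the exception is deliberate here: clearConnection runs right after a failure, and a second exception while closing must not prevent the remaining resources from being released.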
Reconnect „onException“
Having solved this, we have to reconnect to the JMS broker if we lose the connection. For this we implement the ExceptionListener interface, as declared above, and now its onException method. The goals here are:
- Clear the old connection to avoid a double registration
- Schedule a delayed retry to connect again
- Check if we still should be connected
- Retry again, if we fail in our retry to connect to the broker
    // we need an executor to delay a retry attempt
    private final ScheduledExecutorService reconnectScheduler = Executors.newScheduledThreadPool(1);

    @Override
    public void onException(JMSException exception) {
        // 1. Clear the old connection to avoid a double registration
        clearConnection();
        // 2. Schedule a delayed retry to connect again
        reconnect(5);
    }

    private void reconnect(final int delay) {
        reconnectScheduler.schedule(() -> {
            // 3. Check if we still should be connected
            if (shouldRun.get()) {
                try {
                    // here we use our connect method from above
                    connect();
                } catch (Exception e) {
                    // 4. Retry again, if we fail in our retry to connect to the broker
                    LOG.info("Reconnect failed, will retry in {}s. {}", delay, e.getMessage());
                    clearConnection(); // just for safety
                    reconnect(delay);
                }
            }
        }, delay, TimeUnit.SECONDS);
    }
} // end of ReconnectingListener
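To try out the retry loop without a broker, the same scheduling pattern can be run against a fake connector. The sketch below is a simulation only: RetryDemo, Connector, failingFirst and awaitAttempts are made-up names, the JMS calls are replaced by a stub that fails a configurable number of times, and the delay is in milliseconds to keep the run short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Broker-free simulation of the retry loop; all names here are illustrative.
final class RetryDemo {

    /** Stand-in for connect(): anything that may fail. */
    interface Connector {
        void connect() throws Exception;
    }

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final AtomicBoolean shouldRun = new AtomicBoolean(true);
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch connected = new CountDownLatch(1);
    private final Connector connector;

    RetryDemo(Connector connector) {
        this.connector = connector;
    }

    /** Same shape as reconnect(delay) above, but with a millisecond delay. */
    void reconnect(long delayMillis) {
        scheduler.schedule(() -> {
            if (!shouldRun.get()) {
                return; // the user asked to disconnect in the meantime
            }
            attempts.incrementAndGet();
            try {
                connector.connect();
                connected.countDown(); // success: stop retrying
            } catch (Exception e) {
                reconnect(delayMillis); // failure: schedule the next attempt
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits for a successful connect and stops the scheduler. Returns the attempt count, or -1 on timeout. */
    int awaitAttempts(long timeoutMillis) {
        try {
            boolean ok = connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return ok ? attempts.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            shouldRun.set(false);
            scheduler.shutdownNow();
        }
    }

    /** A connector that fails the first `failures` calls and succeeds afterwards. */
    static Connector failingFirst(int failures) {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            if (calls.incrementAndGet() <= failures) {
                throw new IllegalStateException("broker not reachable");
            }
        };
    }
}
```

For example, new RetryDemo(RetryDemo.failingFirst(2)) followed by reconnect(10) and awaitAttempts(5000) should report three attempts: two failures and one success.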
That’s it. We have covered all the basic requirements.
Going further
We should of course provide the ability to set a subscription filter, and maybe we should implement the Closeable interface to provide a more modern API. Also, we have hardcoded that we always subscribe to a queue, so a way to tell the code whether the destination is a queue or a topic would not be too bad. Maybe we should also consider using the JMS 2.x API, like:
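// assumes two additional fields: JMSContext context; JMSConsumer consumer;
// (isConnected() would then have to check the context instead of connection/session)
// note: the JMS 2.x simplified API reports errors as unchecked JMSRuntimeException
public synchronized void connectJms2() throws JMSException {
    shouldRun.set(true);
    if (!isConnected()) {
        context = cf.createContext();
        context.setExceptionListener(this);
        context.setAutoStart(true);
        consumer = context.createConsumer(context.createQueue(destinationQueue));
        consumer.setMessageListener(this); // register us
    }
}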
Last but not least, we should also shut down the executor once the listener is no longer needed. Looking forward to a pull request :-).
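As a starting point for the Closeable and executor items, here is a rough sketch of how close() could tie both together. It is reduced to the parts that do not need a broker: CloseableListenerSketch and isClosed are hypothetical names, and the call to clearConnection is only indicated by a comment.

```java
import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: the JMS-specific parts of the listener are reduced to comments.
final class CloseableListenerSketch implements Closeable {

    private final AtomicBoolean shouldRun = new AtomicBoolean(true);
    private final ScheduledExecutorService reconnectScheduler =
            Executors.newScheduledThreadPool(1);

    /** Stops reconnecting, releases resources and shuts the scheduler down. */
    @Override
    public void close() {
        shouldRun.set(false);             // like disconnect(): remember the user's intent
        // clearConnection() would be called here in the real listener
        reconnectScheduler.shutdownNow(); // cancel pending retries so the thread can end
    }

    /** True once close() has been called. */
    boolean isClosed() {
        return !shouldRun.get() && reconnectScheduler.isShutdown();
    }
}
```

With this in place the listener can be used in a try-with-resources block, and calling close() more than once is harmless because shutdownNow() on an already terminated executor does nothing.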