Tuesday, October 5, 2010

Send delayed JMS messages

Very often I have had to implement a feature that does something asynchronously in a minute, in a day, or at 5 PM next Monday. Every time, I implemented some serialization mechanism (typically backed by a DB) plus a scheduled task that runs periodically, checks the table and runs the tasks that are due. Sometimes more generic tools were used, for example Quartz. What prevented me from implementing such tasks with JMS, which is built for asynchronous work? The reason is that the JMS API does not allow sending delayed messages, i.e. messages that will not be received by a subscriber or receiver immediately.

By chance I found out that some JMS implementations have proprietary support for delayed messages. I decided to search the Internet and gather this information in one place. Here is a list of the most popular JMS implementations (see Wikipedia):
  • Apache ActiveMQ
  • Apache Qpid
  • FUSE Message Broker (enterprise ActiveMQ)
  • Mantaray, a P2P JMS implementation
  • OpenJMS, from The OpenJMS Group
  • JBoss Messaging from JBoss
  • HornetQ from JBoss
  • JORAM, from the OW2 Consortium
  • Open Message Queue, from Sun Microsystems
  • Sun Java System Message Queue, from Sun Microsystems, supported version of Open Message Queue
  • Rabbit MQ
Because the JMS API does not define an interface for delayed messages, most JMS providers that support this feature implement it through a message property. You just write something like msg.setLongProperty("DELAY", delay). Some implementations require casting to a provider-specific class and calling a proprietary method. The following table summarizes the differences between the JMS providers I found.

JMS provider    Implementation
Oracle AQ       msg.setIntProperty("JMS_OracleDelay", delay);
JBoss           msg.setLongProperty("JMS_JBOSS_SCHEDULED_DELIVERY", now + delay);
ActiveMQ        msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
OpenJMS         ((org.exolab.jms.message.MessageImpl)msg).setJMSXRcvTimestamp(now + delay);
BEA WebLogic    queueConnection = queueConnectionFactory.createQueueConnection();
                QueueSession queueSession = queueConnection.createQueueSession(true, 0);
                QueueSender queueSender = queueSession.createSender(queue);
                ObjectMessage jmsMsg = queueSession.createObjectMessage(message);
                // Cast queueSender to the weblogic.jms.extensions.WLMessageProducer interface and set the delivery time
                ((WLMessageProducer) queueSender).setTimeToDeliver(timeToDeliver);
                queueSender.send(jmsMsg);
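
One difference in the table is easy to miss: some providers expect a relative delay, while others expect an absolute delivery timestamp (now + delay). The sketch below is only an illustration of that distinction for the three property-based rows. The class DelayProperties, the Provider enum and the method delayProperty are hypothetical names, and a plain Map stands in for the JMS message so the example needs no JMS library. The table does not state the unit each provider expects, so the value is passed through unchanged; check the provider documentation before relying on it.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: computes the property name and value from the table above.
class DelayProperties {

    /** Providers from the table that take the delay as a message property. */
    enum Provider {
        ORACLE_AQ("JMS_OracleDelay", false),
        JBOSS("JMS_JBOSS_SCHEDULED_DELIVERY", true),
        // String value of the constant ScheduledMessage.AMQ_SCHEDULED_DELAY
        ACTIVEMQ("AMQ_SCHEDULED_DELAY", false);

        final String propertyName;
        final boolean absolute; // true: value is now + delay; false: value is the delay itself

        Provider(String propertyName, boolean absolute) {
            this.propertyName = propertyName;
            this.absolute = absolute;
        }
    }

    /** Returns the single property (name -> value) to set on the outgoing message. */
    static Map<String, Object> delayProperty(Provider provider, long now, long delay) {
        long value = provider.absolute ? now + delay : delay;
        Object boxed;
        if (provider == Provider.ORACLE_AQ) {
            // The table uses setIntProperty for Oracle AQ, so narrow to int (throws on overflow).
            boxed = Math.toIntExact(value);
        } else {
            boxed = value;
        }
        return Collections.singletonMap(provider.propertyName, boxed);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        for (Provider p : Provider.values()) {
            System.out.println(p + " -> " + delayProperty(p, now, 60_000L));
        }
    }
}
```

With a real provider, the returned entry would be applied with msg.setLongProperty or msg.setIntProperty, as shown in the table.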

Do we have a solution for JMS providers that lack native support for delayed message delivery? Yes, we do. I would like to suggest the following.
Send the message to a special queue; let's call it DELAYED_QUEUE. Add two special properties to the delayed message:
  • JMS_DESTINATION, which contains the name of the queue or topic where the message should finally be delivered.
  • DELIVERY_TIME, which contains the delivery timestamp in milliseconds (now + delay).
For each enqueued delayed message, create a scheduled task that runs once, at the time the message should be delivered. The task creates a JMS receiver with a selector such as DELIVERY_TIME <= now (where now is the current timestamp), receives all messages that are due, and sends each of them to its real JMS destination, which is named by the JMS_DESTINATION property.
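
To make the flow concrete, here is a minimal in-memory model of this scheme. It is not JMS code: the class DelayedQueueModel and its methods are hypothetical, a List stands in for DELAYED_QUEUE, and a Map of lists stands in for the real destinations. The method forwardDue plays the role of the scheduled task; it compares with <= so that a message due exactly at the firing instant is forwarded too.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the DELAYED_QUEUE scheme (no JMS involved).
class DelayedQueueModel {

    /** Stand-in for a JMS message: a body plus the two special properties. */
    static final class DelayedMessage {
        final String body;
        final String jmsDestination; // JMS_DESTINATION property
        final long deliveryTime;     // DELIVERY_TIME property, in milliseconds

        DelayedMessage(String body, String jmsDestination, long deliveryTime) {
            this.body = body;
            this.jmsDestination = jmsDestination;
            this.deliveryTime = deliveryTime;
        }
    }

    private final List<DelayedMessage> delayedQueue = new ArrayList<>();
    private final Map<String, List<String>> destinations = new HashMap<>();

    /** Sender side: park the message in DELAYED_QUEUE with its routing properties. */
    void sendDelayed(String body, String destination, long now, long delay) {
        delayedQueue.add(new DelayedMessage(body, destination, now + delay));
    }

    /** Task side: forward every message matching DELIVERY_TIME <= now; returns how many moved. */
    int forwardDue(long now) {
        int forwarded = 0;
        for (Iterator<DelayedMessage> it = delayedQueue.iterator(); it.hasNext();) {
            DelayedMessage m = it.next();
            if (m.deliveryTime <= now) {
                destinations.computeIfAbsent(m.jmsDestination, k -> new ArrayList<>()).add(m.body);
                it.remove();
                forwarded++;
            }
        }
        return forwarded;
    }

    /** Bodies delivered so far to the given destination. */
    List<String> received(String destination) {
        return destinations.getOrDefault(destination, new ArrayList<>());
    }

    /** Number of messages still waiting in DELAYED_QUEUE. */
    int pending() {
        return delayedQueue.size();
    }

    public static void main(String[] args) {
        DelayedQueueModel model = new DelayedQueueModel();
        long now = System.currentTimeMillis();
        model.sendDelayed("hello", "ORDERS", now, 1_000L);
        System.out.println("forwarded early: " + model.forwardDue(now));
        System.out.println("forwarded when due: " + model.forwardDue(now + 1_000L));
    }
}
```

Because each run forwards everything that is due, a task that fires late also picks up messages whose own task has not run yet, so a missed firing is recovered by the next one.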
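The scheduled task may be implemented inside a resource adapter (JCA):

BootstrapContext ctx; // passed by the container to ResourceAdapter.start()
// createTimer() is declared on BootstrapContext itself, not on WorkManager
ctx.createTimer().schedule(new DelayedMessageTimerTask(msg), new Date(now + delay));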

This is not an ideal solution. It is fine for a relatively small number of messages and for non-persistent JMS destinations. Improving it is beyond the scope of this article.

Conclusions

Most popular JMS implementations support delayed delivery of messages. Even where this feature is not supported, we can always implement it ourselves with an additional queue and a scheduled task.
This article is published at DZone.
