Monday, 30 March 2015

How to interrupt a message-driven bean

Say you have started an asynchronous task in a Java Enterprise (JEE) container. This is often done with the MessageListener interface and its onMessage(Message) method, together with Topics. In brief, jobs are announced on a Topic (in effect, a list of jobs) to which one or more listeners subscribe; every time a new job arrives, a listener does some work. A Topic is like a TODO list, while a MessageListener provides a thread which will undertake the announced job. However, while a thread is executing such a job, we may decide that we want to stop it. In this article we explain how such a thread can be interrupted.

Quoting the Oracle documentation page [ref]:

A message-driven bean is an enterprise bean that allows Java EE applications to process messages asynchronously. This type of bean normally acts as a JMS message listener, which is similar to an event listener but receives JMS messages instead of events. The messages can be sent by any Java EE component (an application client, another enterprise bean, or a web component) or by a JMS application or system that does not use Java EE technology. Message-driven beans can process JMS messages or other kinds of messages.

Message-driven beans are the aforementioned subscribers to Topics. Once a new message is published on a Topic, an available listener will dash to serve the to-do item.

The key question here is: who is running our task (that is, which thread)? The second question is how to access that thread so as to interrupt it. In Java, even when we know the thread ID, it is difficult to find and access the corresponding thread. We therefore need a Map<String, Thread> to keep track of the threads in which message-driven beans are running.

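 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.enterprise.context.ApplicationScoped;
 @ApplicationScoped
 public class ThreadReference {
   // Shared by all message-driven bean threads, so the map must be thread-safe
   private final Map<String, Thread> threadMap = new ConcurrentHashMap<>();
   public Map<String, Thread> getThreadReferenceMap() {
     return threadMap;
   }
 }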
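For comparison, here is a minimal sketch of what looking up a thread from its numeric ID alone involves when no such map is kept. The class name ThreadLookup and the method findById are hypothetical names chosen for illustration. The sketch relies only on Thread.getAllStackTraces(), which returns a snapshot of every live thread in the JVM; inside a container this call may be restricted by security policy and can be expensive, which is part of why a dedicated map is more practical.

```java
import java.util.Optional;

// Hypothetical helper, for illustration only.
final class ThreadLookup {

    // Scans a snapshot of all live threads and returns the one with the given ID, if any.
    static Optional<Thread> findById(long threadId) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getId() == threadId)
                .findFirst();
    }

    public static void main(String[] args) {
        long myId = Thread.currentThread().getId();
        // Prints the name of the thread that runs main, looked up by its ID
        System.out.println(findById(myId).map(Thread::getName).orElse("not found"));
    }
}
```

Note that this lookup needs the numeric thread ID, whereas the application usually only knows the identifier of the task, so a map keyed by task ID is still the more direct route.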

We do, however, need to be wary and remove stale entries from the thread reference map. Once a message-driven bean starts running, it needs to register itself (its thread) in that map. The key is an identifier of the asynchronous task and the value is the corresponding thread.

 import javax.inject.Inject;
 import javax.jms.MessageListener;
 public abstract class RunningTaskMDB implements MessageListener {
   @Inject
   ThreadReference threadMap;
   public RunningTaskMDB() {
   }
   public void init(String taskId) {
     // Task started - register the current thread under the task ID
     if (taskId != null) {
       threadMap.getThreadReferenceMap().put(taskId, Thread.currentThread());
     }
   }
   public void terminate(String taskId) {
     // Task finished - remove it from the thread reference map
     if (taskId != null) {
       threadMap.getThreadReferenceMap().remove(taskId);
     }
   }
 }

Note the two methods: init and terminate. These need to be invoked by any class that extends RunningTaskMDB. If init is called for some taskId, then terminate MUST also be invoked. For that reason, I recommend putting terminate in a finally block.

 @Override
 public void onMessage(Message msg) {
   String taskId = "xz342";
   try {
     init(taskId);
     do_more_things_here();
   } catch (Throwable e) {
     // handle the exception
   } finally {
     // always unregister, even if the task failed
     terminate(taskId);
   }
 }
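The effect of the finally block can be checked outside a container with a small plain-Java sketch. RegistrySketch, REGISTRY and runRegistered are hypothetical stand-ins for the injected ThreadReference and the init/terminate pair; no JMS or CDI is involved, and unlike onMessage above the sketch lets the exception propagate to the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ThreadReference plus init/terminate, for illustration only.
final class RegistrySketch {

    static final Map<String, Thread> REGISTRY = new ConcurrentHashMap<>();

    // Runs the job on the calling thread, registered under taskId for the duration.
    static void runRegistered(String taskId, Runnable job) {
        REGISTRY.put(taskId, Thread.currentThread());   // plays the role of init(taskId)
        try {
            job.run();
        } finally {
            REGISTRY.remove(taskId);                     // plays the role of terminate(taskId)
        }
    }

    public static void main(String[] args) {
        try {
            runRegistered("xz342", () -> {
                throw new IllegalStateException("job failed");
            });
        } catch (IllegalStateException e) {
            // The entry has been removed although the job threw
            System.out.println("entries left after failure: " + REGISTRY.size());
        }
    }
}
```

Because the removal sits in finally, a failing job cannot leave a stale entry behind that would later point at a thread busy with some unrelated task.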

It is then very easy to interrupt a task from anywhere in the application: find the thread by task ID using the thread reference, interrupt it, and don't forget to remove it from the thread repository.

 Thread thread = threadMap.getThreadReferenceMap().get(id);  
 if (thread != null) {  
  thread.interrupt();  
  threadMap.getThreadReferenceMap().remove(id);  
 }  
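Keep in mind that Thread.interrupt() only sets the interrupted flag of the target thread (or makes a blocking call such as Thread.sleep throw InterruptedException); the running task has to cooperate by checking the flag. Below is a minimal plain-Java sketch of the whole round trip, assuming a task that works in small steps. InterruptSketch, REGISTRY and interruptByTaskId are hypothetical names, and an ordinary thread stands in for the container-managed one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical end-to-end demo, for illustration only.
final class InterruptSketch {

    static final Map<String, Thread> REGISTRY = new ConcurrentHashMap<>();

    // Starts a worker that loops until interrupted, interrupts it via its task ID,
    // and reports whether the worker noticed the interruption and terminated.
    static boolean interruptByTaskId(String taskId) {
        CountDownLatch registered = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            REGISTRY.put(taskId, Thread.currentThread());
            registered.countDown();
            try {
                // Cooperative check: the task polls the interrupted flag between steps
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.yield(); // stand-in for one unit of real work
                }
                sawInterrupt.set(true);
            } finally {
                REGISTRY.remove(taskId);
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive if the demo is abandoned
        worker.start();

        try {
            registered.await();                 // wait until the worker has registered
            Thread target = REGISTRY.get(taskId);
            if (target != null) {
                target.interrupt();
            }
            worker.join(5000);                  // give the worker time to stop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
        return sawInterrupt.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + interruptByTaskId("xz342"));
    }
}
```

A task that consists of one long, non-iterative computation offers no point at which to poll the flag, so this approach works best when the work can be split into steps.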



1 comment:

  1. Hi Pantelis,

    Thanks for the article, I was looking for the same thing.
    There are two issues with using this in actual practice though.

    1. Interruption does nothing on its own unless I check isInterrupted() in my thread code periodically and take some cleanup/exit action. Say, for example, my thread iterates over 100 data items and performs some work on each of them: I can check the interrupted flag in each iteration and stop execution if it is set. But that's not the usual case; my code might have one single, non-iterative piece of heavy work, and there is no way to break that kind of execution.
    Same problem with transaction handling: unless I check the flag, I am unable to throw any kind of InterruptedException (which in turn is required to roll back my transaction).
