Quoting the Oracle documentation page [ref]:
A message-driven bean is an enterprise bean that allows Java EE applications to process
messages asynchronously. This type of bean normally acts as a JMS message listener,
which is similar to an event listener but receives JMS messages instead of
events. The messages can be sent by any Java EE component (an
application client, another enterprise bean, or a web component) or by a JMS
application or system that does not use Java EE technology. Message-driven beans can
process JMS messages or other kinds of messages.
Message-driven beans are the aforementioned subscribers to topics. Once a new message is published to a topic, an available listener picks it up and serves the to-do item.
The key question here is: who is running our task (that is, which thread)? The second question is how to access that thread in order to interrupt it. In Java, even when we know the thread ID, it is difficult to find and access the corresponding thread. We therefore need a Map<String, Thread> to keep track of the threads on which message-driven beans are running.
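import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class ThreadReference {

    // Task ID -> thread currently running that task
    private final Map<String, Thread> THREAD_MAP;

    public ThreadReference() {
        // Concurrent map: several listener threads register and deregister at once
        this.THREAD_MAP = new ConcurrentHashMap<>();
    }

    public Map<String, Thread> getThreadReferenceMap() {
        return THREAD_MAP;
    }
}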
We do, however, need to be careful to remove stale entries from the above map. Once a message-driven bean starts running, it needs to register its thread in the thread map. The key in the map is an identifier of the asynchronous task and the value is the corresponding thread.
import javax.inject.Inject;
import javax.jms.MessageListener;

public abstract class RunningTaskMDB implements MessageListener {

    @Inject
    ThreadReference threadMap;

    public RunningTaskMDB() {
    }

    public void init(String taskId) {
        // Task started: register the current thread in the thread reference map
        if (taskId != null) {
            threadMap.getThreadReferenceMap().put(taskId, Thread.currentThread());
        }
    }

    public void terminate(String taskId) {
        // Task finished: remove it from the thread reference map
        if (taskId != null) {
            threadMap.getThreadReferenceMap().remove(taskId);
        }
    }
}
Note the two methods: init and terminate. These need to be invoked by any class that extends RunningTaskMDB. If init is called for some taskId, then terminate MUST also be invoked for it. For that reason, I recommend putting terminate in a finally block.
@Override
public void onMessage(Message msg) {
    String taskId = "xz342";
    try {
        init(taskId);
        do_more_things_here();
    } catch (Throwable e) {
        // handle the exception
    } finally {
        // always deregister, even if the task failed
        terminate(taskId);
    }
}
It is then very easy to interrupt a task from anywhere in the application: find the thread by ID using the thread reference, interrupt it and don't forget to remove it from the thread repository...
Thread thread = threadMap.getThreadReferenceMap().get(id);
if (thread != null) {
    thread.interrupt();
    threadMap.getThreadReferenceMap().remove(id);
}
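Keep in mind that interrupt() only sets the thread's interrupt flag (or wakes it up from a blocking call such as Thread.sleep); the task itself has to notice the interruption and stop. Here is a minimal, self-contained sketch of the whole register / interrupt / deregister cycle. It assumes plain Java SE rather than a container, and the names InterruptDemo, TASKS, runTask and cancel are hypothetical stand-ins for the ThreadReference bean and the message-driven bean above, not part of any API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class InterruptDemo {

    // Hypothetical stand-in for ThreadReference: task ID -> thread running the task
    static final Map<String, Thread> TASKS = new ConcurrentHashMap<>();

    // Processes up to `items` work units and returns how many were completed.
    // `registered` is released once the task has registered its thread.
    static int runTask(String taskId, int items, CountDownLatch registered) {
        TASKS.put(taskId, Thread.currentThread());   // same role as init(taskId)
        registered.countDown();
        int done = 0;
        try {
            for (int i = 0; i < items; i++) {
                // Cooperative check between work units
                if (Thread.currentThread().isInterrupted()) {
                    break;
                }
                Thread.sleep(10);                    // stand-in for real work
                done++;
            }
        } catch (InterruptedException e) {
            // sleep() clears the flag when it throws; restore it for callers
            Thread.currentThread().interrupt();
        } finally {
            TASKS.remove(taskId);                    // same role as terminate(taskId)
        }
        return done;
    }

    // Interrupts the thread registered under taskId; returns false if none is found.
    static boolean cancel(String taskId) {
        Thread thread = TASKS.remove(taskId);        // look up and deregister in one step
        if (thread == null) {
            return false;
        }
        thread.interrupt();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch registered = new CountDownLatch(1);
        int[] completed = new int[1];
        Thread worker = new Thread(() -> completed[0] = runTask("xz342", 1000, registered));
        worker.start();
        registered.await();                          // wait until the task is in the map
        boolean cancelled = cancel("xz342");
        worker.join();
        System.out.println("cancelled=" + cancelled
                + ", completed=" + completed[0]
                + ", stillRegistered=" + TASKS.containsKey("xz342"));
    }
}
```

The loop stops either at the explicit isInterrupted() check or when the blocking call throws InterruptedException. A task that never blocks and never checks the flag will simply run to completion, however many times it is interrupted.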
Hi Pantelis,
Thanks for the article, I was looking for the same.
There are two issues with using this in actual practice though.
1. Interruption does nothing on its own unless I check isInterrupted() in my thread code periodically and take some cleanup/exit action on that. Say, for example, my thread iterates over 100 data items and performs some work on each of them; I can check the interrupted flag in each iteration and stop execution if it's set. But that's not the usual case: my code might be one single, non-iterative piece of heavy work, so there is no way to break out of that kind of execution.
2. Same problem with transaction handling: unless I check the flag, I am unable to throw any kind of InterruptedException (which in turn is required to roll back my transaction).