PAGE UNDER CONSTRUCTION
Introduction
This article describes how to implement Java jobs to communicate over a Message Queue (MQ). This includes one Java job for publishing and one for subscribing and receiving.
The document explains which classes have to be created and for which purpose. This example was developed with the use of Apache MQ.
Goal of the Example
The example covers the specifics to achieve the use case. It neither covers the complete job api nor the complete possibilities of JMS.
For the coverage of the JobScheduler job api refer to the XML part of the API Documentation.
For the coverage of the JMS api refer to the Oracle Java Documentation.
The Use Case
The use case consists of the following steps:
- Run a Java Job in the JobScheduler which sends an XML fragment to a message queue (MQ)
- Run a Java Job in the JobScheduler which receives an XML fragment from an MQ
- Execute an XML command in the JobScheduler
What Does This Example Explain?
The steps described in this article are as follows:
- How to implement a Java job using the job api
- How to implement the Producer Job
- How to implement the Consumer Job
- How to execute an XML fragment using the job api
- How to deploy the jobs to the JobScheduler
- How to configure the JobScheduler Jobs with JOE using the developed classes
Prerequisites
To write a Java job for the JobScheduler the following dependency is needed.
- engine-job-api.jar
- The library is hosted on Maven Central. Please download the jar and add it to the classpath of the Java project.
- If the Java project already is a maven project, simply add the following dependency to the project configuration.
<dependency>
<groupId>com.sos-berlin.jobscheduler.engine</groupId>
<artifactId>engine-job-api</artifactId>
<version>1.10.3</version>
</dependency>
- Make sure to use the correct version suitable for the JobScheduler in use.
For this example the activemq-all-5.13.0.jar library is used.
- Either download the jar file and add it to the classpath
- or in case of a maven project add the following dependency to the project configuration
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.0</version>
</dependency>
The Basic Structure of an API Job Java Class
A Java Job using the JobScheduler API has to extend the Job_impl class.
It has to overwrite the method spooler_process()
.
public class CLASSNAME extends Job_impl {
@Override
public boolean spooler_process() throws Exception {
// This is the place where the work is done!
return spooler_job.order_queue() != null;
}
}
The Return Value of spooler_process()
If the Java job is an order job it has to return true
in case no error occurs for the order to be able to continue with the next task.
If the Java job is a standalone job it has to return false for the JobScheduler to be able to recognize the task has finished.
This can be achieved by using the return value spooler_job.order_queue() != null
. It determines if an order is in the queue of the job (true
) or not (false
).
Logging
Because the Java Jobs run API methods, we can directly use the logging feature of the JobScheduler. As shown in the detailed method examples below, logging is done by using the spooler_log
method.
Make sure to throw the catched exception in order to hand it over the spooler_process()
method. This is needed for a job running in a job chain to determine that it has to fail in case an error occurs.
Initialization of the MQ Connection
Both Java jobs need some methods to initialize a connection to an MQ Server. This section of the document shows how to implement them. The implementation is based on the JMS implementation used and shipped by activeMQ.
To keep it simple for this example these methods are put in both Java job classes. To prevent duplicates in a real project´s source code, it is better to put these methods in one class and extend the class in the two job classes.
The createConnection()
method
This method instantiates a ConnectionFactory
object with an ActiveMQConnectionFactory
object and creates a Connection
object through the factory method createConnection()
.
The ActiveMQConnectionFactory
object has to be instantiated with the URL of the MQ server.
public Connection createConnection(String uri){
factory = new ActiveMQConnectionFactory(uri);
Connection jmsConnection = null;
try {
jmsConnection = factory.createConnection();
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to connect: ");
throw e;
}
return jmsConnection;
}
The createSession(Connection connection)
method
The method is called with an already instantiated Connection
object and instantiates a Session
object through the Connection
object´s createSession(boolean transacted, int acknowledgeMode
)
method.
The acknowledgeMode has the following options among others:
In summary the acknowledge mode AUTO_ACKNOWLEDGE results in the message be dequeued when one consumer has read the message from the MQ server, whereas the CLIENT_ACKNOWLEDGE puts the responsibility on the client.
When the acknowledge mode CLIENT_ACKNOWLEDGE is set, the message will stay present for all consumers to read until a consumer acknowledges the message. Only then the message will be dequeued and is not available for further consumers.
private Session createSession(Connection connection) throws JMSException{
Session session = null;
try {
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to create Session: ");
throw e;
}
return session;
}
The createDestination(Session session, String queueName)
method
This method creates a Destination
object. It is called with an active Session
object and the name of the queue to write to. The Destination
object is initiated through the createQueue(String name)
method of the Session
object.
private Destination createDestination(Session session, String queueName) throws JMSException{
Destination destination = null;
try {
destination = session.createQueue(queueName);
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to create Destination: ");
throw e;
}
return destination;
}
The createConnectionUrl (String protocol, String hostName, String port)
method
This is a helper method which creates a connection URI based on the job parameters. It simply uses a StringBuilder to create a string in the format [protocol]://[hostname]:[port]
.
private String createConnectionUrl (String protocol, String hostName, String port){
StringBuilder strb = new StringBuilder();
strb.append(protocol).append("://").append(hostName).append(":").append(port);
return strb.toString();
}
Implementation of the Producer Job
There are three more methods needed for the producer Java job to run.
createMessageProducer(Session session, Destination destination)
write(String text, String connectionUrl, String queueName)
execute()
The c
reateMessageProducer(Session session, Destination destination)
method
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageProducer object with the given session and destination.
public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException{
MessageProducer producer = null;
try {
producer = session.createProducer(destination);
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
throw e;
}
return producer;
}
The write(String text, String connectionUrl, String queueName)
method
The method is called with the message to send to the server and the MessageProducer
object to use for publishing. The message is a String
object. The method instantiates a Message
object with the text to send. This method makes use of the methods described above. It instantiates all objects needed for the connection and sends a Message
object. Also the methods throws an error if one occurs it still calls the MessageProducer´s send(Message message)
in a try..catch..finally
block. That is because the use of the finally
block to close the connection in the end.
public void write(String text, String connectionUrl, String queueName) throws JMSException{
Connection connection = createConnection(connectionUrl);
Session session = createSession(connection);
Destination destination = createDestination(session, queueName);
MessageProducer producer = createMessageProducer(session, destination);
Message message = null;
try {
message = session.createTextMessage(text);
producer.send(message);
} catch (JMSException e) {
spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
throw e;
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
throw e;
}
}
}
}
The execute()
method
This is the core method of the Producer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method and subsequently calls the write(..) method to send the message to the MQ server.
private void execute() throws Exception {
Variable_set params = spooler_task.params();
params.merge(spooler_task.order().params());
spooler_log.debug9(params.xml());
String protocol = spooler_task.params().value("MQ_Protocol");
spooler_log.debug9("Received protocol: " + protocol);
String messageHost = spooler_task.params().value("MQ_Host");
spooler_log.debug9("Received MQ Host: " + messageHost);
String messagePort = spooler_task.params().value("MQ_Port");
spooler_log.debug9("Received MQ port: " + messagePort);
String queueName = spooler_task.params().value("MQ_QueueName");
spooler_log.debug9("Received Queue name: " + queueName);
String message = spooler_task.params().value("message");
spooler_log.debug9("Received message: " + message);
if(protocol == null || (protocol != null && protocol.isEmpty())){
protocol = DEFAULT_PROTOCOL;
}
if(queueName == null || (queueName != null && queueName.isEmpty())){
queueName = DEFAULT_QUEUE_NAME;
}
String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
if(message != null && !message.isEmpty()){
write(message, connectionUrl, queueName);
} else {
spooler_log.error("Message is empty, nothing to send to message server");
}
}
Putting the Bricks Together
The last thing to do is to call the execute()
method in spooler_process()
as shown in the complete code example below.
package com.sos.jitl.messaging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import sos.spooler.Job_impl;
import sos.spooler.Variable_set;
public class MessageProducerExampleJob extends Job_impl {
private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
private static final String DEFAULT_PROTOCOL = "tcp";
@Override
public boolean spooler_process() throws Exception {
try {
execute();
} catch (Exception e) {
spooler_log.error("Error occured in spooler_process() of MessageProducerExampleJob: ");
throw e;
}
return spooler_job.order_queue() != null;
}
private void execute() throws Exception {
Variable_set params = spooler_task.params();
params.merge(spooler_task.order().params());
spooler_log.debug9(params.xml());
String protocol = spooler_task.params().value("MQ_Protocol");
spooler_log.debug9("Received protocol: " + protocol);
String messageHost = spooler_task.params().value("MQ_Host");
spooler_log.debug9("Received MQ Host: " + messageHost);
String messagePort = spooler_task.params().value("MQ_Port");
spooler_log.debug9("Received MQ port: " + messagePort);
String queueName = spooler_task.params().value("MQ_QueueName");
spooler_log.debug9("Received Queue name: " + queueName);
String message = spooler_task.params().value("message");
spooler_log.debug9("Received message: " + message);
if(protocol == null || (protocol != null && protocol.isEmpty())){
protocol = DEFAULT_PROTOCOL;
}
if(queueName == null || (queueName != null && queueName.isEmpty())){
queueName = DEFAULT_QUEUE_NAME;
}
String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
if(message != null && !message.isEmpty()){
write(message, connectionUrl, queueName);
} else {
spooler_log.error("Message is empty, nothing to send to message server");
}
}
private String createConnectionUrl (String protocol, String hostName, String port){
StringBuilder strb = new StringBuilder();
strb.append(protocol).append("://").append(hostName).append(":").append(port);
return strb.toString();
}
private Connection createConnection(String uri) throws JMSException{
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
Connection jmsConnection = null;
try {
jmsConnection = factory.createConnection();
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to connect: ");
throw e;
}
return jmsConnection;
}
private Session createSession(Connection connection) throws JMSException{
Session session = null;
try {
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to create Session: ");
throw e;
}
return session;
}
private Destination createDestination(Session session, String queueName) throws JMSException{
Destination destination = null;
try {
destination = session.createQueue(queueName);
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to create Destination: ");
throw e;
}
return destination;
}
public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException{
MessageProducer producer = null;
try {
producer = session.createProducer(destination);
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
throw e;
}
return producer;
}
public void write(String text, String connectionUrl, String queueName) throws JMSException{
Connection connection = createConnection(connectionUrl);
Session session = createSession(connection);
Destination destination = createDestination(session, queueName);
MessageProducer producer = createMessageProducer(session, destination);
Message message = null;
try {
message = session.createTextMessage(text);
producer.send(message);
} catch (JMSException e) {
spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
throw e;
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
throw e;
}
}
}
}
}
Implementation of the Consumer Job
As seen above in the Producer Job example, there are also three more methods needed for the consumer Java job to run.
createMessageConsumer(Session session, Destination destination)
read(Connection connection, String queueName, Boolean closeMessage)
execute()
The c
reateMessageConsumer(Session session, Destination destination)
method
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageConsumer
object with the given session and destination.
private MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
MessageConsumer consumer = null;
try {
consumer = session.createConsumer(destination);
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to create MessageConsumer: ");
throw e;
}
return consumer;
}
The read(Connection connection, String queueName, Boolean closeMessage)
method
The method is called with an already instantiated Connection
object the name of the queue to read from and a flag to determine if this is the last consumer. If it is the last consumer for the messages in the specified queue then the reiceived message will be acknowledged. This method makes use of the methods described above. It instantiates all objects needed for the connection and reads a Message
object from the given queue. It extracts the value from the Message
object as a string representation via the Message
object´s getText()
method.
private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
String messageText = null;
try {
Session session = createSession(connection);
Destination destination = createDestination(session, queueName);
connection.start();
MessageConsumer consumer = createMessageConsumer(session, destination);
Message receivedMessage = null;
while (true) {
receivedMessage = consumer.receive(1);
if (receivedMessage != null) {
if (receivedMessage instanceof TextMessage) {
TextMessage message = (TextMessage) receivedMessage;
messageText = message.getText();
spooler_log.debug9("Reading message from queue: " + messageText);
break;
} else {
break;
}
}
}
if(closeMessage){
receivedMessage.acknowledge();
}
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to read from Destination: ");
throw e;
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
spooler_log.error("JMSException occurred while trying to close the connection: ");
}
}
}
return messageText;
}
The execute()
method
This is the core method of the Producer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method, calls the read(..) method and subsequently execute the XML.