Introduction
This article describes an example implementation that uses a Java Message Queue Service to:
- send and receive messages,
- add an order to a JS7 workflow by a JS7 - REST Web Service API call with a JSON body extracted from a message.
The following Message Queue Service is used in this example: https://activemq.apache.org/.
Mode of Operation
The implementation includes the following steps:
- create a message producer to send messages to a Message Queue Service,
- create a message consumer to receive messages from a Message Queue Service,
- use the message as part of a JS7 REST Web Service API call.
The message is assumed to be a JSON snippet that is processed by the desired JS7 REST Web Service API. This snippet must be valid JSON and must comply with the request format explained in the Technical Documentation of the REST Web Service API article.
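As an illustration of such a JSON snippet, the following minimal sketch assembles the body of an /orders/add request from a Controller ID and a workflow path. The JSON fields are the ones used by the example message in this article; the class name OrderRequestBody and its methods are hypothetical helpers, and the escaping covers only the characters that JSON strings require to be escaped.

```java
// Sketch: builds the JSON body for an /orders/add request.
// OrderRequestBody, escape() and build() are hypothetical names, not part of JS7.
final class OrderRequestBody {

    // Escapes the characters that must not appear unescaped inside a JSON string.
    static String escape(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // remaining control characters are written as \\uXXXX
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds the same structure as the example message used in this article.
    static String build(String controllerId, String workflowPath) {
        return "{\"controllerId\":\"" + escape(controllerId) + "\","
                + "\"orders\":[{\"workflowPath\":\"" + escape(workflowPath) + "\","
                + "\"scheduledFor\":\"now\"}],"
                + "\"auditLog\":{}}";
    }

    public static void main(String[] args) {
        System.out.println(build("testsuite", "/JS7Demo/01_HelloWorld/jdHelloWorld"));
    }
}
```

Building the body through a helper instead of concatenating raw values keeps the message valid JSON even if a workflow path contains a quote or a backslash.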
Most of the implementation work is done with the standard JMS API of Java (javax.jms). The only class from the ActiveMQ implementation is the ActiveMQConnectionFactory class. It should not be too complex to change the implementation to the ConnectionFactory of your preferred Message Queue Service.
Download
A zip file of this example as a complete Maven project is available for download: jms-example-js7-project.zip.
Prerequisites
- a Message Queue Service (the example makes use of ActiveMQ),
- a JS7 Controller, Agent and JOC Cockpit,
- Maven (required only if users want to build the example as a Maven project).
The MessageProducer
This section describes how to establish a connection to a Message Queue Service and send a message to a queue:
- create a connection,
- create a session,
- create a destination,
- create a producer,
- send a message with the producer,
- close the connection.
Methods
The steps are divided into different methods to make the implementation more readable and reusable.
createConnection(String uri)
This method instantiates a ConnectionFactory object with an ActiveMQConnectionFactory object and creates a Connection object through the factory method createConnection().
The ActiveMQConnectionFactory object has to be instantiated with the URI of the Message Queue Service.
    public Connection createConnection(String uri) {
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = null;
        try {
            connection = factory.createConnection();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to connect: ", e);
        }
        // null is returned if the connection could not be created
        return connection;
    }
createSession(Connection connection)
The method is called with an already instantiated Connection object and initiates a Session object through the Connection object's createSession(boolean transacted, int acknowledgeMode) method.
    private Session createSession(Connection connection) {
        Session session = null;
        try {
            // non-transacted session; received messages are acknowledged automatically
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Session: ", e);
        }
        return session;
    }
createDestination(Session session, String queueName)
This method creates a Destination object. It is called with an active Session object and the name of the queue to write to. The Destination object is initiated through the createQueue(String name) method of the Session object.
    public Destination createDestination(Session session, String queueName) {
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Destination: ", e);
        }
        return destination;
    }
createMessageProducer(Session session, Destination destination)
This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageProducer object with the given session and destination.
    private MessageProducer createMessageProducer(Session session, Destination destination) {
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageProducer: ", e);
        }
        return producer;
    }
write(String text, Session session, MessageProducer producer)
The method is called with the message text to send to the server, the Session object that creates the message and the MessageProducer object to use for publishing. The text is a String object. The method instantiates a TextMessage object with the text to send. If no text is given, the text of the DEFAULT_TEXT constant is sent instead.
    public void write(String text, Session session, MessageProducer producer) {
        Message message = null;
        try {
            if (text != null) {
                message = session.createTextMessage(text);
            } else {
                // DEFAULT_TEXT is a constant of the surrounding class that holds a default message
                message = session.createTextMessage(DEFAULT_TEXT);
            }
            producer.send(message);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: ", e);
        }
    }
close(Connection connection)
This method makes sure that the connection is closed after a message has been sent. Closing the connection also closes the sessions and producers created from it.
    private void close(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to close the connection: ", e);
            }
        }
    }
The SOSProducer class
The code example below differs slightly from the examples above. In the class below the write(String text) method uses the other methods for instantiation, therefore it requires fewer parameters, and it closes the connection.
The code in the example below includes the body of a JS7 /orders/add request as the default message text.
    package com.sos.jms.producer;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.log4j.Logger;

    public class SOSProducer {

        private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
        private static final String QUEUE_NAME = "test_queue";
        private static final String TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
        private String uri;

        public SOSProducer(String uri) {
            this.uri = uri;
        }

        private Connection createConnection() {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            Connection connection = null;
            try {
                connection = factory.createConnection();
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to connect: ", e);
            }
            return connection;
        }

        private Session createSession(Connection connection) {
            Session session = null;
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to create Session: ", e);
            }
            return session;
        }

        private Destination createDestination(Session session) {
            Destination destination = null;
            try {
                destination = session.createQueue(QUEUE_NAME);
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to create Destination: ", e);
            }
            return destination;
        }

        private MessageProducer createMessageProducer(Session session, Destination destination) {
            MessageProducer producer = null;
            try {
                producer = session.createProducer(destination);
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to create MessageProducer: ", e);
            }
            return producer;
        }

        public void write(String text) {
            Connection connection = createConnection();
            Session session = createSession(connection);
            Destination destination = createDestination(session);
            MessageProducer producer = createMessageProducer(session, destination);
            Message message = null;
            try {
                if (text != null) {
                    message = session.createTextMessage(text);
                } else {
                    message = session.createTextMessage(TEXT);
                }
                producer.send(message);
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to write Message to Destination: ", e);
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        LOGGER.error("JMSException occurred while trying to close the connection: ", e);
                    }
                }
            }
        }
    }
The MessageConsumer
This section describes how to establish a connection to a Message Queue Service and receive a message from a queue of this service. Furthermore, it describes how to establish an HTTP connection to send the received message to a JOC Cockpit instance:
- create a connection,
- create a session,
- create a destination,
- create a consumer,
- receive a message with the consumer,
- close the (MQ) connection,
- open an HTTP connection to a JOC Cockpit instance,
- send the message to the JOC Cockpit instance,
- close the HTTP connection.
This section shows examples for the steps starting with the creation of the consumer, as the first three steps are similar to the examples explained above for the instantiation of the MessageProducer.
Methods
createMessageConsumer(Session session, Destination destination)
This method is called with an active Session object as well as an instantiated Destination object. It instantiates a MessageConsumer object with the given session and destination.
    private MessageConsumer createMessageConsumer(Session session, Destination destination) {
        MessageConsumer consumer = null;
        try {
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
        }
        return consumer;
    }
read(MessageConsumer consumer)
The method is called with an instantiated MessageConsumer object to receive a message from the Message Queue Service. It extracts the value from the received message as a string via the TextMessage object's getText() method. Messages of any other type are ignored and the method returns null.
    private String read(MessageConsumer consumer) {
        TextMessage message = null;
        String textMessage = null;
        try {
            // poll until a message arrives; receive(1) waits at most 1 millisecond per call
            while (true) {
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        // not a text message: stop without a result
                        break;
                    }
                }
            }
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
        }
        return textMessage;
    }
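The loop above keeps polling until a message arrives, so it does not return while the queue stays empty. A bounded wait could look like the following minimal sketch. It is a JDK-only stand-in and not JMS code: a BlockingQueue plays the role of the message queue and poll(timeout) plays the role of MessageConsumer.receive(timeout). The class name BoundedReadSketch, the 100 millisecond slice and the overall timeout parameter are assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// JDK-only stand-in for a bounded read: BlockingQueue.poll(timeout) mimics
// the behavior of a receive call that returns null when the timeout expires.
final class BoundedReadSketch {

    // Waits for one message until the overall timeout expires; returns null if none arrived.
    static String read(BlockingQueue<String> queue, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return null; // give up instead of looping forever
                }
                // wait in slices of at most 100 ms (assumed value)
                long slice = Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(100));
                String message = queue.poll(slice, TimeUnit.NANOSECONDS);
                if (message != null) {
                    return message;
                }
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and stop waiting
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("{\"controllerId\":\"testsuite\"}");
        System.out.println(read(queue, 1000)); // the queued message
        System.out.println(read(queue, 200));  // queue is empty now
    }
}
```

With a JMS consumer the same effect can usually be achieved by passing a longer timeout to receive(long timeout) and treating a null result as "no message available".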
receiveFromQueue()
The receiveFromQueue() method combines the individual steps: it connects to a JOC Cockpit instance, reads from the message queue, sends the received message to the JOC Cockpit instance and closes the session with the JOC Cockpit instance. The connect(), sendRequest() and disconnect() helper methods and the HOST and PORT constants used in this snippet are not shown in this article. In the complete example below the call to the JOC Cockpit REST API is performed by the JmsExecute class.
    public String receiveFromQueue() {
        String message = null;
        try {
            connect();
            message = read();
            sendRequest(message);
            disconnect();
        } catch (Exception e) {
            LOGGER.error("Error occurred while publishing to the JOC Cockpit instance host:" + HOST + ", port:" + PORT, e);
        }
        return message;
    }
The SOSConsumer class
The code example below differs slightly from the examples above. In the class below the read() method uses the other methods for instantiation, therefore it requires no parameters, and it closes the connection to the Message Queue Service.
    package com.sos.jms.consumer;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SOSConsumer {

        private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class);
        private static final String DEFAULT_QUEUE_NAME = "test_queue";
        private String uri;

        public SOSConsumer(String uri) {
            this.uri = uri;
        }

        private Connection createConnection() {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            Connection connection = null;
            try {
                connection = factory.createConnection();
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to connect: ", e);
            }
            return connection;
        }

        private Session createSession(Connection connection) {
            Session session = null;
            try {
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to create Session: ", e);
            }
            return session;
        }

        private Destination createDestination(Session session) {
            return this.createDestination(session, DEFAULT_QUEUE_NAME);
        }

        private Destination createDestination(Session session, String queueName) {
            Destination destination = null;
            try {
                destination = session.createQueue(queueName);
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to create Destination: ", e);
            }
            return destination;
        }

        private MessageConsumer createMessageConsumer(Session session, Destination destination) {
            MessageConsumer consumer = null;
            try {
                consumer = session.createConsumer(destination);
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
            }
            return consumer;
        }

        private String read() {
            TextMessage message = null;
            String textMessage = null;
            Connection connection = createConnection();
            try {
                Session session = createSession(connection);
                Destination destination = createDestination(session);
                connection.start();
                MessageConsumer consumer = createMessageConsumer(session, destination);
                while (true) {
                    Message receivedMessage = consumer.receive(1);
                    if (receivedMessage != null) {
                        if (receivedMessage instanceof TextMessage) {
                            message = (TextMessage) receivedMessage;
                            textMessage = message.getText();
                            LOGGER.info("Reading message: " + textMessage);
                            break;
                        } else {
                            break;
                        }
                    }
                }
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        LOGGER.error("JMSException occurred while trying to close the connection: ", e);
                    }
                }
            }
            return textMessage;
        }

        public String receiveFromQueue() {
            return read();
        }
    }
Java Class with a main(String[] args) method (example JmsExecute.java)
The Java class makes use of the SOSProducer class to create a message and send it to the message queue. It uses the SOSConsumer class to read the message from the queue. Additionally, it creates an HTTP connection to a JS7 JOC Cockpit instance to call the /orders/add API with the JSON body from the message received. The first argument selects the mode (produce or consume), an optional second argument specifies the path to a properties file that holds the settings.
The example class uses the SOSRestApiClient to create the HTTP connection. The SOSRestApiClient is based on org.apache.httpcomponents:httpclient. Users can use their own HTTP client implementation.
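The class listed below reads its settings from an optional properties file with the keys JMS_URI, JOC_API_URL, controllerId, workflowPath, username and password, and strips surrounding double quotes from the values. The following minimal sketch shows one way to load such a file so that the stream is always closed and a missing key falls back to a default. The class name SettingsSketch, the fallback parameter and the localhost values are assumptions made for illustration; unlike the listing below, the sketch strips quotes only when they occur as a pair.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Properties;

// Sketch: null-safe reading of settings from a properties file.
final class SettingsSketch {

    // Strips one pair of surrounding double quotes; returns the fallback if the key is missing.
    static String cleanup(String value, String fallback) {
        if (value == null) {
            return fallback;
        }
        String v = value.trim();
        if (v.length() >= 2 && v.startsWith("\"") && v.endsWith("\"")) {
            v = v.substring(1, v.length() - 1);
        }
        return v;
    }

    // Loads the file with try-with-resources so that the stream is closed in any case.
    static Properties load(Path path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(path)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // hypothetical settings written to a temporary file for the demonstration
        Path file = Files.createTempFile("jms-example", ".properties");
        Files.write(file, Arrays.asList("JMS_URI=\"tcp://localhost:61616\"", "username=root"));
        Properties props = load(file);
        System.out.println(cleanup(props.getProperty("JMS_URI"), "tcp://activemq-5-15:61616"));
        // JOC_API_URL is not in the file, so the fallback is used
        System.out.println(cleanup(props.getProperty("JOC_API_URL"), "http://localhost:7446/joc/api/"));
        Files.delete(file);
    }
}
```

Passing the fallback per key avoids a NullPointerException when a key is missing from the file.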
    package com.sos.jms;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.StringReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Base64;
    import java.util.Properties;

    import javax.json.Json;
    import javax.json.JsonObject;
    import javax.json.JsonReader;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.sos.commons.httpclient.SOSRestApiClient;
    import com.sos.jms.consumer.SOSConsumer;
    import com.sos.jms.producer.SOSProducer;

    public class JmsExecute {

        private static final String DEFAULT_JMS_URI = "tcp://activemq-5-15:61616";
        private static final String DEFAULT_JOC_API_URL = "http://centostest_primary.sos:7446/joc/api/";
        private static final String DEFAULT_JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\""
                + ":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
        private static final String DEFAULT_USERNAME = "root";
        private static final String DEFAULT_PWD = "root";
        private static final String API_ADD_ORDER = "orders/add";
        private static final String API_LOGIN = "authentication/login";
        private static final String API_LOGOUT = "authentication/logout";
        private static final String ACCESS_TOKEN_HEADER = "X-Access-Token";
        private static final String APPLICATION_JSON = "application/json";
        private static final String CONTENT_TYPE = "Content-Type";
        private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class);
        private static String jmsServerUri = null;
        private static String jocApiUri = null;
        private static String controllerId = null;
        private static String workflowPath = null;
        private static String requestBody = null;
        private static String username = null;
        private static String pwd = null;

        public static void main(String[] args) {
            // args[0]: "produce" or "consume"; args[1] (optional): path to a properties file
            if (args.length == 0) {
                LOGGER.error("Missing argument: expected \"produce\" or \"consume\", optionally followed by the path to a properties file.");
                return;
            }
            if (args.length > 1) {
                readPropertiesFile(Paths.get(args[1]));
            } else {
                jmsServerUri = DEFAULT_JMS_URI;
                jocApiUri = DEFAULT_JOC_API_URL;
                requestBody = DEFAULT_JOC_API_REQUEST_BODY;
                username = DEFAULT_USERNAME;
                pwd = DEFAULT_PWD;
            }
            if ("produce".equals(args[0])) {
                SOSProducer producer = new SOSProducer(jmsServerUri);
                producer.write(requestBody);
            } else if ("consume".equals(args[0])) {
                SOSConsumer consumer = new SOSConsumer(jmsServerUri);
                String consumedMessage = consumer.receiveFromQueue();
                SOSRestApiClient client = setupHttpClient(username, pwd);
                try {
                    URI jocUri = URI.create(jocApiUri);
                    LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString());
                    String response = client.postRestService(jocUri.resolve(API_LOGIN), null);
                    LOGGER.info("HTTP status code: " + client.statusCode());
                    if (client.statusCode() == 200) {
                        String accessToken = null;
                        // try-with-resources closes the reader without risking a NullPointerException
                        try (JsonReader jsonReader = Json.createReader(new StringReader(response))) {
                            JsonObject json = jsonReader.readObject();
                            accessToken = json.getString("accessToken", "");
                        } catch (Exception e) {
                            LOGGER.warn("Could not determine accessToken.");
                        }
                        client.addHeader(ACCESS_TOKEN_HEADER, accessToken);
                        client.addHeader(CONTENT_TYPE, APPLICATION_JSON);
                        LOGGER.info("REQUEST: " + API_ADD_ORDER);
                        LOGGER.info("PARAMS: " + consumedMessage);
                        // resolve the API path against the configured JOC Cockpit URL, not against the default URL
                        URI addOrderUri = jocUri.resolve(API_ADD_ORDER);
                        LOGGER.info("resolvedUri: " + addOrderUri.toString());
                        response = client.postRestService(addOrderUri, consumedMessage);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                        response = client.postRestService(jocUri.resolve(API_LOGOUT), null);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                    }
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(), e);
                } finally {
                    client.closeHttpClient();
                }
            }
        }

        private static SOSRestApiClient setupHttpClient(String username, String password) {
            SOSRestApiClient client = new SOSRestApiClient();
            // the basic encoder does not insert line breaks, unlike the MIME encoder
            String basicAuth = Base64.getEncoder().encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
            client.setBasicAuthorization(basicAuth);
            return client;
        }

        // removes a leading and a trailing double quote; null is returned for missing properties
        private static String cleanupValue(String value) {
            if (value == null) {
                return null;
            }
            if (value.startsWith("\"")) {
                value = value.substring(1);
            }
            if (value.endsWith("\"")) {
                value = value.substring(0, value.length() - 1);
            }
            return value;
        }

        private static void readPropertiesFile(Path path) {
            Properties props = new Properties();
            // try-with-resources makes sure that the input stream is closed
            try (InputStream in = Files.newInputStream(path)) {
                props.load(in);
                jmsServerUri = cleanupValue(props.getProperty("JMS_URI"));
                jocApiUri = cleanupValue(props.getProperty("JOC_API_URL"));
                controllerId = cleanupValue(props.getProperty("controllerId"));
                workflowPath = cleanupValue(props.getProperty("workflowPath"));
                username = cleanupValue(props.getProperty("username"));
                pwd = cleanupValue(props.getProperty("password"));
                requestBody = "{\"controllerId\":\"" + controllerId + "\",\"orders\":[{\"workflowPath\":\""
                        + workflowPath + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
            } catch (IOException e) {
                LOGGER.warn("could not read properties file, use defaults instead.");
                jmsServerUri = DEFAULT_JMS_URI;
                jocApiUri = DEFAULT_JOC_API_URL;
                requestBody = DEFAULT_JOC_API_REQUEST_BODY;
                username = DEFAULT_USERNAME;
                pwd = DEFAULT_PWD;
            }
        }
    }
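For users who prefer their own HTTP client, the three calls made by the class above (login, add order, logout) could be prepared with the HTTP client of the JDK (java.net.http, Java 11 or later) as in the following minimal sketch. The API paths and header names are taken from the class above. The class name JocRequestSketch, the host joc.example.com and the assumption that the login call expects a standard "Authorization: Basic" header are illustrative and not confirmed by this article.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: builds the login, add-order and logout requests without sending them.
final class JocRequestSketch {

    // Base64.getEncoder() never inserts line breaks, which a header value must not contain.
    static String basicAuth(String username, String password) {
        byte[] credentials = (username + ":" + password).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(credentials);
    }

    // The base URI must end with a slash, otherwise resolve() replaces its last path segment.
    static HttpRequest login(URI jocApi, String username, String password) {
        return HttpRequest.newBuilder(jocApi.resolve("authentication/login"))
                .header("Authorization", basicAuth(username, password))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    static HttpRequest addOrder(URI jocApi, String accessToken, String jsonBody) {
        return HttpRequest.newBuilder(jocApi.resolve("orders/add"))
                .header("X-Access-Token", accessToken)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    static HttpRequest logout(URI jocApi, String accessToken) {
        return HttpRequest.newBuilder(jocApi.resolve("authentication/logout"))
                .header("X-Access-Token", accessToken)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        URI jocApi = URI.create("http://joc.example.com:7446/joc/api/"); // hypothetical host
        System.out.println(login(jocApi, "root", "root").uri());
        System.out.println(addOrder(jocApi, "token-from-login", "{}").uri());
        System.out.println(logout(jocApi, "token-from-login").uri());
    }
}
```

Each request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). The access token for the second and third request is read from the accessToken field of the login response, as in the class above.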
Maven Configuration Example
This example shows the ActiveMQ dependency required to build the above example as a Maven project. The JmsExecute class additionally requires the libraries that provide the javax.json and com.sos.commons.httpclient packages.
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.sos-berlin</groupId>
        <artifactId>activeMQ-example</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <dependencies>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.0</version>
            </dependency>
        </dependencies>
    </project>