Introduction
This document describes the implementation required for a Java Message Service queue:
- to send and receive messages
- to add an order to a JS7 workflow as an JS7 JOC API call with a JSON body extracted from a message.
A separate implementation is used to send the API call - JS7 libraries are not needed for the example.
The Apache Active MQ server is used in this example.
Mode of Operation
The implementation enables the following steps:
- Creation of a message producer to send messages to a Message Queue server (MQ server).
- Creation of a message consumer to receive messages from an MQ server.
- Sending the message to a JS7 JOC API.
The message has to be a JSON snippet to be processed by the desired JS7 JOC API. This snippet must be valid JSON and compliant with the JOC API.
Most of the implementation work is done with the standard JMS implementation of Java. The only class from the Active MQ implementation is the ActiveMQConnectionFactory
class. It shouldn´t be too complex to change the implementation to the ConnectionFactory
of your desired message queue service.
Download
A zip file of this example as a complete maven project implementation is available: jms-example-js7-project.zip.
Prerequisites
- A running Message Queue server (The example uses Apache Active MQ)
- A running JS7 Controller, Agent and JOC Cockpit.
- Maven (only needed if you want to build the example as a maven project)
The MessageProducer
This example describes how to build a connection to an MQ server and send a message to a queue on this server.
- Create a connection.
- Create a session.
- Create a destination.
- Create a producer.
- Send a message with the producer.
- Close the connection.
The Methods
The steps are separated into different methods to make the implementation more readable as well as reusable.
createConnection(String url)
This method instantiates a ConnectionFactory
object with an ActiveMQConnectionFactory
object and creates a Connection
object through the factory method createConnection()
.
The ActiveMQConnectionFactory
object has to be instantiated with the URL of the MQ server.
createSession(Connection connection)
The method is called with an already instantiated Connection
object and initiates a Session
object through the Connection
objects createSession(boolean transacted, int acknowledgeMode
)
method.
createDestination(Session session, String queueName
)
This method creates a Destination
object. It is called with an active Session
object and the name of the queue to write to. The Destination
object is initiated through the createQueue(String name)
method of the Session
object.
createMessageProducer(Session session, Destination destination)
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageProducer
object with the given session and destination.
write(String text, MessageProducer producer)
)
The method is called with the message to send to the server and the MessageProducer
object to use for publishing. The message is a String
object. The method instantiates a Message
object with the text to send.
close()
This method makes sure that the connection used will be closed after a message has been sent.
The SOSProducer
class
The code example below slightly differs from the examples above. In the class below the write(String text)
method already uses the other methods for instantiation, therefore it needs less parameters and it closes the connection itself.
The text in the example below consists of a JobScheduler add_order
XML, which can later be used to send to a JobScheduler.
The MessageConsumer
This section describes how to build a connection to a MQ server and receive a message from a queue on this server. Furthermore it describes how to build a TCP socket connection to send the received message to a JobScheduler.
- Create a connection.
- Create a session.
- Create a destination.
- Create a consumer.
- Receive a message with the consumer.
- Close the (MQ) connection.
- Open a TCP connection to a JobScheduler instance.
- Send the Message to the JobScheduler
- Close the (TCP) connection.
This section shows examples for the last 5 steps as the first four are similar to the examples already shown above for the instantiation of the MessageProducer
.
The Methods
createMessageConsumer(Session session, Destination destination)
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageConsumer
object with the given session and destination.
read(MessageConsumer consumer
)
The method is called with an already instantiated MessageConsumer
object to receive a message from the MQ server. It extracts the value from the Message
object as a string representation via the Message
objects getText()
method.
Don´t forget to clean up (call close()
) after the message has been received.
receiveFromQueue()
The receiveFromQueue()
method uses the methods described above to connect to a host running a JobScheduler instance, read from a message queue, send the received message to the JobScheduler instance and close the session to the JobScheduler instance.
The SOSConsumer
class
The code example below slightly differs from the examples above. In the class below the read()
method already uses the other method for instantiation, therefore it needs less parameters and it closes the connection itself.
Java Class with a main(String[] args) method (example JmsExecute.java)
The Java class uses the SOSProducer to create a message and send it to the message queue. It uses the SOSConsumer class to read the message from the queue. Additionally it creates a http connection to a JS7 JOC API to call the /orders/add API with the JSON body from the message received.
The example class uses the SOSRestApiClient to create the HTTP connection. The SOSRestApiClient is based on the org.apache.httpcomponents::httpclient. You can use your own HTTP client implementation instead.
package com.sos.jms; import java.io.StringReader; import java.net.URI; import java.util.Base64; import javax.json.Json; import javax.json.JsonObject; import javax.json.JsonReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sos.commons.httpclient.SOSRestApiClient; import com.sos.jms.consumer.SOSConsumer; import com.sos.jms.producer.SOSProducer; public class JmsExecute { private static final String JMS_URI = "tcp://[MESSAGE_SERVER_HOST]:61616"; private static final String JOC_API_URL = "http://[JOC_COCKPIT_HOST]:[JOC_COCKPIT_PORT]/joc/api/"; private static final String API_ADD_ORDER = "orders/add"; private static final String API_LOGIN = "authentication/login"; private static final String API_LOGOUT = "authentication/logout"; private static final String JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}"; private static final String ACCESS_TOKEN_HEADER = "X-Access-Token"; private static final String APPLICATION_JSON = "application/json"; private static final String CONTENT_TYPE = "Content-Type"; private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class); public static void main(String[] args) { if("produce".equals(args[0])) { SOSProducer producer = new SOSProducer(JMS_URI); producer.write(JOC_API_REQUEST_BODY); } else if ("consume".equals(args[0])) { SOSConsumer consumer = new SOSConsumer(JMS_URI); String consumedMessage = consumer.receiveFromQueue(); SOSRestApiClient client = setupHttpClient(); try { URI jocUri = URI.create(JOC_API_URL); LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString()); String response = client.postRestService(jocUri.resolve(API_LOGIN), null); LOGGER.info("HTTP status code: " + client.statusCode()); if (client.statusCode() == 200) { JsonReader jsonReader = null; String accessToken = null; try { jsonReader = Json.createReader(new StringReader(response)); JsonObject json = jsonReader.readObject(); accessToken = json.getString("accessToken", ""); } catch(Exception e) { LOGGER.warn("Could not determine accessToken."); } finally { jsonReader.close(); } client.addHeader(ACCESS_TOKEN_HEADER, accessToken); client.addHeader(CONTENT_TYPE, APPLICATION_JSON); LOGGER.info("REQUEST: " + API_ADD_ORDER); LOGGER.info("PARAMS: " + consumedMessage); String apiUrl = null; if (!API_ADD_ORDER.toLowerCase().startsWith(JOC_API_URL)) { apiUrl = JOC_API_URL + API_ADD_ORDER; } LOGGER.info("resolvedUri: " + jocUri.resolve(apiUrl).toString()); response = client.postRestService(jocUri.resolve(apiUrl), consumedMessage); LOGGER.info("HTTP status code: " + client.statusCode()); response = client.postRestService(jocUri.resolve(API_LOGOUT), null); LOGGER.info("HTTP status code: " + client.statusCode()); } } catch (Exception e) { LOGGER.error(e.getMessage(), e); } finally { client.closeHttpClient(); } } } private static SOSRestApiClient setupHttpClient() { SOSRestApiClient client = new SOSRestApiClient(); String basicAuth = Base64.getMimeEncoder().encodeToString(("[USERNAME]:[PASSWORD]").getBytes()); client.setBasicAuthorization(basicAuth); return client; } }
Maven Configuration Example
This example shows the dependency needed to build the example above as a Maven project.