Introduction
This article describes a Java implementation for use with a Message Queue Service which:
- sends and receives messages,
- adds an order to a JS7 workflow by a JS7 REST Web Service API call with a JSON body extracted from a message.
...
The following Message Queue Service is used in this example: Apache ActiveMQ, https://activemq.apache.org/.
Mode of Operation
The implementation includes the following steps:
- creation of a message producer to send messages to a Message Queue Service,
- creation of a message consumer to receive messages from a Message Queue Service,
- sending the message to the JS7 JOC API.
The message is assumed to be a JSON snippet which is processed by the JS7 REST Web Service API. This snippet must be valid JSON and compliant with the requests explained in the Technical Documentation of the REST Web Service API article.
Most of the implementation work is done with the standard JMS API of Java. The only class from the ActiveMQ implementation is the ActiveMQConnectionFactory class. It should not be too complex to change the implementation to the ConnectionFactory of your preferred Message Queue Service.
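As a rough illustration of such a snippet, the following sketch assembles the body of an /orders/add request from a Controller ID and a workflow path. The class and method names are hypothetical and not part of the example project. The field names follow the default request body used later in this article, and the escaping is deliberately minimal.

```java
/** Sketch: builds the JSON body for a JS7 /orders/add request (illustrative names). */
final class OrderRequestBody {

    private OrderRequestBody() {
    }

    /** Minimal escaping: only backslashes and double quotes are handled. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Returns a body that adds one order, scheduled for "now", to the given workflow. */
    static String build(String controllerId, String workflowPath) {
        return "{\"controllerId\":\"" + escape(controllerId) + "\","
                + "\"orders\":[{\"workflowPath\":\"" + escape(workflowPath) + "\","
                + "\"scheduledFor\":\"now\"}],"
                + "\"auditLog\":{}}";
    }

    public static void main(String[] args) {
        System.out.println(build("testsuite", "/JS7Demo/01_HelloWorld/jdHelloWorld"));
    }
}
```

For production use a JSON library is the safer choice, as it also covers control characters and non-ASCII input.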
Download
A zip file of this example as a complete Maven project is available for download: js7-jms-example-js7-project.zip.
Prerequisites
- A running Message Queue Service (the example makes use of ActiveMQ)
- A running JS7 Controller, Agent and JOC Cockpit
- Maven (required only if users want to build the example as a Maven project)
The MessageProducer
This section describes how to establish a connection to a Message Queue Service and send a message to a queue:
- create a connection,
- create a session,
- create a destination,
- create a producer,
- send a message with the producer,
- close the connection.
...
Methods
The steps are implemented in a single method which holds the connection for the duration of one write operation only.
write(String text, String queueName, long ttl)
The method is called with the message to be sent, the name of the queue to write to and the time to live of the message in milliseconds. The message is a String object. The method performs the following steps:
- It instantiates a ConnectionFactory object with an ActiveMQConnectionFactory object. The ActiveMQConnectionFactory object has to be instantiated with the URL of the Message Queue Service.
- It creates a Connection object through the factory method createConnection().
- It creates a Session object through the createSession(boolean transacted, int acknowledgeMode) method of the Connection object.
- It creates a Destination object through the createQueue(String name) method of the Session object.
- It instantiates a MessageProducer object with the given session and destination and sets the time to live.
- It instantiates a Message object with the text to be sent and sends it. If no text is given, a default text is sent.
- It closes the session and the connection, whether or not sending was successful.
Code Block |
---|
language | java |
---|
title | write(String text, String queueName, long ttl) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public void write(String text, String queueName, long ttl) throws Exception {
    Connection connection = null;
    Session session = null;
    try {
        // uri is the URL of the Message Queue Service, e.g. tcp://[HOSTNAME]:61616
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(destination);
        // time to live in milliseconds, 5 sec for this showcase
        producer.setTimeToLive(ttl);
        Message message = null;
        if (text != null) {
            message = session.createTextMessage(text);
        } else {
            // fall back to the default request body of the class
            message = session.createTextMessage(DEFAULT_TEXT);
        }
        producer.send(message);
    } catch (Throwable e) {
        LOGGER.error("JMSException occurred while trying to write Message to Destination: ");
        throw e;
    } finally {
        // close the session first, then the connection
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                LOGGER.warn("JMSException occurred while trying to close the session: ", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
            }
        }
    }
} |
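Session and connection should be released even if sending fails, with the session closed before the connection. The following sketch shows how try-with-resources gives this ordering automatically. It assumes a JMS 2.0 client, where Connection and Session implement AutoCloseable, which is not the case for the JMS 1.1 API that ActiveMQ 5.15 is understood to provide. The Resource class is a hypothetical stand-in, so the sketch runs without a Message Queue Service.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: try-with-resources closes resources in reverse order of declaration. */
final class CloseOrderSketch {

    /** Hypothetical stand-in for a JMS Connection or Session; it only records close(). */
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Opens a "connection" and a "session", "sends", and returns the recorded events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Resource connection = new Resource("connection", log);
                Resource session = new Resource("session", log)) {
            log.add("send message");
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [send message, close session, close connection]
        System.out.println(run());
    }
}
```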
The SOSProducer class
The code example below shows the complete class. The code creates a text message consisting of the body for a JS7 /orders/add API request.
Code Block |
---|
language | java |
---|
title | SOSProducer.java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class SOSProducer {

    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOSTNAME]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
    private static final String QUEUE_NAME = "test_queue";
    private static final String DEFAULT_TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private String uri;

    public SOSProducer(String uri) {
        this.uri = uri;
    }

    // writes the message with a default time to live of 5 seconds
    public void write(String text, String queueName) throws Exception {
        write(text, queueName, 5000L);
    }

    public void write(String text, String queueName, long ttl) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            // time to live in milliseconds, 5 sec for this showcase
            producer.setTimeToLive(ttl);
            Message message = null;
            if (text != null) {
                message = session.createTextMessage(text);
            } else {
                message = session.createTextMessage(DEFAULT_TEXT);
            }
            producer.send(message);
        } catch (Throwable e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: ");
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
    }
} |
The MessageConsumer
This section describes how to establish a connection to a Message Queue Service and receive a message from a queue. Furthermore it shows how to connect to a JOC Cockpit instance via HTTP to send an API request:
- create a connection,
- create a session,
- create a destination,
- create a consumer,
- receive a message with the consumer,
- close the (MQ) connection,
- log in to a JOC Cockpit instance via an HTTP REST API call,
- send an /orders/add API request to a JOC Cockpit instance,
- close the HTTP connection.
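The HTTP part of these steps can be sketched with the java.net.http API of Java 11 and later, used here in place of the SOSRestApiClient of the example project. The sketch only builds the two requests and does not send them. The class name, host and port are assumptions, and sending the credentials as HTTP Basic authentication on login is inferred from the example's use of setBasicAuthorization. The header names and API paths are the ones used by the JmsExecute class.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Sketch: builds (but does not send) the login and /orders/add requests. */
final class JocRequestSketch {

    /** Login request: HTTP Basic authentication, no request body. */
    static HttpRequest login(URI jocApi, String user, String password) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(jocApi.resolve("authentication/login"))
                .header("Authorization", "Basic " + credentials)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Order request: access token from the login response, message as JSON body. */
    static HttpRequest addOrder(URI jocApi, String accessToken, String jsonBody) {
        return HttpRequest.newBuilder(jocApi.resolve("orders/add"))
                .header("X-Access-Token", accessToken)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // assumed host and port; the trailing slash is required for resolve()
        URI jocApi = URI.create("http://localhost:4446/joc/api/");
        System.out.println(login(jocApi, "root", "root").uri());
        System.out.println(addOrder(jocApi, "token", "{}").uri());
    }
}
```

Note that the base URL has to end with a slash: URI.resolve() replaces the last path segment otherwise, so http://localhost:4446/joc/api would resolve orders/add to http://localhost:4446/joc/orders/add.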
Methods
read(String queueName)
The method instantiates a MessageConsumer object to receive a message from the Message Queue Service. It extracts the value from the received message as a string representation via the getText() method of the TextMessage object. The session and the connection are closed by the method itself after the message has been received.
Code Block |
---|
language | java |
---|
title | read() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public String read(String queueName) throws Exception {
    TextMessage message = null;
    String textMessage = null;
    Connection connection = null;
    Session session = null;
    try {
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);
        // message delivery starts only after the connection has been started
        connection.start();
        MessageConsumer consumer = session.createConsumer(destination);
        // poll the queue in intervals of 1 ms until a message arrives
        while (true) {
            Message receivedMessage = consumer.receive(1);
            if (receivedMessage != null) {
                if (receivedMessage instanceof TextMessage) {
                    message = (TextMessage) receivedMessage;
                    textMessage = message.getText();
                    LOGGER.info("Reading message: " + textMessage);
                    break;
                } else {
                    // messages of other types are ignored
                    break;
                }
            }
        }
    } catch (Throwable e) {
        LOGGER.error("JMSException occurred while trying to read from Destination: ");
        throw e;
    } finally {
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                LOGGER.warn("JMSException occurred while trying to close the session: ", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
            }
        }
    }
    return textMessage;
} |
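A read loop that polls with consumer.receive(1) does not return as long as the queue stays empty. The following sketch shows one possible variant that gives up after an overall timeout. It is not part of the example project: a BlockingQueue from the Java standard library stands in for the MessageConsumer so that the sketch runs without a Message Queue Service, and the class name and the maxWaitMillis parameter are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: polls a queue in short intervals, but gives up after an overall timeout. */
final class BoundedPollSketch {

    /** Returns the next message, or null if none arrives within maxWaitMillis. */
    static String read(BlockingQueue<String> queue, long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        try {
            while (deadline - System.nanoTime() > 0) {
                // Stand-in for consumer.receive(1): wait up to 1 ms, null means no message.
                String message = queue.poll(1, TimeUnit.MILLISECONDS);
                if (message != null) {
                    return message;
                }
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and stop waiting
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("{\"controllerId\":\"testsuite\"}");
        System.out.println(read(queue, 100)); // the queued message
        System.out.println(read(queue, 100)); // null, the queue is empty
    }
}
```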
The SOSConsumer class
The code example below shows the complete class.
Code Block |
---|
language | java |
---|
title | SOSConsumer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SOSConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class);
    private String uri;

    public SOSConsumer(String uri) {
        this.uri = uri;
    }

    public String read(String queueName) throws Exception {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            connection.start();
            MessageConsumer consumer = session.createConsumer(destination);
            // poll the queue in intervals of 1 ms until a message arrives
            while (true) {
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        break;
                    }
                }
            }
        } catch (Throwable e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ");
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
        return textMessage;
    }
} |
Java Class with a main(String[] args) method (example JmsExecute.java)
The Java class is called with one of the arguments produce or consume. With produce it makes use of the SOSProducer class to create a message and send it to the message queue. With consume it uses the SOSConsumer class to read the message from the queue. In addition, it creates an HTTP connection to a JS7 JOC Cockpit instance to call the /orders/add API with the JSON body from the message received.
Settings are read from a .config file that is located in the same directory as the .jar file and that uses the same base name. If the file cannot be read, default values are used.
The example class uses the SOSRestApiClient to create the HTTP connection. The SOSRestApiClient is based on org.apache.httpcomponents::httpclient. Users can use their own HTTP client implementation.
Code Block |
---|
language | java |
---|
title | JmsExecute.java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.Properties;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sos.commons.httpclient.SOSRestApiClient;
import com.sos.jms.consumer.SOSConsumer;
import com.sos.jms.producer.SOSProducer;

public class JmsExecute {

    private static final String DEFAULT_JMS_URI = "tcp://activemq-5-15:61616";
    private static final String DEFAULT_JOC_API_URL = "http://centostest_primary.sos:7446/joc/api/";
    private static final String DEFAULT_JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\""
            + ":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PWD = "root";
    private static final String DEFAULT_QUEUE_NAME = "test_queue";
    private static final String API_ADD_ORDER = "orders/add";
    private static final String API_LOGIN = "authentication/login";
    private static final String API_LOGOUT = "authentication/logout";
    private static final String ACCESS_TOKEN_HEADER = "X-Access-Token";
    private static final String APPLICATION_JSON = "application/json";
    private static final String CONTENT_TYPE = "Content-Type";
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class);
    private static String jmsServerUri = null;
    private static String jocApiUri = null;
    private static String controllerId = null;
    private static String workflowPath = null;
    private static String requestBody = null;
    private static String username = null;
    private static String pwd = null;
    private static String queueName = null;
    private static Long queueTtl = null;

    public static void main(String[] args) throws URISyntaxException {
        SOSRestApiClient client = null;
        try {
            // the .config file is expected next to the .jar file, using the same base name
            URL classUrl = JmsExecute.class.getProtectionDomain().getCodeSource().getLocation();
            Path classPath = Paths.get(classUrl.toURI());
            String filename = classPath.getFileName().toString().replace(".jar", ".config");
            LOGGER.info(classPath.getParent().resolve(filename).toString());
            readPropertiesFile(classPath.getParent().resolve(filename));
            if ("produce".equals(args[0])) {
                SOSProducer producer = new SOSProducer(jmsServerUri);
                LOGGER.info("message send to queue:");
                LOGGER.info(requestBody);
                producer.write(requestBody, queueName, queueTtl);
            } else if ("consume".equals(args[0])) {
                SOSConsumer consumer = new SOSConsumer(jmsServerUri);
                String consumedMessage = null;
                consumedMessage = consumer.read(queueName);
                LOGGER.info("message received from queue:");
                LOGGER.info(consumedMessage);
                if (consumedMessage != null) {
                    client = setupHttpClient(username, pwd);
                    URI jocUri = URI.create(jocApiUri);
                    // login to JOC Cockpit
                    LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString());
                    String response = client.postRestService(jocUri.resolve(API_LOGIN), null);
                    LOGGER.info("HTTP status code: " + client.statusCode());
                    if (client.statusCode() == 200) {
                        // extract the access token from the login response
                        JsonReader jsonReader = null;
                        String accessToken = null;
                        try {
                            jsonReader = Json.createReader(new StringReader(response));
                            JsonObject json = jsonReader.readObject();
                            accessToken = json.getString("accessToken", "");
                        } catch (Exception e) {
                            throw new Exception("Could not determine accessToken.", e);
                        } finally {
                            if (jsonReader != null) {
                                jsonReader.close();
                            }
                        }
                        // send the message received as body of the /orders/add request
                        client.addHeader(ACCESS_TOKEN_HEADER, accessToken);
                        client.addHeader(CONTENT_TYPE, APPLICATION_JSON);
                        LOGGER.info("REQUEST: " + API_ADD_ORDER);
                        LOGGER.info("PARAMS: " + consumedMessage);
                        String apiUrl = null;
                        if (!API_ADD_ORDER.toLowerCase().startsWith(jocApiUri)) {
                            apiUrl = jocApiUri + API_ADD_ORDER;
                        }
                        LOGGER.info("resolvedUri: " + jocUri.resolve(apiUrl).toString());
                        response = client.postRestService(jocUri.resolve(apiUrl), consumedMessage);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                        // logout from JOC Cockpit
                        response = client.postRestService(jocUri.resolve(API_LOGOUT), null);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                    }
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            if (client != null) {
                client.closeHttpClient();
            }
        }
    }

    private static SOSRestApiClient setupHttpClient(String username, String password) {
        SOSRestApiClient client = new SOSRestApiClient();
        String basicAuth = Base64.getMimeEncoder().encodeToString((username + ":" + password).getBytes());
        client.setBasicAuthorization(basicAuth);
        return client;
    }

    // trims the value and removes surrounding double quotes
    private static String cleanupValue(String value) {
        value = value.trim();
        if (value.startsWith("\"")) {
            value = value.substring(1);
        }
        if (value.endsWith("\"")) {
            value = value.substring(0, value.length() - 1);
        }
        return value;
    }

    private static void readPropertiesFile(Path path) {
        Properties props = new Properties();
        try {
            props.load(Files.newInputStream(path));
            jmsServerUri = cleanupValue(props.getProperty("jms_url"));
            LOGGER.info("cfg jms_url: " + jmsServerUri);
            queueName = cleanupValue(props.getProperty("jms_queue_name"));
            LOGGER.info("cfg jms_queue_name: " + queueName);
            queueTtl = Long.parseLong(cleanupValue(props.getProperty("jms_queue_ttl")));
            LOGGER.info("cfg jms_queue_ttl: " + queueTtl.toString());
            jocApiUri = cleanupValue(props.getProperty("joc_api_url"));
            LOGGER.info("cfg joc_api_url: " + jocApiUri);
            controllerId = cleanupValue(props.getProperty("controller_id"));
            LOGGER.info("cfg controller_id: " + controllerId);
            workflowPath = cleanupValue(props.getProperty("workflow_path"));
            LOGGER.info("cfg workflow_path: " + workflowPath);
            username = cleanupValue(props.getProperty("username"));
            pwd = cleanupValue(props.getProperty("password"));
            requestBody = "{\"controllerId\":\"" + controllerId
                    + "\",\"orders\":[{\"workflowPath\":\"" + workflowPath
                    + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
        } catch (IOException e) {
            LOGGER.warn("could not read properties file, use defaults instead.");
            jmsServerUri = DEFAULT_JMS_URI;
            queueName = DEFAULT_QUEUE_NAME;
            jocApiUri = DEFAULT_JOC_API_URL;
            requestBody = DEFAULT_JOC_API_REQUEST_BODY;
            username = DEFAULT_USERNAME;
            pwd = DEFAULT_PWD;
            queueTtl = 5000L;
        }
    }
} |
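The settings handling can be tried out in isolation. The following sketch parses properties text, strips optional surrounding quotes in the same way as cleanupValue(), and falls back to a default if a key is missing. The class name, the get() and parse() helpers and the sample values are assumptions for illustration. The key names jms_url, jms_queue_name and jms_queue_ttl are taken from the example class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Sketch: reads settings from properties text and strips optional surrounding quotes. */
final class ConfigSketch {

    /** Trims the value and removes one leading and one trailing double quote, if present. */
    static String cleanupValue(String value) {
        value = value.trim();
        if (value.startsWith("\"")) {
            value = value.substring(1);
        }
        if (value.endsWith("\"")) {
            value = value.substring(0, value.length() - 1);
        }
        return value;
    }

    /** Returns the cleaned value for the key, or defaultValue if the key is missing. */
    static String get(Properties props, String key, String defaultValue) {
        String value = props.getProperty(key);
        return value == null ? defaultValue : cleanupValue(value);
    }

    /** Parses properties from a string instead of a file to keep the sketch self-contained. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("jms_url = \"tcp://activemq-5-15:61616\"\njms_queue_ttl = 5000\n");
        System.out.println(get(props, "jms_url", "tcp://localhost:61616"));
        System.out.println(Long.parseLong(get(props, "jms_queue_ttl", "5000")));
        System.out.println(get(props, "jms_queue_name", "test_queue")); // key missing, default used
    }
}
```

Checking for a missing key before calling cleanupValue() avoids a NullPointerException if the settings file exists but is incomplete.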
Maven Configuration Example
This example shows the dependencies required to build the example above as a Maven project.
Code Block |
---|
language | xml |
---|
title | Maven Configuration |
---|
collapse | true |
---|
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sos-berlin</groupId>
<artifactId>activeMQ-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
</dependencies>
</project> |
...