Introduction

This article describes the implementation required to use a Java Message Queue Service in order to:

  • send and receive messages,
  • add an order to a JS7 workflow by a JS7 REST Web Service API call with a JSON body extracted from a message.

The following Message Queue Service is used in this example: Apache Active MQ, https://activemq.apache.org/.

Mode of Operation

The implementation includes the following steps:

  • creation of a message producer to send messages to a Message Queue Service,
  • creation of a message consumer to receive messages from a Message Queue Service,
  • using the message as part of a JS7 REST Web Service API call.

The message is assumed to be a JSON snippet which is processed by the JS7 REST Web Service API. This snippet must be valid JSON and compliant with the requests explained in the Technical Documentation of the REST Web Service API article.
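As an illustration of the expected message format, the following minimal sketch assembles a request body for the /orders/add API with plain Java string handling. The class OrderBodyBuilder and its methods are hypothetical and not part of the example project; the field names are taken from the sample body used in this article. A JSON library would normally be preferred over manual string building.

```java
class OrderBodyBuilder {

    // Escapes characters that must not appear unescaped inside a JSON string value.
    public static String escape(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // remaining control characters are written as \\uXXXX
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds a body holding a single order that is scheduled for immediate execution.
    public static String buildAddOrderBody(String controllerId, String workflowPath) {
        return "{\"controllerId\":\"" + escape(controllerId) + "\","
                + "\"orders\":[{\"workflowPath\":\"" + escape(workflowPath) + "\","
                + "\"scheduledFor\":\"now\"}],"
                + "\"auditLog\":{}}";
    }

    public static void main(String[] args) {
        // values taken from the sample body used in this article
        System.out.println(buildAddOrderBody("testsuite", "/JS7Demo/01_HelloWorld/jdHelloWorld"));
    }
}
```

The resulting string can be passed as the message text to the producer described in the following sections.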

Most of the implementation work is done with the standard JMS implementation of Java. The only class from the Active MQ implementation is the ActiveMQConnectionFactory class. It shouldn't be too complex to change the implementation to the ConnectionFactory of your preferred Message Queue Service.

Download

A zip file of this example as a complete Maven project is available for download: js7-jms-example-project.zip.

Prerequisites

  • A running Message Queue Service (the example makes use of Active MQ)
  • A running JS7 Controller, Agent and JOC Cockpit
  • Maven (only needed if users want to build the example as a Maven project)

The MessageProducer

This example describes how to establish a connection to a Message Queue Service and send a message to a queue:

  1. create a connection,
  2. create a session,
  3. create a destination,
  4. create a producer,
  5. send a message with the producer,
  6. close the connection.

Methods

write(String text)

The method is called with the message to be sent, the name of the queue to write to and the time to live of the message in milliseconds. The message is a String object. The method performs all of the steps listed above:

  • It instantiates a ConnectionFactory object with an ActiveMQConnectionFactory object and creates a Connection object through the factory method createConnection(). The ActiveMQConnectionFactory object has to be instantiated with the URL of the Message Queue Service.
  • It creates a Session object through the Connection object's createSession(boolean transacted, int acknowledgeMode) method.
  • It creates a Destination object through the createQueue(String name) method of the Session object.
  • It instantiates a MessageProducer object for the destination and sets its time to live.
  • It instantiates a Message object with the text to be sent and sends it with the producer. If no text is given, the default text of the class is sent.
  • It makes sure that the session and the connection are closed after the message has been sent.

Code Block
languagejava
titlewrite(String text)
linenumberstrue
collapsetrue
    public void write(String text, String queueName, long ttl) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            // time to live in milliseconds, 5 sec by default for this showcase
            producer.setTimeToLive(ttl);
            Message message = null;
            if (text != null) {
                message = session.createTextMessage(text);
            } else {
                // fall back to the default text of the class
                message = session.createTextMessage(TEXT);
            }
            producer.send(message);
        } catch (Throwable e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: ", e);
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
    }

The SOSProducer class

The code example below shows the complete class. The code creates a TEXT message consisting of the body for a JS7 /orders/add API request.

Code Block
languagejava
titleSOSProducer.java
linenumberstrue
collapsetrue
package com.sos.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;


public class SOSProducer {
    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOSTNAME]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
    // default message: body of a JS7 /orders/add API request
    private static final String TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private String uri;

    public SOSProducer(String uri) {
        this.uri = uri;
    }

    public void write(String text, String queueName) throws Exception {
        // default time to live of 5 seconds
        write(text, queueName, 5000L);
    }

    public void write(String text, String queueName, long ttl) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            // time to live in milliseconds, 5 sec by default for this showcase
            producer.setTimeToLive(ttl);
            Message message = null;
            if (text != null) {
                message = session.createTextMessage(text);
            } else {
                message = session.createTextMessage(TEXT);
            }
            producer.send(message);
        } catch (Throwable e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: ", e);
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
    }

}

The MessageConsumer

This section describes how to establish a connection to a Message Queue Service and receive a message from a queue. Furthermore it shows how to connect to a JOC Cockpit instance via HTTP to send an API request:

  1. create a connection,
  2. create a session,
  3. create a destination,
  4. create a consumer,
  5. receive a message with the consumer,
  6. close the (MQ) connection,
  7. log in to a JOC Cockpit instance via an HTTP REST API call,
  8. send an /orders/add API request to the JOC Cockpit instance,
  9. close the (HTTP) connection.

Methods

read()

The method instantiates a MessageConsumer object to receive a message from the Message Queue Service. It extracts the text of the received message as a string via the getText() method of the TextMessage object.
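A receive loop that only ends when a message arrives waits indefinitely on an empty queue. The following sketch shows one possible way to bound the waiting time. To stay runnable without a Message Queue Service it uses a java.util.concurrent.BlockingQueue as a stand-in for the MessageConsumer: poll(timeout, unit) plays the role of receive(timeout). The class BoundedPolling and the method readWithDeadline are hypothetical names and not part of the example project.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedPolling {

    // Polls the queue in short intervals until a message arrives or maxWaitMillis has elapsed.
    // Returns null if no message arrived in time or if the thread was interrupted.
    public static String readWithDeadline(BlockingQueue<String> queue, long maxWaitMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        try {
            while (deadline - System.nanoTime() > 0) {
                // stand-in for consumer.receive(10)
                String message = queue.poll(10, TimeUnit.MILLISECONDS);
                if (message != null) {
                    return message;
                }
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and give up
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("{\"controllerId\":\"testsuite\"}");
        System.out.println(readWithDeadline(queue, 500)); // the queued message
        System.out.println(readWithDeadline(queue, 50));  // null, the queue is empty
    }
}
```

With a JMS consumer the same structure applies: the loop condition checks the deadline and each iteration calls receive with a short timeout.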

Code Block
languagejava
titleread()
linenumberstrue
collapsetrue
    public String read(String queueName) throws Exception {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            connection.start();
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // wait up to 1 ms for a message, then try again
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        break;
                    }
                }
            }
        } catch (Throwable e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
        return textMessage;
    }

The SOSConsumer class

The code example below shows the complete class. 

Code Block
languagejava
titleSOSConsumer
linenumberstrue
collapsetrue
package com.sos.jms.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SOSConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class);
    private String uri;
    
    public SOSConsumer (String uri) {
        this.uri = uri;
    }

    public String read(String queueName) throws Exception {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            connection.start();
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // wait up to 1 ms for a message, then try again
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        break;
                    }
                }
            }
        } catch (Throwable e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: ", e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
        return textMessage;
    }

}

Java Class with a main(String[] args) method (example JmsExecute.java)

The Java class makes use of the SOSProducer to create a message and send it to the message queue. It then uses the SOSConsumer class to read the message from the queue. In addition, it creates an HTTP connection to a JS7 JOC Cockpit instance to call the /orders/add API with the JSON body from the message received.

The example class uses the SOSRestApiClient to create the HTTP connection. The SOSRestApiClient is based on the org.apache.httpcomponents::httpclient. Users can use their own HTTP client implementation. 
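For users who prefer their own HTTP client, the following sketch builds the three requests (login, /orders/add, logout) with the java.net.http API available from Java 11. It only constructs the requests and does not send them. The class JocRequests, its method names and the host joc.example.com are hypothetical. The sketch assumes that the login call uses HTTP Basic authentication and that the access token returned by the login is passed in the X-Access-Token header, as in the example class below.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class JocRequests {

    // Login request; credentials are sent as HTTP Basic authentication (assumption).
    public static HttpRequest login(URI jocApi, String user, String password) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(jocApi.resolve("authentication/login"))
                .header("Authorization", "Basic " + credentials)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Request that adds an order; the JSON body is the message read from the queue.
    public static HttpRequest addOrder(URI jocApi, String accessToken, String jsonBody) {
        return HttpRequest.newBuilder(jocApi.resolve("orders/add"))
                .header("X-Access-Token", accessToken)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // Logout request that ends the session identified by the access token.
    public static HttpRequest logout(URI jocApi, String accessToken) {
        return HttpRequest.newBuilder(jocApi.resolve("authentication/logout"))
                .header("X-Access-Token", accessToken)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // the trailing slash matters: without it resolve() would replace the last path segment
        URI jocApi = URI.create("http://joc.example.com:4446/joc/api/");
        HttpRequest login = login(jocApi, "root", "root");
        HttpRequest add = addOrder(jocApi, "hypothetical-token", "{\"controllerId\":\"testsuite\"}");
        HttpRequest logout = logout(jocApi, "hypothetical-token");
        System.out.println(login.method() + " " + login.uri());
        System.out.println(add.method() + " " + add.uri());
        System.out.println(logout.method() + " " + logout.uri());
    }
}
```

The requests could then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), reading the accessToken value from the JSON body of the login response before the /orders/add request is built.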

Code Block
languagejava
titleJmsExecute.java
collapsetrue
package com.sos.jms;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.Properties;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sos.commons.httpclient.SOSRestApiClient;
import com.sos.jms.consumer.SOSConsumer;
import com.sos.jms.producer.SOSProducer;

public class JmsExecute {

    private static final String DEFAULT_JMS_URI = "tcp://activemq-5-15:61616";
    private static final String DEFAULT_JOC_API_URL = "http://centostest_primary.sos:7446/joc/api/";
    private static final String DEFAULT_JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\""
            + ":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PWD = "root";
    private static final String DEFAULT_QUEUE_NAME = "test_queue";
    private static final String API_ADD_ORDER = "orders/add";
    private static final String API_LOGIN = "authentication/login";
    private static final String API_LOGOUT = "authentication/logout";
    private static final String ACCESS_TOKEN_HEADER = "X-Access-Token";
    private static final String APPLICATION_JSON = "application/json";
    private static final String CONTENT_TYPE = "Content-Type";
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class);
    private static String jmsServerUri = null;
    private static String jocApiUri = null;
    private static String controllerId = null;
    private static String workflowPath = null;
    private static String requestBody = null;
    private static String username = null;
    private static String pwd = null;
    private static String queueName = null;
    private static Long queueTtl = null;

    public static void main(String[] args) throws URISyntaxException {
        SOSRestApiClient client = null;
        try {
            // the configuration file is expected next to the jar file, with the extension .config
            URL classUrl = JmsExecute.class.getProtectionDomain().getCodeSource().getLocation();
            Path classPath = Paths.get(classUrl.toURI());
            String filename = classPath.getFileName().toString().replace(".jar", ".config");
            LOGGER.info(classPath.getParent().resolve(filename).toString());
            readPropertiesFile(classPath.getParent().resolve(filename));
            if ("produce".equals(args[0])) {
                SOSProducer producer = new SOSProducer(jmsServerUri);
                LOGGER.info("message sent to queue:");
                LOGGER.info(requestBody);
                producer.write(requestBody, queueName, queueTtl);
            } else if ("consume".equals(args[0])) {
                SOSConsumer consumer = new SOSConsumer(jmsServerUri);
                String consumedMessage = null;
                consumedMessage = consumer.read(queueName);
                LOGGER.info("message received from queue:");
                LOGGER.info(consumedMessage);
                if (consumedMessage != null) {
                    client = setupHttpClient(username, pwd);
                    URI jocUri = URI.create(jocApiUri);
                    // login to JOC Cockpit
                    LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString());
                    String response = client.postRestService(jocUri.resolve(API_LOGIN), null);
                    LOGGER.info("HTTP status code: " + client.statusCode());
                    if (client.statusCode() == 200) {
                        JsonReader jsonReader = null;
                        String accessToken = null;
                        try {
                            jsonReader = Json.createReader(new StringReader(response));
                            JsonObject json = jsonReader.readObject();
                            accessToken = json.getString("accessToken", "");
                        } catch (Exception e) {
                            throw new Exception("Could not determine accessToken.", e);
                        } finally {
                            if (jsonReader != null) {
                                jsonReader.close();
                            }
                        }
                        client.addHeader(ACCESS_TOKEN_HEADER, accessToken);
                        client.addHeader(CONTENT_TYPE, APPLICATION_JSON);
                        // send the message as body of the /orders/add request
                        LOGGER.info("REQUEST: " + API_ADD_ORDER);
                        LOGGER.info("PARAMS: " + consumedMessage);
                        String apiUrl = null;
                        if (!API_ADD_ORDER.toLowerCase().startsWith(jocApiUri)) {
                            apiUrl = jocApiUri + API_ADD_ORDER;
                        }
                        LOGGER.info("resolvedUri: " + jocUri.resolve(apiUrl).toString());
                        response = client.postRestService(jocUri.resolve(apiUrl), consumedMessage);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                        // logout from JOC Cockpit
                        response = client.postRestService(jocUri.resolve(API_LOGOUT), null);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                    }
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            if (client != null) {
                client.closeHttpClient();
            }
        }
    }

    private static SOSRestApiClient setupHttpClient(String username, String password) {
        SOSRestApiClient client = new SOSRestApiClient();
        // the basic encoder is used because the MIME encoder inserts line breaks into long values
        String basicAuth = Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
        client.setBasicAuthorization(basicAuth);
        return client;
    }

    // removes surrounding whitespace and enclosing double quotes from a property value
    private static String cleanupValue(String value) {
        value = value.trim();
        if (value.startsWith("\"")) {
            value = value.substring(1);
        }
        if (value.endsWith("\"")) {
            value = value.substring(0, value.length() - 1);
        }
        return value;
    }

    private static void readPropertiesFile(Path path) {
        Properties props = new Properties();
        try {
            props.load(Files.newInputStream(path));
            jmsServerUri = cleanupValue(props.getProperty("jms_url"));
            LOGGER.info("cfg jms_url: " + jmsServerUri);
            queueName = cleanupValue(props.getProperty("jms_queue_name"));
            LOGGER.info("cfg jms_queue_name: " + queueName);
            // the time-to-live is read from its own key, not from jms_queue_name
            queueTtl = Long.parseLong(cleanupValue(props.getProperty("jms_queue_ttl")));
            LOGGER.info("cfg jms_queue_ttl: " + queueTtl);
            jocApiUri = cleanupValue(props.getProperty("joc_api_url"));
            LOGGER.info("cfg joc_api_url: " + jocApiUri);
            controllerId = cleanupValue(props.getProperty("controller_id"));
            LOGGER.info("cfg controller_id: " + controllerId);
            workflowPath = cleanupValue(props.getProperty("workflow_path"));
            LOGGER.info("cfg workflow_path: " + workflowPath);
            username = cleanupValue(props.getProperty("username"));
            pwd = cleanupValue(props.getProperty("password"));
            // JSON body of the request that adds an order to the workflow
            requestBody = "{\"controllerId\":\"" + controllerId + "\",\"orders\":[{\"workflowPath\":\"" + workflowPath
                    + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
        } catch (IOException e) {
            LOGGER.warn("could not read properties file, use defaults instead.");
            jmsServerUri = DEFAULT_JMS_URI;
            queueName = DEFAULT_QUEUE_NAME;
            jocApiUri = DEFAULT_JOC_API_URL;
            requestBody = DEFAULT_JOC_API_REQUEST_BODY;
            username = DEFAULT_USERNAME;
            pwd = DEFAULT_PWD;
            queueTtl = 5000L;
        }
    }

}
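
The handling of property values and the construction of the request body can be tried in isolation. The following is a minimal sketch, not part of the downloadable project: the class name OrderRequestSketch, the helper basicAuth() and the sample values (controller ID, workflow path, credentials) are assumptions made for illustration. Only the property keys and the JSON layout are taken from the example above. The sketch uses the basic Base64 encoder, which never inserts line breaks into the encoded value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

public class OrderRequestSketch {

    // Strips surrounding whitespace and enclosing double quotes, as cleanupValue() does above.
    public static String cleanupValue(String value) {
        value = value.trim();
        if (value.startsWith("\"")) {
            value = value.substring(1);
        }
        if (value.endsWith("\"")) {
            value = value.substring(0, value.length() - 1);
        }
        return value;
    }

    // Builds the JSON body with the same layout as readPropertiesFile() above.
    public static String buildRequestBody(String controllerId, String workflowPath) {
        return "{\"controllerId\":\"" + controllerId + "\",\"orders\":[{\"workflowPath\":\"" + workflowPath
                + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    }

    // Hypothetical helper: encodes "user:password" for HTTP Basic authentication.
    public static String basicAuth(String username, String password) {
        byte[] raw = (username + ":" + password).getBytes(StandardCharsets.UTF_8);
        return Base64.getEncoder().encodeToString(raw);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical settings; values may be quoted, which is why cleanupValue() is applied.
        String config = "controller_id = \"controller\"\n"
                + "workflow_path = \"/Examples/HelloWorld\"\n";
        Properties props = new Properties();
        props.load(new StringReader(config));

        String controllerId = cleanupValue(props.getProperty("controller_id"));
        String workflowPath = cleanupValue(props.getProperty("workflow_path"));

        System.out.println(buildRequestBody(controllerId, workflowPath));
        System.out.println(basicAuth("root", "root"));
    }
}
```

Note that java.util.Properties keeps double quotes as part of the value, so a quoted entry in the properties file only yields a clean value after cleanupValue() has been applied.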


Maven Configuration Example

This example shows the dependencies required to build the example above as a Maven project.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>


	<groupId>com.sos-berlin</groupId>
	<artifactId>activeMQ-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>


	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.15.0</version>
		</dependency>
	</dependencies>
</project>

...