Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

A zip file containing this example as a complete Maven project is available for download: js7-jms-example-js7-project.zip.

Prerequisites

...

Code Block
languagejava
titlewrite(String text)
linenumberstrue
collapsetrue
    public void write(String text, String queueName, long ttl) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAMEqueueName);
            MessageProducer producer = session.createProducer(destination);
            // 5 sec time to live for the producer for this showcase
            producer.setTimeToLive(ttl);
            Message message = null;
            if(text != null){
                message = session.createTextMessage(text);
            } else{
                message = session.createTextMessage(TEXT);
            }
            producer.send(message);
        } catch (JMSExceptionThrowable e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
        } finally {
  throw e;
        } finally {
            if(session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.errorwarn("JMSException occurred while trying to close the session: " , e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.errorwarn("JMSException occurred while trying to close the connection: " , e);
                }
            }
        }
    }

The SOSProducer class

...

Code Block
languagejava
titleSOSProducer.java
linenumberstrue
collapsetrue
package com.sos.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;


public class SOSProducer {

    private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
    private static final String QUEUE_NAME = "test_queue";
    private static final String TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private String uri;
    
    public SOSProducer(String uri) {
        this.uri = uri;
    }
    
    public void write(String text){
, String queueName) throws     Connection connection = null;Exception {
        Session session = nullwrite(text, queueName, 5000L);
    }
    try {
    public void write(String text, String queueName, long ttl) throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE_NAMEqueueName);
            MessageProducer producer = session.createProducer(destination);
            Message// message5 =sec null;
time to live for the producer for this showcase
    if(text         producer.setTimeToLive(ttl);
            Message message = null;
            if(text != null){
                message = session.createTextMessage(text);
            } else{
                message = session.createTextMessage(TEXT);
            }
            producer.send(message);
        } catch (JMSExceptionThrowable e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: " ,);
            throw e);
        } finally {
            if(session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.errorwarn("JMSException occurred while trying to close the session: " , e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.errorwarn("JMSException occurred while trying to close the connection: " , e);
                }
            }
        }
    }
    
}

The MessageConsumer

...

Code Block
languagejava
titleread()
linenumberstrue
collapsetrue
    /**
     * Waits for the next message on the given queue and returns its text.
     *
     * <p>The queue is polled with a receive timeout of 1 millisecond until a message arrives,
     * so the call does not return while the queue stays empty.
     *
     * @param queueName name of the queue to read from
     * @return the text of the received message, or {@code null} when the received message is
     *         not a {@code TextMessage}
     * @throws Exception any failure while connecting or receiving; it is logged and rethrown
     */
    public String read(String queueName) throws Exception {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            connection.start();
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        // a non-text message ends the wait as well; the result stays null
                        break;
                    }
                }
            }
        } catch (Throwable e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
            // rethrow so the caller can tell a failure from "no text received"
            throw e;
        } finally {
            // closing is best effort: a failure here is logged but does not mask the outcome above
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the session: " , e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.warn("JMSException occurred while trying to close the connection: " , e);
                }
            }
        }
        return textMessage;
    }

...

Code Block
languagejava
titleSOSConsumer
linenumberstrue
collapsetrue
package com.sos.jms.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SOSConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class);
    private static final String DEFAULT_QUEUE_NAME = "test_queue";
    private String uri;
    
    public SOSConsumer (String uri) {
        this.uri = uri;
    }
    
    public String read(String queueName) throws Exception {
        TextMessage message = null;
        String textMessage = null;
        Connection connection = null;
        Session session = null;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(DEFAULT_QUEUE_NAMEqueueName);
            connection.start();
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                Message receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        message = (TextMessage) receivedMessage;
                        textMessage = message.getText();
                        LOGGER.info("Reading message: " + textMessage);
                        break;
                    } else {
                        break;
                    }
                }
            }
        } catch (JMSExceptionThrowable e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ",);
            throw e);
        } finally {
            if(session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    LOGGER.errorwarn("JMSException occurred while trying to close the session: " , e);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.errorwarn("JMSException occurred while trying to close the connection: " , e);
                }
            }
         }
        return textMessage;
    }

}

...

Code Block
languagejava
collapsetrue
package com.sos.jms;

import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.Properties;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sos.commons.httpclient.SOSRestApiClient;
import com.sos.jms.consumer.SOSConsumer;
import com.sos.jms.producer.SOSProducer;

public class JmsExecute {

    private static final String DEFAULT_JMS_URI = "tcp://activemq-5-15:61616";
    private static final String DEFAULT_JOC_API_URL = "http://centostest_primary.sos:7446/joc/api/";
    private static final String DEFAULT_JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\"" 
            + ":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PWD = "root";
    private static final String APIDEFAULT_ADD_QUEUE_NAME = "test_queue";
    private static final String API_ADD_ORDER = "orders/add";
    private static final String API_LOGIN = "authentication/login";
    private static final String API_LOGOUT = "authentication/logout";
    private static final String ACCESS_TOKEN_HEADER = "X-Access-Token";
    private static final String APPLICATION_JSON = "application/json";
    private static final String CONTENT_TYPE = "Content-Type";
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class);
    private static String jmsServerUri = null;
    private static String jocApiUri = null;
    private static String controllerId = null;
    private static String workflowPath = null;
    private static String requestBody = null;
    private static String username = null;
    private static String pwd = null;
    private static String queueName = null;
    private static Long queueTtl = null;

    public static void main(String[] args) throws URISyntaxException {
        for (int iSOSRestApiClient client = 0null;
  i    < args.length; i++) try {
            LOGGER.info("args["+ i +"] = " + args[i]);
        }
        URL classUrl = JmsExecute.classURL classUrl = JmsExecute.class.getProtectionDomain().getCodeSource().getLocation();
            Path classPath = Paths.get(classUrl.toURI());
            String filename = classPath.getFileName().toString().replace(".jar", ".config");
            LOGGER.info(classPath.getParent().resolve(filename).toString());
            readPropertiesFile(classPath.getParent().resolve(filename));
            if ("produce".equals(args[0])) {
                SOSProducer producer = new SOSProducer(jmsServerUri);
                LOGGER.info("message send to queue:");
                LOGGER.info(requestBody);
                producer.write(requestBody, queueName, queueTtl);
            } else if ("consume".equals(args[0])) {
                SOSConsumer consumer = new SOSConsumer(jmsServerUri);
                String consumedMessage = null;
                consumedMessage = consumer.read(queueName);
                LOGGER.info("message received from queue:");
                LOGGER.info(consumedMessage);
              SOSRestApiClient client =if setupHttpClient(username, pwd);
(consumedMessage != null) {
                    try {client = setupHttpClient(username, pwd);
                    URI jocUri = URI.create(jocApiUri);
                    LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString());
                    String response = client.postRestService(jocUri.resolve(API_LOGIN), null);
                    LOGGER.info("HTTP status code: " + client.statusCode());
                    if (client.statusCode() == 200) {
                        JsonReader jsonReader = null;
                        String accessToken = null;
                        try {
                            jsonReader = Json.createReader(new StringReader(response));
                            JsonObject json = jsonReader.readObject();
                            accessToken = json.getString("accessToken", "");
                        } catch (Exception e) {
                            throw new LOGGER.warnException("Could not determine accessToken.", e);
                        } finally {
                            jsonReader.close();
                        }
                        client.addHeader(ACCESS_TOKEN_HEADER, accessToken);
                        client.addHeader(CONTENT_TYPE, APPLICATION_JSON);
                        LOGGER.info("REQUEST: " + API_ADD_ORDER);
                        LOGGER.info("PARAMS: " + consumedMessage);
                        String apiUrl = null;
                        if (!API_ADD_ORDER.toLowerCase().startsWith(jocApiUri)) {
                            apiUrl = jocApiUri + API_ADD_ORDER;
                    }
    }
                        LOGGER.info("resolvedUri: " + jocUri.resolve(apiUrl).toString());
                        response = client.postRestService(jocUri.resolve(apiUrl), consumedMessage);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                        response = client.postRestService(jocUri.resolve(API_LOGOUT), null);
                        LOGGER.info("HTTP status code: " + client.statusCode());
                }
    }
        } catch (Exception e) {
    }
            LOGGER.error(e.getMessage(), e);}
        } catch   } finally(Throwable e) {
                client.closeHttpCliente.printStackTrace();
            }System.exit(1);
        }
 finally {
  }
    
    private static SOSRestApiClientif setupHttpClient(String client != null) {
                client.closeHttpClient();
            }
        }
    }

    private static SOSRestApiClient setupHttpClient(String username, String password) {
        SOSRestApiClient client = new SOSRestApiClient();
        String basicAuth = Base64.getMimeEncoder().encodeToString((username + ":" + password).getBytes());
        client.setBasicAuthorization(basicAuth);
        return client;
    }
    
    private static String cleanupValue(String value) {
        if (value.startsWith("\"")) {
            value = value.substring(1);
        }
        if (value.endsWith("\"")) {
            value = value.substring(0, value.length() - 1);
        }
        return value;
    }
    
    private static void readPropertiesFile (Path path) {
        Properties props = new Properties();
        try {
            props.load(Files.newInputStream(path));
            jmsServerUri = cleanupValue(props.getProperty("jms_url"));
            LOGGER.info("cfg jms_url: " + jmsServerUri);
            queueName = cleanupValue(props.getProperty("jms_queue_name"));
            propsLOGGER.load(Files.newInputStream(path)info("cfg jms_queue_name: " + queueName);
            jmsServerUriqueueTtl = Long.parseLong(cleanupValue(props.getProperty("jms_queue_urlname"))); 
            LOGGER.info("cfg jms_queue_urlttl: " + jmsServerUriqueueTtl.toString());
            jocApiUri = cleanupValue(props.getProperty("joc_api_url"));
            LOGGER.info("cfg joc_api_url: " + jocApiUri);
            controllerId = cleanupValue(props.getProperty("controller_id"));
            LOGGER.info("cfg controller_id: " + controllerId);
            workflowPath = cleanupValue(props.getProperty("workflow_path"));
            LOGGER.info("cfg workflow_path: " + workflowPath);
            username = cleanupValue(props.getProperty("username"));
            pwd = cleanupValue(props.getProperty("password"));
            requestBody = "{\"controllerId\":\"" + controllerId + "\",\"orders\":[{\"workflowPath\":\"" + workflowPath
                    workflowPath + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
        } catch (IOException e) {
            LOGGER.warn("could not read properties file, use defaults instead.");
            jmsServerUri = DEFAULT_JMS_URI;
            queueName = DEFAULT_QUEUE_NAME;
            jocApiUri = DEFAULT_JOC_API_URL;
            requestBody = DEFAULT_JOC_API_REQUEST_BODY;
            username = DEFAULT_USERNAME;
            pwd = DEFAULT_PWD;
            queueTtl = 5000L; 
        }
    }

}


Maven Configuration Example

...