Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This document explains which classes and methods have to be created, and for what purpose. The example was developed using Apache ActiveMQ.

...

For full coverage of the JMS API, refer to the Oracle Java documentation.

...

  1. Run a Java Job in the JobScheduler which sends an XML fragment to a message queue (MQ) 
  2. Run a Java Job in the JobScheduler which receives an XML fragment from an MQ
  3. Execute an XML command in the JobScheduler

What

...

does this example explain?

The steps described in this article are as follows:

...

  • Either download the JAR file and add it to the classpath,
  • or, in the case of a Maven project, add the following dependency to the project configuration.
Code Block
languagexml
titleMaven dependency for activemq-all
collapsetrue
<!-- Maven coordinates of the ActiveMQ "all" artifact, version 5.13.0; the example jobs
     import org.apache.activemq.ActiveMQConnectionFactory, which is expected to come from it. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.13.0</version>
</dependency>

...

Make sure to rethrow the caught exception so that it is handed over to the caller of the spooler_process() method. This is needed so that a job running in a job chain fails when an error occurs.

...

Both Java jobs need some methods to initialize a connection to an MQ server. This section of the document shows how to implement them. The implementation is based on the JMS implementation used and shipped by ActiveMQ.

To keep this example simple, these methods are included in both Java job classes. To avoid duplicated source code in a real project, it is better to put these methods in one class and have the two job classes extend it.

...

Code Block
languagejava
titleComplete Source of the MessageProducerExampleJob
linenumberstrue
collapsetrue
package com.sos.jitl.messaging;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import sos.spooler.Job_impl;
import sos.spooler.Variable_set;


public class MessageProducerExampleJob extends Job_impl {


    private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
    private static final String DEFAULT_PROTOCOL = "tcp";


    @Override
    public boolean spooler_process() throws Exception {
        try {
            execute();
        } catch (Exception e) {
            spooler_log.error("Error occured in spooler_process() of MessageProducerExampleJob: ");
            throw e;
        }
        return  spooler_job.order_queue() != null;
    }


    private void execute() throws Exception {
        Variable_set params = spooler_task.params();
        params.merge(spooler_task.order().params());
        spooler_log.debug9(params.xml());
        String protocol = spooler_task.params().value("MQ_Protocol");
        spooler_log.debug9("Received protocol: " + protocol);
        String messageHost = spooler_task.params().value("MQ_Host");
        spooler_log.debug9("Received MQ Host: " + messageHost);
        String messagePort = spooler_task.params().value("MQ_Port");
        spooler_log.debug9("Received MQ port: " + messagePort);
        String queueName = spooler_task.params().value("MQ_QueueName");
        spooler_log.debug9("Received Queue name: " + queueName);
        String message = spooler_task.params().value("message");
        spooler_log.debug9("Received message: " + message);
        if(protocol == null || (protocol != null && protocol.isEmpty())){
            protocol = DEFAULT_PROTOCOL;
        }
        if(queueName == null || (queueName != null && queueName.isEmpty())){
            queueName = DEFAULT_QUEUE_NAME;
        }
        String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
        if(message != null && !message.isEmpty()){
            write(message, connectionUrl, queueName);
        } else {
            spooler_log.error("Message is empty, nothing to send to message server");
        }
    }


    private String createConnectionUrl (String protocol, String hostName, String port){
        StringBuilder strb = new StringBuilder();
        strb.append(protocol).append("://").append(hostName).append(":").append(port);
        return strb.toString();
    }


    private Connection createConnection(String uri) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection jmsConnection = null;
        try {
            jmsConnection = factory.createConnection();
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to connect: ");
            throw e;
        }
        return jmsConnection;
    }
    
    private Session createSession(Connection connection) throws JMSException{
        Session session = null;
        try {
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Session: ");
            throw e;
        }
        return session;
    }
    
    private Destination createDestination(Session session, String queueName) throws JMSException{
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Destination: ");
            throw e;
        }
        return destination;
    }
    
    public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException{
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
            throw e;
        }
        return producer;
    }
    
    public void write(String text, String connectionUrl, String queueName) throws JMSException{
        Connection connection = createConnection(connectionUrl);
        Session session = createSession(connection);
        Destination destination = createDestination(session, queueName);
        MessageProducer producer = createMessageProducer(session, destination);
        Message message = null;
        try {
            message = session.createTextMessage(text);
            producer.send(message);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
            throw e;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
                    throw e;
                }
            }
        }
    }
    
}

...

Code Block
languagejava
titleread(Connection connection, String queueName, Boolean closeMessage)
linenumberstrue
collapsetrue
/**
 * Reads one message from the given queue and closes the connection afterwards.
 *
 * <p>The queue is polled until a message arrives, so this method does not return while the
 * queue stays empty. If the message is not a {@code TextMessage}, {@code null} is returned.
 *
 * @param connection the JMS connection to read with; it is closed before this method returns
 * @param queueName the name of the queue to read from
 * @param closeMessage if {@code true}, the received message is acknowledged; {@code null} is
 *        treated as {@code false}
 * @return the text of the received message, or {@code null} if it was not a text message
 * @throws JMSException if reading or acknowledging fails; logged and rethrown
 */
private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
    String messageText = null;
    try {
        Session session = createSession(connection);
        Destination destination = createDestination(session, queueName);
        connection.start();
        MessageConsumer consumer = createMessageConsumer(session, destination);
        Message receivedMessage = null;
        // Poll with a 1 ms timeout until a message is available.
        while (receivedMessage == null) {
            receivedMessage = consumer.receive(1);
        }
        if (receivedMessage instanceof TextMessage) {
            messageText = ((TextMessage) receivedMessage).getText();
            spooler_log.debug9("Reading message from queue: " + messageText);
        }
        // Boolean.TRUE.equals avoids a NullPointerException from unboxing a null flag.
        if (Boolean.TRUE.equals(closeMessage)) {
            receivedMessage.acknowledge();
        }
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred while trying to read from Destination: " + e);
        throw e;
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                // Best effort: a failed close is logged but does not fail the read.
                spooler_log.error("JMSException occurred while trying to close the connection: " + e);
            }
        }
    }
    return messageText;
}

...

Code Block
languagejava
titleexecuteXml(String message)
linenumberstrue
collapsetrue
/**
 * Passes the given XML command to {@code spooler.execute_xml()} and logs the answer
 * at debug9 level.
 *
 * @param message the XML command to execute
 */
private void executeXml(String message) {
    spooler_log.debug9("execute XML started");
    String answer = spooler.execute_xml(message);
    spooler_log.debug9("Return value of executeXML: " + answer);
    spooler_log.debug9("execute XML finished");
}

The executeXmlForAllTargets(String message) method

...

Code Block
languagejava
titleComplete Source of the MessageConsumerExampleJob
linenumberstrue
collapsetrue
package com.sos.jitl.messaging;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


import sos.spooler.Job_impl;
import sos.spooler.Variable_set;

public class MessageConsumerExampleJob extends Job_impl {


    private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
    private static final String DEFAULT_PROTOCOL = "tcp";
    private static final String DELIMITER = ",";
    private String messageXml; 
    private String targetJobChains;


    @Override
    public boolean spooler_process() throws Exception {
        try {
            execute();
            executeXmlForAllTargets(messageXml);
        } catch (Exception e) {
            spooler_log.error("Error occured in spooler_process() of MessageConsumerTestJob: ");
            throw e;
        }
        return spooler_job.order_queue() != null;
    }


    private void execute() throws Exception {
        Variable_set params = spooler_task.params();
        params.merge(spooler_task.order().params());
        spooler_log.debug9(params.xml());
        String protocol = params.value("MQ_Protocol");
        spooler_log.debug9("Received protocol: " + protocol);
        if(protocol == null || (protocol != null && protocol.isEmpty())){
            protocol = DEFAULT_PROTOCOL;
        }
        String messageHost = params.value("MQ_Host");
        spooler_log.debug9("Received MQ Host: " + messageHost);
        String messagePort = params.value("MQ_Port");
        spooler_log.debug9("Received MQ port: " + messagePort);
        String queueName = params.value("MQ_QueueName");
        spooler_log.debug9("Received Queue name: " + queueName);
        Boolean lastConsumer = Boolean.valueOf(params.value("lastConsumer"));
        spooler_log.debug9("Received lastConsumer: " + lastConsumer.toString());
        if(queueName == null || (queueName != null && queueName.isEmpty())){
            queueName = DEFAULT_QUEUE_NAME;
        }
        targetJobChains = params.value("targetJobChainName");
        spooler_log.debug9("Received targetJobChains: " + targetJobChains);
        String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
        Connection jmsConnection = createConnection(connectionUrl);
        messageXml = read(jmsConnection, queueName, lastConsumer);
        spooler_log.debug9("Received message: " + messageXml);
    }
    
    private String createConnectionUrl (String protocol, String hostName, String port){
        StringBuilder strb = new StringBuilder();
        strb.append(protocol).append("://").append(hostName).append(":").append(port);
        return strb.toString();
    }


    private Connection createConnection(String uri) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection jmsConnection = null;
        try {
            jmsConnection = factory.createConnection();
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to connect: ");
            throw e;
        }
        return jmsConnection;
    }
    
    private Session createSession(Connection connection) throws JMSException{
        Session session = null;
        try {
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Session: ");
            throw e;
        }
        return session;
    }
    
    private Destination createDestination(Session session, String queueName) throws JMSException{
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Destination: ");
            throw e;
        }
        return destination;
    }
    
    private MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
        MessageConsumer consumer = null;
        try {
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create MessageConsumer: ");
            throw e;
        }
        return consumer;
    }


    private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
        String messageText = null;
        try {
            Session session = createSession(connection);
            Destination destination = createDestination(session, queueName);
            connection.start();
            MessageConsumer consumer = createMessageConsumer(session, destination);
            Message receivedMessage = null;
            while (true) {
                receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        TextMessage message = (TextMessage) receivedMessage;
                        messageText = message.getText();
                        spooler_log.debug9("Reading message from queue: " + messageText);
                        break;
                    } else {
                        break;
                    }
                }
            }
            if(closeMessage){
                receivedMessage.acknowledge();
            }
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to read from Destination: ");
            throw e;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    spooler_log.error("JMSException occurred while trying to close the connection: ");
                }
            }
         }
        return messageText;
    }


    private void executeXml(String message) {
        spooler_log.debug9("execute XML started");
        String answer = spooler.execute_xml(message);
        spooler_log.debug9("Return value of executeXML: " + answer);
        spooler_log.debug9("order sent");
    }
    
    private void executeXmlForAllTargets(String message) {
        if (targetJobChains.contains(DELIMITER) && message.contains("add_order")) {
            String [] jobChainNames = targetJobChains.split("[" + DELIMITER + "]");
            for (String name : jobChainNames){
                spooler_log.debug9("add_order XML will be adjusted for JobChain: " + name);
                executeXml(message.replaceFirst("job_chain='[^']*'", "job_chain='" + name + "'"));
            }            
        }else{
            executeXml(message);
        }
    }
}

Configuring the Jobs in JOE