Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titlecreateConnection(String uri)
linenumberstrue
collapsetrue
/**
 * Creates a JMS connection to the message server at the given URI.
 *
 * @param uri the broker URI passed to the ActiveMQConnectionFactory
 * @return the connection returned by the factory
 * @throws JMSException if the factory fails to create the connection; the error is logged before it is rethrown
 */
public Connection createConnection(String uri) throws JMSException {
    ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
    try {
        return factory.createConnection();
    } catch (JMSException e) {
        // Include the exception itself so the log shows why the connection failed.
        spooler_log.error("JMSException occurred while trying to connect: " + e);
        throw e;
    }
}

...

Code Block
languagejava
titlecomplete source Complete Source of the MessageProducerExampleJob
linenumberstrue
collapsetrue
package com.sos.jitl.messaging;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import sos.spooler.Job_impl;
import sos.spooler.Variable_set;


public class MessageProducerExampleJob extends Job_impl {


    private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
    private static final String DEFAULT_PROTOCOL = "tcp";


    @Override
    public boolean spooler_process() throws Exception {
        try {
            execute();
        } catch (Exception e) {
            spooler_log.error("Error occured in spooler_process() of MessageProducerExampleJob: ");
            throw e;
        }
        return  spooler_job.order_queue() != null;
    }


    private void execute() throws Exception {
        Variable_set params = spooler_task.params();
        params.merge(spooler_task.order().params());
        spooler_log.debug9(params.xml());
        String protocol = spooler_task.params().value("MQ_Protocol");
        spooler_log.debug9("Received protocol: " + protocol);
        String messageHost = spooler_task.params().value("MQ_Host");
        spooler_log.debug9("Received MQ Host: " + messageHost);
        String messagePort = spooler_task.params().value("MQ_Port");
        spooler_log.debug9("Received MQ port: " + messagePort);
        String queueName = spooler_task.params().value("MQ_QueueName");
        spooler_log.debug9("Received Queue name: " + queueName);
        String message = spooler_task.params().value("message");
        spooler_log.debug9("Received message: " + message);
        if(protocol == null || (protocol != null && protocol.isEmpty())){
            protocol = DEFAULT_PROTOCOL;
        }
        if(queueName == null || (queueName != null && queueName.isEmpty())){
            queueName = DEFAULT_QUEUE_NAME;
        }
        String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
        if(message != null && !message.isEmpty()){
            write(message, connectionUrl, queueName);
        } else {
            spooler_log.error("Message is empty, nothing to send to message server");
        }
    }


    private String createConnectionUrl (String protocol, String hostName, String port){
        StringBuilder strb = new StringBuilder();
        strb.append(protocol).append("://").append(hostName).append(":").append(port);
        return strb.toString();
    }


    private Connection createConnection(String uri) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection jmsConnection = null;
        try {
            jmsConnection = factory.createConnection();
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to connect: ");
            throw e;
        }
        return jmsConnection;
    }
    
    private Session createSession(Connection connection) throws JMSException{
        Session session = null;
        try {
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Session: ");
            throw e;
        }
        return session;
    }
    
    private Destination createDestination(Session session, String queueName) throws JMSException{
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Destination: ");
            throw e;
        }
        return destination;
    }
    
    public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException{
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
            throw e;
        }
        return producer;
    }
    
    public void write(String text, String connectionUrl, String queueName) throws JMSException{
        Connection connection = createConnection(connectionUrl);
        Session session = createSession(connection);
        Destination destination = createDestination(session, queueName);
        MessageProducer producer = createMessageProducer(session, destination);
        Message message = null;
        try {
            message = session.createTextMessage(text);
            producer.send(message);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
            throw e;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
                    throw e;
                }
            }
        }
    }
    
}

...

In addition to the methods already shown in the Producer Job example above, five more methods are needed for the Consumer Java job to run:

  • createMessageConsumer(Session session, Destination destination)
  • read(Connection connection, String queueName, Boolean closeMessage)
  • executeXml(String message)
  • executeXmlForAllTargets(String message)
  • execute()

The createMessageConsumer(Session session, Destination destination) method

...

The method is called with an already instantiated Connection object, the name of the queue to read from, and a flag that determines whether this is the last consumer. If it is the last consumer for the messages in the specified queue, the received message will be acknowledged. This method makes use of the methods described above. It instantiates all objects needed for the connection and reads a Message object from the given queue. It extracts the value from the Message object as a string representation via the Message object's getText() method. We have to make sure that connection.start() is called before the MessageConsumer object is instantiated.

Code Block
languagejava
titleread(Connection connection, String queueName, Boolean closeMessage)
linenumberstrue
collapsetrue
/**
 * Reads one message from the given queue and returns its text.
 * <p>
 * The call blocks until a message is available. The connection is always closed before the method returns.
 *
 * @param connection   an already instantiated connection; it is started and finally closed by this method
 * @param queueName    the name of the queue to read from
 * @param closeMessage if {@code true} the received message is acknowledged
 * @return the text of the received message, or {@code null} if the received message is not a TextMessage
 * @throws JMSException if reading from the queue fails; logged before being rethrown
 */
private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
    String messageText = null;
    try {
        Session session = createSession(connection);
        Destination destination = createDestination(session, queueName);
        connection.start();
        MessageConsumer consumer = createMessageConsumer(session, destination);
        // receive() blocks until a message arrives, which avoids polling with a 1 ms timeout in a busy loop.
        Message receivedMessage = consumer.receive();
        if (receivedMessage instanceof TextMessage) {
            messageText = ((TextMessage) receivedMessage).getText();
            spooler_log.debug9("Reading message from queue: " + messageText);
        }
        // receive() returns null if the consumer was closed concurrently; Boolean.TRUE.equals is null-safe.
        if (receivedMessage != null && Boolean.TRUE.equals(closeMessage)) {
            receivedMessage.acknowledge();
        }
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred while trying to read from Destination: " + e);
        throw e;
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                // Only logged so that a failure while closing does not hide the result or an earlier exception.
                spooler_log.error("JMSException occurred while trying to close the connection: " + e);
            }
        }
    }
    return messageText;
}

...

The executeXml(String message) method

This is the core method of the Consumer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method, calls the read(..) method and subsequently executes the XML.

 

...

gets the received message and sends it through the spooler.execute_xml(String text) API method with additional debug information.

Code Block
languagejava
titleexecuteXml(String message)
linenumberstrue
collapsetrue
/**
 * Passes the given XML command to the JobScheduler and logs the answer at debug level.
 *
 * @param message the XML command to execute
 */
private void executeXml(String message) {
    spooler_log.debug9("execute XML started");
    // The command is executed first; its answer is then written to the debug log.
    spooler_log.debug9("Return value of executeXML: " + spooler.execute_xml(message));
    spooler_log.debug9("order sent");
}

The executeXmlForAllTargets(String message) method

This method calls the executeXml(String message) method mentioned above for each given target job chain. It replaces the job_chain attribute of the (XML) message with each of the job chain names from the job parameter.

Code Block
languagejava
titleexecuteXmlForAllTargets(String message)
linenumberstrue
collapsetrue
/**
 * Executes the given XML command once per target job chain.
 * <p>
 * If the target job chain parameter contains the delimiter and the message contains {@code add_order}, the first
 * single-quoted {@code job_chain} attribute of the message is replaced with each job chain name in turn.
 * Otherwise the message is executed unchanged.
 *
 * @param message the XML command received from the queue; if {@code null} nothing is executed
 */
private void executeXmlForAllTargets(String message) {
    // read(..) returns null when the received message is not a TextMessage.
    if (message == null) {
        spooler_log.error("No text message received, nothing to execute");
        return;
    }
    if (targetJobChains != null && targetJobChains.contains(DELIMITER) && message.contains("add_order")) {
        for (String rawName : targetJobChains.split("[" + DELIMITER + "]")) {
            // Tolerate blanks around the delimiter and skip empty entries such as in "a,,b".
            String name = rawName.trim();
            if (name.isEmpty()) {
                continue;
            }
            spooler_log.debug9("add_order XML will be adjusted for JobChain: " + name);
            // quoteReplacement keeps '$' and '\' in a name from being interpreted as group references.
            String replacement = java.util.regex.Matcher.quoteReplacement("job_chain='" + name + "'");
            executeXml(message.replaceFirst("job_chain='[^']*'", replacement));
        }
    } else {
        executeXml(message);
    }
}

The execute() method

This is the core method of the Consumer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method and subsequently calls the read(..) method. The received message as well as the name(s) of the target job chain(s) are stored in fields of the class for further processing.

Code Block
languagejava
titleexecute()
linenumberstrue
collapsetrue
/**
 * Reads the parameters, applies the defaults for protocol and queue name, connects to the message server
 * and reads one message from the queue. The message text and the target job chain names are stored in
 * the fields {@code messageXml} and {@code targetJobChains}.
 *
 * @throws Exception if connecting to the message server or reading the message fails
 */
private void execute() throws Exception {
    Variable_set params = spooler_task.params();
    // NOTE(review): order() is presumably null when the task was not started by an order - confirm
    // against the JobScheduler API. The guard avoids a NullPointerException in that case.
    if (spooler_task.order() != null) {
        params.merge(spooler_task.order().params());
    }
    spooler_log.debug9(params.xml());
    String protocol = params.value("MQ_Protocol");
    spooler_log.debug9("Received protocol: " + protocol);
    if (protocol == null || protocol.isEmpty()) {
        protocol = DEFAULT_PROTOCOL;
    }
    String messageHost = params.value("MQ_Host");
    spooler_log.debug9("Received MQ Host: " + messageHost);
    String messagePort = params.value("MQ_Port");
    spooler_log.debug9("Received MQ port: " + messagePort);
    String queueName = params.value("MQ_QueueName");
    spooler_log.debug9("Received Queue name: " + queueName);
    // Boolean.valueOf(String) yields FALSE for null or any value other than "true" (case-insensitive).
    Boolean lastConsumer = Boolean.valueOf(params.value("lastConsumer"));
    spooler_log.debug9("Received lastConsumer: " + lastConsumer);
    if (queueName == null || queueName.isEmpty()) {
        queueName = DEFAULT_QUEUE_NAME;
    }
    targetJobChains = params.value("targetJobChainName");
    spooler_log.debug9("Received targetJobChains: " + targetJobChains);
    String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
    Connection jmsConnection = createConnection(connectionUrl);
    messageXml = read(jmsConnection, queueName, lastConsumer);
    spooler_log.debug9("Received message: " + messageXml);
}

Putting the Bricks Together

The last thing to do for the Consumer Job is to call the execute() method as well as the executeXmlForAllTargets(String message) method in spooler_process(), as shown in the complete code example below.

Code Block
languagejava
titleComplete Source of the MessageConsumerExampleJob
linenumberstrue
collapsetrue
package com.sos.jitl.messaging;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


import sos.spooler.Job_impl;
import sos.spooler.Variable_set;

public class MessageConsumerExampleJob extends Job_impl {


    private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
    private static final String DEFAULT_PROTOCOL = "tcp";
    private static final String DELIMITER = ",";
    private String messageXml; 
    private String targetJobChains;


    @Override
    public boolean spooler_process() throws Exception {
        try {
            execute();
            executeXmlForAllTargets(messageXml);
        } catch (Exception e) {
            spooler_log.error("Error occured in spooler_process() of MessageConsumerTestJob: ");
            throw e;
        }
        return spooler_job.order_queue() != null;
    }


    private void execute() throws Exception {
        Variable_set params = spooler_task.params();
        params.merge(spooler_task.order().params());
        spooler_log.debug9(params.xml());
        String protocol = params.value("MQ_Protocol");
        spooler_log.debug9("Received protocol: " + protocol);
        if(protocol == null || (protocol != null && protocol.isEmpty())){
            protocol = DEFAULT_PROTOCOL;
        }
        String messageHost = params.value("MQ_Host");
        spooler_log.debug9("Received MQ Host: " + messageHost);
        String messagePort = params.value("MQ_Port");
        spooler_log.debug9("Received MQ port: " + messagePort);
        String queueName = params.value("MQ_QueueName");
        spooler_log.debug9("Received Queue name: " + queueName);
        Boolean lastConsumer = Boolean.valueOf(params.value("lastConsumer"));
        spooler_log.debug9("Received lastConsumer: " + lastConsumer.toString());
        if(queueName == null || (queueName != null && queueName.isEmpty())){
            queueName = DEFAULT_QUEUE_NAME;
        }
        targetJobChains = params.value("targetJobChainName");
        spooler_log.debug9("Received targetJobChains: " + targetJobChains);
        String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
        Connection jmsConnection = createConnection(connectionUrl);
        messageXml = read(jmsConnection, queueName, lastConsumer);
        spooler_log.debug9("Received message: " + messageXml);
    }
    
    private String createConnectionUrl (String protocol, String hostName, String port){
        StringBuilder strb = new StringBuilder();
        strb.append(protocol).append("://").append(hostName).append(":").append(port);
        return strb.toString();
    }


    private Connection createConnection(String uri) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection jmsConnection = null;
        try {
            jmsConnection = factory.createConnection();
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to connect: ");
            throw e;
        }
        return jmsConnection;
    }
    
    private Session createSession(Connection connection) throws JMSException{
        Session session = null;
        try {
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Session: ");
            throw e;
        }
        return session;
    }
    
    private Destination createDestination(Session session, String queueName) throws JMSException{
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Destination: ");
            throw e;
        }
        return destination;
    }
    
    private MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
        MessageConsumer consumer = null;
        try {
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create MessageConsumer: ");
            throw e;
        }
        return consumer;
    }


    private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
        String messageText = null;
        try {
            Session session = createSession(connection);
            Destination destination = createDestination(session, queueName);
            connection.start();
            MessageConsumer consumer = createMessageConsumer(session, destination);
            Message receivedMessage = null;
            while (true) {
                receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        TextMessage message = (TextMessage) receivedMessage;
                        messageText = message.getText();
                        spooler_log.debug9("Reading message from queue: " + messageText);
                        break;
                    } else {
                        break;
                    }
                }
            }
            if(closeMessage){
                receivedMessage.acknowledge();
            }
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to read from Destination: ");
            throw e;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    spooler_log.error("JMSException occurred while trying to close the connection: ");
                }
            }
         }
        return messageText;
    }


    private void executeXml(String message) {
        spooler_log.debug9("execute XML started");
        String answer = spooler.execute_xml(message);
        spooler_log.debug9("Return value of executeXML: " + answer);
        spooler_log.debug9("order sent");
    }
    
    private void executeXmlForAllTargets(String message) {
        if (targetJobChains.contains(DELIMITER) && message.contains("add_order")) {
            String [] jobChainNames = targetJobChains.split("[" + DELIMITER + "]");
            for (String name : jobChainNames){
                spooler_log.debug9("add_order XML will be adjusted for JobChain: " + name);
                executeXml(message.replaceFirst("job_chain='[^']*'", "job_chain='" + name + "'"));
            }            
        }else{
            executeXml(message);
        }
    }
}