Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Both Java jobs need some methods to initialize a connection to an MQ Server. This section of the document shows how to implement them. The implementation is based on the JMS implementation used and shipped by activeMQ.

To keep it simple for this example these methods are put in both Java job classes. To prevent duplicates in a real project´s source code, it is better to put these methods in one class and extend the class in the two job classes.

The createConnection() method

...

Code Block
languagejava
titlecreateSession(Connection connection)
linenumberstrue
collapsetrue
private Session createSession(Connection connection) throws JMSException{
    Session session = null;
    try {
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred while trying to create Session: ");
        throw e;
    }
    return session;
}

 

 

Implementation of the Producer Job

 

Implementation of the Consumer Job

 

 

...

The createDestination(Session session, String queueName) method

This method creates a Destination object. It is called with an active Session object and the name of the queue to write to. The Destination object is initiated through the createQueue(String name) method of the Session object.

Code Block
languagejava
titlecreateDestination(Session session, String queueName)
linenumberstrue
collapsetrue
private Destination createDestination(Session session, String queueName) throws JMSException{
    Destination destination = null;
    try {
        destination = session.createQueue(queueName);
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred while trying to create Destination: ");
        throw e;
    }
    return destination;
}

The createConnectionUrl (String protocol, String hostName, String port) method

This is a helper method which creates a connection URI based on the job parameters. It simply uses a StringBuilder to create a string in the format [protocol]://[hostname]:[port].

Code Block
languagejava
titlecreateConnectionUrl (String protocol, String hostName, String port)
linenumberstrue
collapsetrue
private String createConnectionUrl (String protocol, String hostName, String port){
    StringBuilder strb = new StringBuilder();
    strb.append(protocol).append("://").append(hostName).append(":").append(port);
    return strb.toString();
}

Implementation of the Producer Job

There are three more methods needed for the producer Java job to run.

  • createMessageProducer(Session session, Destination destination)
  • write(String text, String connectionUrl, String queueName)
  • execute()

The createMessageProducer(Session session, Destination destination) method

 This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageProducer object with the given session and destination.

Code Block
languagejava
titlecreateMessageProducer(Session session, Destination destination)
linenumberstrue
collapsetrue
public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException{
    MessageProducer producer = null;
    try {
        producer = session.createProducer(destination);
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
        throw e;
    }
    return producer;
}

The write(String text, String connectionUrl, String queueName) method

The method is called with the message to send to the server and the MessageProducer object to use for publishing. The message is a String object. The method instantiates a Message object with the text to send. This method makes use of the methods described above. It instantiates all objects needed for the connection and sends a Message object. Also the methods throws an error if one occurs it still calls the MessageProducer´s send(Message message) in a try..catch..finally block. That is because the use of the finally block to close the connection in the end.

Code Block
languagejava
titlewrite(String text, String connectionUrl, String queueName)
linenumberstrue
collapsetrue
public void write(String text, String connectionUrl, String queueName) throws JMSException{
    Connection connection = createConnection(connectionUrl);
    Session session = createSession(connection);
    Destination destination = createDestination(session, queueName);
    MessageProducer producer = createMessageProducer(session, destination);
    Message message = null;
    try {
        message = session.createTextMessage(text);
        producer.send(message);
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
        throw e;
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
                throw e;
            }
        }
    }
}

The execute() method

This is the core method of the Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method  and subsequently calls the write(..) method to send the message to the MQ server.

Code Block
languagejava
titleexecute()
linenumberstrue
collapsetrue
private void execute() throws Exception {
    Variable_set params = spooler_task.params();
    params.merge(spooler_task.order().params());
    spooler_log.debug9(params.xml());
    String protocol = spooler_task.params().value("MQ_Protocol");
    spooler_log.debug9("Received protocol: " + protocol);
    String messageHost = spooler_task.params().value("MQ_Host");
    spooler_log.debug9("Received MQ Host: " + messageHost);
    String messagePort = spooler_task.params().value("MQ_Port");
    spooler_log.debug9("Received MQ port: " + messagePort);
    String queueName = spooler_task.params().value("MQ_QueueName");
    spooler_log.debug9("Received Queue name: " + queueName);
    String message = spooler_task.params().value("message");
    spooler_log.debug9("Received message: " + message);
    if(protocol == null || (protocol != null && protocol.isEmpty())){
        protocol = DEFAULT_PROTOCOL;
    }
    if(queueName == null || (queueName != null && queueName.isEmpty())){
        queueName = DEFAULT_QUEUE_NAME;
    }
    String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
    if(message != null && !message.isEmpty()){
        write(message, connectionUrl, queueName);
    } else {
        spooler_log.error("Message is empty, nothing to send to message server");
    }
}

Putting the bricks together

The last thing to do is to call the execute() method in spooler_process() as shown in the complete code example below.

Code Block
languagejava
titlecomplete source of the MessageProducerExampleJob
linenumberstrue
collapsetrue
package com.sos.jitl.messaging;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import sos.spooler.Job_impl;
import sos.spooler.Variable_set;


public class MessageProducerExampleJob extends Job_impl {


    private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
    private static final String DEFAULT_PROTOCOL = "tcp";


    @Override
    public boolean spooler_process() throws Exception {
        try {
            execute();
        } catch (Exception e) {
            spooler_log.error("Error occured in spooler_process() of MessageProducerExampleJob: ");
            throw e;
        }
        return  spooler_job.order_queue() != null;
    }


    private void execute() throws Exception {
        Variable_set params = spooler_task.params();
        params.merge(spooler_task.order().params());
        spooler_log.debug9(params.xml());
        String protocol = spooler_task.params().value("MQ_Protocol");
        spooler_log.debug9("Received protocol: " + protocol);
        String messageHost = spooler_task.params().value("MQ_Host");
        spooler_log.debug9("Received MQ Host: " + messageHost);
        String messagePort = spooler_task.params().value("MQ_Port");
        spooler_log.debug9("Received MQ port: " + messagePort);
        String queueName = spooler_task.params().value("MQ_QueueName");
        spooler_log.debug9("Received Queue name: " + queueName);
        String message = spooler_task.params().value("message");
        spooler_log.debug9("Received message: " + message);
        if(protocol == null || (protocol != null && protocol.isEmpty())){
            protocol = DEFAULT_PROTOCOL;
        }
        if(queueName == null || (queueName != null && queueName.isEmpty())){
            queueName = DEFAULT_QUEUE_NAME;
        }
        String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
        if(message != null && !message.isEmpty()){
            write(message, connectionUrl, queueName);
        } else {
            spooler_log.error("Message is empty, nothing to send to message server");
        }
    }


    private String createConnectionUrl (String protocol, String hostName, String port){
        StringBuilder strb = new StringBuilder();
        strb.append(protocol).append("://").append(hostName).append(":").append(port);
        return strb.toString();
    }


    private Connection createConnection(String uri) throws JMSException{
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection jmsConnection = null;
        try {
            jmsConnection = factory.createConnection();
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to connect: ");
            throw e;
        }
        return jmsConnection;
    }
    
    private Session createSession(Connection connection) throws JMSException{
        Session session = null;
        try {
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Session: ");
            throw e;
        }
        return session;
    }
    
    private Destination createDestination(Session session, String queueName) throws JMSException{
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Destination: ");
            throw e;
        }
        return destination;
    }
    
    public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException{
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
            throw e;
        }
        return producer;
    }
    
    public void write(String text, String connectionUrl, String queueName) throws JMSException{
        Connection connection = createConnection(connectionUrl);
        Session session = createSession(connection);
        Destination destination = createDestination(session, queueName);
        MessageProducer producer = createMessageProducer(session, destination);
        Message message = null;
        try {
            message = session.createTextMessage(text);
            producer.send(message);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
            throw e;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
                    throw e;
                }
            }
        }
    }
    
}

Implementation of the Consumer Job

As seen above in the Producer Job example, there are also three more methods needed for the consumer Java job to run.

  • createMessageConsumer(Session session, Destination destination)
  • read(Connection connection, String queueName, Boolean closeMessage)
  • execute()

The createMessageConsumer(Session session, Destination destination) method

This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageConsumer object with the given session and destination.

Code Block
languagejava
titlecreateMessageConsumer(Session session, Destination destination)
linenumberstrue
collapsetrue
private MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
    MessageConsumer consumer = null;
    try {
        consumer = session.createConsumer(destination);
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred while trying to create MessageConsumer: ");
        throw e;
    }
    return consumer;
}

The read(Connection connection, String queueName, Boolean closeMessage) method

The method is called with an already instantiated Connection object the name of the queue to read from and a flag to determine if this is the last consumer. If it is the last consumer for the messages in the specified queue then the reiceived message will be acknowledged. This method makes use of the methods described above. It instantiates all objects needed for the connection and reads a Message object from the given queue. It extracts the value from the Message object as a string representation via the Message object´s getText() method. 

Code Block
languagejava
titleread(Connection connection, String queueName, Boolean closeMessage)
linenumberstrue
collapsetrue
private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
    String messageText = null;
    try {
        Session session = createSession(connection);
        Destination destination = createDestination(session, queueName);
        connection.start();
        MessageConsumer consumer = createMessageConsumer(session, destination);
        Message receivedMessage = null;
        while (true) {
            receivedMessage = consumer.receive(1);
            if (receivedMessage != null) {
                if (receivedMessage instanceof TextMessage) {
                    TextMessage message = (TextMessage) receivedMessage;
                    messageText = message.getText();
                    spooler_log.debug9("Reading message from queue: " + messageText);
                    break;
                } else {
                    break;
                }
            }
        }
        if(closeMessage){
            receivedMessage.acknowledge();
        }
    } catch (JMSException e) {
        spooler_log.error("JMSException occurred while trying to read from Destination: ");
        throw e;
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to close the connection: ");
            }
        }
    }
    return messageText;
}