Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Grammar improved

...

Introduction

This document describes how messages can be sent to and received from a messaging queue and how those messages can be processed in a Java class the implementation required for a Java Message Service queue:

  • to send and receive messages
  • to add an order to a JobScheduler job chain via a message processed in a Java class .

The Apache Active MQ server is used in this example. No JobScheduler libraries are needed for this the example.

Mode Of Operation

The implementation enables the following steps:

  • Creation of Creating a message producer to send messages to a Message Queue server (MQ server).
  • Creating Creation of a message consumer to receive messages from an MQ server.
  • Sending the message to a JobschedulerJobScheduler.

The message has to be an XML snippet which is valid to the JobScheduler XML Schema to be processed on the desired JobScheduler. This snippet must be valid against the JobScheduler XML Schema.

Most of the implementation was done with the standard jms JMS implementation of Java. The only class from the active MQ implementation is the ActiveMQConnectionFactory class. It shouldn´t be too complex , to change the implementation with the ConnectionFactory of your desired message queue service.

...

  • A running Message Queue server (The example makes use of uses Apache Active MQ)
  • A running JobScheduler
  • Maven (only needed if you want to build the example as a maven project)

...

This example describes how to build a connection to an MQ server and send a message to a queue on this server. This example uses Apache Active MQ as the MQ server.

  1. Create a connection.
  2. Create a session.
  3. Create a destination.
  4. Create a producer.
  5. Send a message with the producer.
  6. Close the connection.

The Methods

To The steps are separated into different methods to make the implementation more readable as well as reusable, the steps are separated in different methods

createConnection(String url)

...

This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageProducer object with the given session and destination.

...

The method is called with the message to send to the server and the MessageProducer object to use for publishing. The message is a String object. The method instantiates a Message object with the text to send.

Code Block
languagejava
titlewrite(String text)
linenumberstrue
collapsetrue
public void write(String text, MessageProducer producer){
    Message message = null;
    try {
        if(text != null){
            message = session.createTextMessage(text);
        } else{
            message = session.createTextMessage(DEFAULT_TEXT);
        }
        producer.send(message);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
    }
}

close()

This methods method makes sure that the used connection will be closed after a message is has been sent.

Code Block
languagejava
titleclose()
linenumberstrue
collapsetrue
private void close(Connection connection){
    if (connection != null) {
        try {
            connection.close();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to close the connection: " , e);
        }
    }
}

...

The code example below slightly differs from the examples above. In the class below the methode write(String text) method already uses the other method methods for instantiation, therfore therefore it needs less parameters and it closes the connection itself.

...

Code Block
languagejava
titleSOSProducer.java
linenumberstrue
collapsetrue
package com.sos.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class SOSProducer {
    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOSTNAME]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
    private static final String QUEUE_NAME = "test_queue";
    private static final String DEFAULT_TEXT = "<add_order job_chain='[FOLDERNAME]/[JOBCHAINNAME]' at='now'>"
            + "<params>"
            + "<param name='host' value='[HOST]'/>"
            + "<param name='port' value='22'/>"
            + "<param name='user' value='[USERNAME]'/>"
            + "<param name='auth_method' value='password'/>"
            + "<param name='password' value='[PASSWORD]'/>"
            + "<param name='command' value='echo command send over MessageQueue!'/>"
            + "</params>"
            + "</add_order>";
    
    private Connection createConnection(){
        return this.createConnection(DEFAULT_CONNECTION_URI);
    }
    
    public Connection createConnection(String uri){
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = null;
        try {
            connection = factory.createConnection();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to connect: " , e);
        }
        return connection;
    }
    
    private Session createSession(Connection connection){
        Session session = null;
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Session: " , e);
        }
        return session;
    }
    
    private Destination createDestination(Session session, String queueName){
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Destination: " , e);
        }
        return destination;
    }
    
    private MessageProducer createMessageProducer(Session session, Destination destination){
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageProducer: " , e);
        }        
        return producer;
    }
    
    public void write(String text){
        Connection connection = createConnection();
        Session session = createSession(connection);
        Destination destination = createDestination(session);
        MessageProducer producer = createMessageProducer(session, destination);
        Message message = null;
        try {
            if(text != null){
                message = session.createTextMessage(text);
            } else{
                message = session.createTextMessage(DEFAULT_TEXT);
            }
            producer.send(message);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.error("JMSException occurred while trying to close the connection: " , e);
                }
            }
        }
    }
}

The MessageConsumer

This example section describes how to build a connection to a MQ server and receive a message from a queue on this server. Furthermore it describes how to build a TCP socket connection to send the received message to a JobScheduler.
This example uses Apache Active MQ as the MQ server.

  1. Create a connection.
  2. Create a session.
  3. Create a destination.
  4. Create a consumer.
  5. Receive a message with the consumer.
  6. Close the (MQ) connection.
  7. Open a TCP connection to a JobScheduler instance.
  8. Send the Message to the JobScheduler
  9. Close the (TCP) connection.

This part of the document will show section shows examples for the last 5 steps as the first four steps are simmilar similar to the examples already shown above for the instantiation of the MessageProducer.

...

This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageConsumer object with the given session and destination.

...

This method sends a request with the given command to the connected JobScheduler instance. The command given command in the example is the add_order XML snippet shown in the SOSProducer example.

Code Block
languagejava
linenumberstrue
collapsetrue
public void sendRequest(String command) throws Exception {
    if ("udp".equalsIgnoreCase(PROTOCOL)) {
        if (command.indexOf("<?xml") == -1) {
            command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n";
        }
        byte[] commandBytes = command.getBytes();
        udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, InetAddress.getByName(HOST), PORT));
    } else {
        if (command.indexOf("<?xml") == 0) {
            out.print(command + "\r\n");
        } else {
            out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n");
        }
        out.flush();
    }
}

getResponse()

THis This message receives the response of the TCP connection described above.

Code Block
languagejava
linenumberstrue
collapsetrue
public String getResponse() throws IOException, RuntimeException {
    int sec = 0;
    byte[] buffer = {};
    while (in.available() == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            LOGGER.error("", e);
        }
        if (sec++ == TIMEOUT) {
            throw new RuntimeException("timeout reached");
        }
    }
    buffer = new byte[in.available()];
    int bytesRead = in.read(buffer);
    return new String(buffer, "ISO-8859-1");
}

receiveFromQueue()

The receiveFromQueue() method uses the methods described above to connect to a Host host running a JobScheduler instance, read from a message queue, send the received message to the JobScheduler instance and close the session to the JobScheduler instance.

...

The code example below slightly differs from the examples above. In the class below the methode read() method already uses the other method for instantiation, therefore it needs less parameters and it closes the connection itself.

...

Maven Configuration Example

This example shows the dependency needed dependency to build the example above as a maven Maven project.

Code Block
languagexml
titleMaven Configuration
collapsetrue
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>


	<groupId>com.sos-berlin</groupId>
	<artifactId>activeMQ-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>


	<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.13.0</version>
		</dependency>
	</dependencies>
</project>

...