...
Introduction
This document describes the implementation required for a Java Message Service (JMS) queue:
- to send and receive messages
- to add an order to a JobScheduler job chain via a message processed in a Java class.
The Apache Active MQ server is used in this example. No JobScheduler libraries are needed for the example.
Mode Of Operation
The implementation enables the following steps:
- Creating a message producer to send messages to a Message Queue server (MQ server).
- Creating a message consumer to receive messages from an MQ server.
- Sending the message to a JobScheduler.
The message has to be an XML snippet to be processed by the desired JobScheduler. This snippet must be valid against the JobScheduler XML Schema.
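Because a malformed snippet would only be noticed once it reaches the JobScheduler, it can be useful to run a cheap check before sending. The following is a minimal sketch using only the JDK's XML parser. The class and method names (`AddOrderCheck`, `isAddOrderSnippet`) are hypothetical and not part of the original example, and the check covers well-formedness and the root element name only; it does not validate against the JobScheduler XML Schema.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical helper, not part of the original example.
final class AddOrderCheck {

    // Returns true if the text is well-formed XML whose root element is <add_order>.
    // Assumption: this is a pre-check only, not a schema validation.
    static boolean isAddOrderSnippet(String xml) {
        if (xml == null) {
            return false;
        }
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Reject DOCTYPE declarations so that no external entities are resolved.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            // DefaultHandler rethrows fatal errors instead of printing them to stderr.
            builder.setErrorHandler(new DefaultHandler());
            Document doc = builder.parse(new InputSource(new StringReader(xml)));
            return "add_order".equals(doc.getDocumentElement().getNodeName());
        } catch (Exception e) {
            // Any parser or configuration problem is treated as "not a usable snippet".
            return false;
        }
    }
}
```

A producer could call `AddOrderCheck.isAddOrderSnippet(text)` before publishing and log a warning instead of sending when it returns `false`.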
Most of the implementation uses the standard JMS API of Java. The only class taken from the Active MQ implementation is the ActiveMQConnectionFactory
class. It should not be too complex to replace it with the ConnectionFactory
of your preferred message queue service.
...
- A running Message Queue server (the example uses Apache Active MQ)
- A running JobScheduler
- Maven (only needed if you want to build the example as a Maven project)
...
This example describes how to build a connection to an MQ server and send a message to a queue on this server. This example uses Apache Active MQ as the MQ server.
- Create a connection.
- Create a session.
- Create a destination.
- Create a producer.
- Send a message with the producer.
- Close the connection.
The Methods
The steps are separated into different methods to make the implementation more readable as well as reusable.
createConnection(String url)
...
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageProducer
object with the given session and destination.
...
The method is called with the message to send to the server and the MessageProducer
object to use for publishing. The message is a String
object. The method instantiates a Message
object with the text to send.
```java
// Note: 'session' is not a parameter here; the snippet assumes it is
// available as a field holding the active Session.
public void write(String text, MessageProducer producer) {
    Message message = null;
    try {
        if (text != null) {
            message = session.createTextMessage(text);
        } else {
            // Fall back to the default add_order snippet.
            message = session.createTextMessage(DEFAULT_TEXT);
        }
        producer.send(message);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to write Message to Destination: ", e);
    }
}
```
close()
This method makes sure that the connection used is closed after a message has been sent.
```java
private void close(Connection connection) {
    if (connection != null) {
        try {
            connection.close();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to close the connection: ", e);
        }
    }
}
```
...
The code example below differs slightly from the examples above. In the class below, the write(String text)
method already uses the other methods for instantiation; therefore it needs fewer parameters, and it closes the connection itself.
...
```java
package com.sos.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class SOSProducer {

    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOSTNAME]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
    private static final String QUEUE_NAME = "test_queue";
    private static final String DEFAULT_TEXT = "<add_order job_chain='[FOLDERNAME]/[JOBCHAINNAME]' at='now'>"
            + "<params>"
            + "<param name='host' value='[HOST]'/>"
            + "<param name='port' value='22'/>"
            + "<param name='user' value='[USERNAME]'/>"
            + "<param name='auth_method' value='password'/>"
            + "<param name='password' value='[PASSWORD]'/>"
            + "<param name='command' value='echo command send over MessageQueue!'/>"
            + "</params>"
            + "</add_order>";

    private Connection createConnection() {
        return this.createConnection(DEFAULT_CONNECTION_URI);
    }

    public Connection createConnection(String uri) {
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = null;
        try {
            connection = factory.createConnection();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to connect: ", e);
        }
        return connection;
    }

    private Session createSession(Connection connection) {
        Session session = null;
        try {
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Session: ", e);
        }
        return session;
    }

    private Destination createDestination(Session session, String queueName) {
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Destination: ", e);
        }
        return destination;
    }

    private MessageProducer createMessageProducer(Session session, Destination destination) {
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageProducer: ", e);
        }
        return producer;
    }

    public void write(String text) {
        Connection connection = createConnection();
        Session session = createSession(connection);
        // createDestination expects the queue name as its second argument.
        Destination destination = createDestination(session, QUEUE_NAME);
        MessageProducer producer = createMessageProducer(session, destination);
        Message message = null;
        try {
            if (text != null) {
                message = session.createTextMessage(text);
            } else {
                message = session.createTextMessage(DEFAULT_TEXT);
            }
            producer.send(message);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to write Message to Destination: ", e);
        } finally {
            // Always close the connection, even if sending failed.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.error("JMSException occurred while trying to close the connection: ", e);
                }
            }
        }
    }
}
```
The MessageConsumer
This section describes how to build a connection to an MQ server and receive a message from a queue on this server. Furthermore, it describes how to build a TCP socket connection to send the received message to a JobScheduler.
This example uses Apache Active MQ as the MQ server.
- Create a connection.
- Create a session.
- Create a destination.
- Create a consumer.
- Receive a message with the consumer.
- Close the (MQ) connection.
- Open a TCP connection to a JobScheduler instance.
- Send the message to the JobScheduler.
- Close the (TCP) connection.
This section shows examples for the last five steps, as the first four steps are similar to the examples already shown above for the instantiation of the MessageProducer.
...
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageConsumer
object with the given session and destination.
...
This method sends a request with the given command
to the connected JobScheduler instance. The command given in the example is the add_order
XML snippet shown in the SOSProducer
example.
```java
public void sendRequest(String command) throws Exception {
    if ("udp".equalsIgnoreCase(PROTOCOL)) {
        if (command.indexOf("<?xml") == -1) {
            command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n";
        }
        byte[] commandBytes = command.getBytes();
        udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, InetAddress.getByName(HOST), PORT));
    } else {
        if (command.indexOf("<?xml") == 0) {
            out.print(command + "\r\n");
        } else {
            out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n");
        }
        out.flush();
    }
}
```
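Both branches of sendRequest() prepend an XML declaration when the command does not carry one, but they test for it slightly differently (anywhere in the string for UDP, at position 0 for TCP), and the UDP branch only appends the line terminator when it adds the declaration. As a minimal sketch, the shared logic could be factored into one helper. The names `XmlDeclaration` and `ensureDeclaration` are hypothetical, and treating "starts with `<?xml`" plus "always terminate with `\r\n`" as the intended behaviour is an assumption, not something the original code states.

```java
// Hypothetical helper, not part of the original example.
final class XmlDeclaration {

    private static final String DECLARATION =
            "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>";

    // Prepends the XML declaration if the command does not already start with one
    // and always terminates the command with CRLF.
    static String ensureDeclaration(String command) {
        String body = command.startsWith("<?xml") ? command : DECLARATION + command;
        return body + "\r\n";
    }
}
```

With such a helper, each branch would only differ in how the resulting string is transported.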
getResponse()
This method receives the response over the TCP connection described above.
```java
public String getResponse() throws IOException, RuntimeException {
    int sec = 0;
    byte[] buffer = {};
    while (in.available() == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            LOGGER.error("", e);
        }
        if (sec++ == TIMEOUT) {
            throw new RuntimeException("timeout reached");
        }
    }
    buffer = new byte[in.available()];
    int bytesRead = in.read(buffer);
    return new String(buffer, "ISO-8859-1");
}
```
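The polling loop in getResponse() can be tried out without a JobScheduler by feeding it any InputStream. The following is a minimal sketch of the same wait-then-read idea with the stream and timeout passed in as parameters. The names `ResponseReader` and `readAvailable` are hypothetical, and the deadline-based timeout and the handling of interrupts are design choices of this sketch rather than the behaviour of the original method.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original example.
final class ResponseReader {

    // Waits until the stream reports available bytes or the timeout elapses,
    // then reads what is available and decodes it as ISO-8859-1.
    static String readAvailable(InputStream in, long timeoutMillis, long pollMillis)
            throws IOException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (in.available() == 0) {
            if (System.currentTimeMillis() >= deadline) {
                throw new IOException("timeout reached");
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting for a response", e);
            }
        }
        byte[] buffer = new byte[in.available()];
        int bytesRead = in.read(buffer);
        // Decode only the bytes that were actually read.
        return new String(buffer, 0, Math.max(bytesRead, 0), StandardCharsets.ISO_8859_1);
    }
}
```

Like the original, this reads only the bytes available at the moment of the first read, so a response that arrives in several chunks may be returned incompletely.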
receiveFromQueue()
The receiveFromQueue()
method uses the methods described above to connect to a host running a JobScheduler instance, read from a message queue, send the received message to the JobScheduler instance and close the session to the JobScheduler instance.
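The TCP part of this sequence (open a connection, send the command, close the connection) can be sketched with the JDK's Socket class alone. This is a minimal sketch, not the class used in the example: the names `TcpCommandSender` and `send` are hypothetical, the host and port are whatever the JobScheduler instance listens on, and the sketch does not read a response.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original example.
final class TcpCommandSender {

    // Opens a TCP connection, writes the command followed by CRLF and closes
    // the connection again. try-with-resources closes the socket even on failure.
    static void send(String host, int port, String command) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write((command + "\r\n").getBytes(StandardCharsets.ISO_8859_1));
            out.flush();
        }
    }
}
```

In a consumer, a call such as `TcpCommandSender.send(host, port, receivedText)` would follow the step that receives the message text from the queue.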
...
The code example below differs slightly from the examples above. In the class below, the read()
method already uses the other methods for instantiation; therefore it needs fewer parameters, and it closes the connection itself.
...
Maven Configuration Example
This example shows the dependency needed to build the example above as a Maven project.
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sos-berlin</groupId>
    <artifactId>activeMQ-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.0</version>
        </dependency>
    </dependencies>
</project>
```
...