Introduction
This document describes how messages can be sent to and received from a messaging queue and how those messages can be processed in a Java class the implementation required for a Java Message Service queue:
- to send and receive messages
- to add an order to a JobScheduler job chain as an example XML command extracted from a message.
A separate implementation is used to send the XML command - .No JobScheduler libraries are not needed for the example.
The Apache Active MQ server is used in this example.
Mode
...
of Operation
The implementation enables the following steps:
- Creation of a message producer Creating a message producer to send messages to a Message Queue server (MQ server) server.
- Creating Creation of a message consumer to receive messages from a an MQ server.
- Sending the message to a JobschedulerJobScheduler.
To be able to process the message on the desired JobScheduler the message The message has to be a an XML snippet wich can to be processed by the JobScheduler API respectively by desired JobScheduler. This snippet must be valid XML and compliant with the JobScheduler XML interfaceSchema.
The Message Producer
Most of the implementation work is done with the standard JMS implementation of Java. The only class from the Active MQ implementation is the ActiveMQConnectionFactory
class. It shouldn´t be too complex to change the implementation to the ConnectionFactory
of your desired message queue service.
Download
A zip file of this example as a complete maven project implementation is available: active-mq-example.zip.
Prerequisites
- A running Message Queue server (The example uses Apache Active MQ)
- A running JobScheduler
- Maven (only needed if you want to build the example as a maven project)
The MessageProducer
This example describes how to build a connection to an MQ server and send a message to a queue on this server.
- Create a connection.
- Create a session.
- Create a destination.
- Create a producer.
- Send a message with the producer.
- Close the connection.
The Methods
The steps are separated into different methods to make the implementation more readable as well as reusable.
createConnection(String url)
This method instantiates a ConnectionFactory
object with an ActiveMQConnectionFactory
object and creates a Connection
object through the factory method createConnection()
.
The ActiveMQConnectionFactory
object has to be instantiated with the URL of the MQ server.
Code Block |
---|
language | java |
---|
title | createConnection(String url) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public Connection createConnection(String uri){
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
Connection connection = null;
try |
Code Block |
---|
language | java |
---|
title | SOSProducer.java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
public class SOSProducer {
private static final String DEFAULT_CONNECTION_URIconnection = "tcp://[HOSTNAME]:61616"factory.createConnection();
private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
} catch (JMSException e) {
private static final String QUEUE_NAME = "test_queue"LOGGER.error("JMSException occurred while trying to connect: " , e);
private}
static final String TEXT = "<add_order job_chain='[FOLDERNAME]/[JOBCHAINNAME]' at='now'>"
+ "<params>"
+ "<param name='host' value='[HOST]'/>"return connection;
} |
createSession(Connection connection)
The method is called with an already instantiated Connection
object and initiates a Session
object through the Connection
objects createSession(boolean transacted, int acknowledgeMode
)
method.
Code Block |
---|
language | java |
---|
title | createSession(Connection connection) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private Session createSession(Connection connection){
Session session = null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create Session: " , e);
}
return session;
} |
createDestination(Session session, String queueName
)
This method creates a Destination
object. It is called with an active Session
object and the name of the queue to write to. The Destination
object is initiated through the createQueue(String name)
method of the Session
object.
Code Block |
---|
language | java |
---|
title | createDestination(Session session) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public Destination createDestination(Session session, String queueName){
Destination destination = null;
try {
destination = session.createQueue(queueName);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create Destination: " , e);
}
return destination;
} |
createMessageProducer(Session session, Destination destination)
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageProducer
object with the given session and destination.
Code Block |
---|
language | java |
---|
title | createProducer(Session session, Destination destination) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private MessageProducer createMessageProducer(Session session, Destination destination){
MessageProducer producer = null;
try {
producer = session.createProducer(destination);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create MessageProducer: " , e);
}
return producer;
} |
write(String text, MessageProducer producer)
The method is called with the message to send to the server and the MessageProducer
object to use for publishing. The message is a String
object. The method instantiates a Message
object with the text to send.
Code Block |
---|
language | java |
---|
title | write(String text) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public void write(String text, MessageProducer producer){
Message message = null;
try {
if(text != null){
message = session.createTextMessage(text);
} else{
message = session.createTextMessage(DEFAULT_TEXT);
}
producer.send(message);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
}
} |
close()
This method makes sure that the connection used will be closed after a message has been sent.
Code Block |
---|
language | java |
---|
title | close() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private void close(Connection connection){
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to close the connection: " , e);
}
}
} |
The SOSProducer
class
The code example below slightly differs from the examples above. In the class below the write(String text)
method already uses the other methods for instantiation, therefore it needs less parameters and it closes the connection itself.
The text in the example below consists of a JobScheduler add_order
XML, which can later be used to send to a JobScheduler.
Code Block |
---|
language | java |
---|
title | SOSProducer.java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
public class SOSProducer {
private static final String DEFAULT_CONNECTION_URI = "tcp://[HOSTNAME]:61616";
private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
private static final String QUEUE_NAME = "test_queue";
private static final String DEFAULT_TEXT = "<add_order job_chain='[FOLDERNAME]/[JOBCHAINNAME]' at='now'>"
+ "<params>"
+ "<param name='host' value='[HOST]'/>"
+ "<param name='port' value='22'/>"
+ "<param name='user' value='[USERNAME]'/>"
+ "<param name='auth_method' value='password'/>"
+ "<param name='password' value='[PASSWORD]'/>"
+ "<param name='command' value='echo command send over MessageQueue!'/>"
+ "</params>"
+ "</add_order>";
private Connection createConnection(){
return this.createConnection(DEFAULT_CONNECTION_URI);
}
public Connection createConnection(String uri){
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
Connection connection = null;
try {
connection = factory.createConnection();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to connect: " , e);
}
return connection;
}
private Session createSession(Connection connection){
Session session = null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create Session: " , e);
}
return session;
}
private Destination createDestination(Session session, String queueName){
Destination destination = null;
try {
destination = session.createQueue(queueName);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create Destination: " , e);
}
return destination;
}
private MessageProducer createMessageProducer(Session session, Destination destination){
MessageProducer producer = null;
try {
producer = session.createProducer(destination);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create MessageProducer: " , e);
}
return producer;
}
public void write(String text){
Connection connection = createConnection();
Session session = createSession(connection);
Destination destination = createDestination(session);
MessageProducer producer = createMessageProducer(session, destination);
Message message = null;
try {
if(text != null){
message = session.createTextMessage(text);
} else{
message = session.createTextMessage(DEFAULT_TEXT);
}
producer.send(message);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to close the connection: " , e);
}
}
}
}
} |
The MessageConsumer
This section describes how to build a connection to a MQ server and receive a message from a queue on this server. Furthermore it describes how to build a TCP socket connection to send the received message to a JobScheduler.
- Create a connection.
- Create a session.
- Create a destination.
- Create a consumer.
- Receive a message with the consumer.
- Close the (MQ) connection.
- Open a TCP connection to a JobScheduler instance.
- Send the Message to the JobScheduler
- Close the (TCP) connection.
This section shows examples for the last 5 steps as the first four are similar to the examples already shown above for the instantiation of the MessageProducer
.
The Methods
createMessageConsumer(Session session, Destination destination)
This method is called with an already active Session
object as well as an instantiated Destination
object. It instantiates a MessageConsumer
object with the given session and destination.
Code Block |
---|
language | java |
---|
title | createMessageConsumer(Session session, Destination destination) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private MessageConsumer createMessageConsumer(Session session, Destination destination) {
MessageConsumer consumer = null;
try {
consumer = session.createConsumer(destination);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
}
return consumer;
} |
read(MessageConsumer consumer
)
The method is called with an already instantiated MessageConsumer
object to receive a message from the MQ server. It extracts the value from the Message
object as a string representation via the Message
objects getText()
method.
Code Block |
---|
language | java |
---|
title | read() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private String read(MessageConsumer consumer) {
TextMessage message = null;
String textMessage = null;
try {
while (true) {
Message receivedMessage = consumer.receive(1);
if (receivedMessage != null) {
if (receivedMessage instanceof TextMessage) {
message = (TextMessage) receivedMessage;
textMessage = message.getText();
LOGGER.info("Reading message: " + textMessage);
break;
} else {
break;
}
}
}
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
}
return textMessage;
} |
Note |
---|
Don´t forget to clean up (call close() ) after the message has been received. |
connect(String host, int port)
This method instantiates a TCP socket connection to a JobScheduler instance with the given host:port
.
Code Block |
---|
language | java |
---|
title | connect() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public void connect(String host, int port) throws Exception {
if (host == null || host.length() == 0) {
throw (new Exception("hostname missing."));
}
if (port == 0){
throw (new Exception("port missing."));
}
if ("udp".equalsIgnoreCase(PROTOCOL)) {
udpSocket = new DatagramSocket();
udpSocket.connect(InetAddress.getByName(HOST), PORT);
} else {
socket = new Socket(host, port);
in = new DataInputStream(socket.getInputStream());
out = new PrintWriter(socket.getOutputStream(), true);
}
} |
disconnect()
This method is used to close the socket opened with the above connect()
method.
Code Block |
---|
language | java |
---|
title | disconnect() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public void disconnect() throws Exception {
if (socket != null) {
socket.close();
}
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
} |
sendRequest(String command)
This method sends a request with the given command
to the connected JobScheduler instance. The command given in the example is the add_order
XML snippet shown in the SOSProducer
example.
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public void sendRequest(String command) throws Exception {
if ("udp".equalsIgnoreCase(PROTOCOL)) {
if (command.indexOf("<?xml") == -1) {
command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n";
}
byte[] commandBytes = command.getBytes();
udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, InetAddress.getByName(HOST), PORT));
} else {
if (command.indexOf("<?xml") == 0) {
out.print(command + "\r\n");
} else {
out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n");
}
out.flush();
}
} |
getResponse()
This message receives the response of the TCP connection described above.
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public String getResponse() throws IOException, RuntimeException {
int sec = 0;
byte[] buffer = {};
while (in.available() == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error("", e);
}
if (sec++ == TIMEOUT) {
throw new RuntimeException("timeout reached");
}
}
buffer = new byte[in.available()];
int bytesRead = in.read(buffer);
return new String(buffer, "ISO-8859-1");
} |
receiveFromQueue()
The receiveFromQueue()
method uses the methods described above to connect to a host running a JobScheduler instance, read from a message queue, send the received message to the JobScheduler instance and close the session to the JobScheduler instance.
Code Block |
---|
language | java |
---|
title | receiveFromQueue() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public String receiveFromQueue() {
String message = null;
try {
connect();
message = read();
sendRequest(message);
disconnect();
} catch (Exception e) {
LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e);
}
return message;
} |
The SOSConsumer
class
The code example below slightly differs from the examples above. In the class below the read()
method already uses the other method for instantiation, therefore it needs less parameters and it closes the connection itself.
Code Block |
---|
language | java |
---|
title | SOSConsumer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.consumer;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
public class SOSConsumer {
private static final String DEFAULT_CONNECTION_URI = "tcp://[HOST]:61616";
private static final Logger LOGGER = Logger.getLogger(SOSConsumer.class);
private static final String DEFAULT_QUEUE_NAME = "test_queue";
private static final String HOST = "[JobScheduler_HOSTNAME]";
private static final int PORT = [JobScheduler_PORT];
private static final String PROTOCOL = "tcp";
private static final int TIMEOUT = 5;
private Socket socket = null;
private DatagramSocket udpSocket = null;
private DataInputStream in = null;
private PrintWriter out = null;
private Connection createConnection() {
return this.createConnection(DEFAULT_CONNECTION_URI);
}
private Connection createConnection(String uri) {
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
Connection connection = null;
try {
connection = factory.createConnection();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to connect: ", e);
}
return connection;
}
private Session createSession(Connection connection) {
Session session = null;
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create Session: ", e);
}
return session;
}
private Destination createDestination(Session session) {
return this.createDestination(session, DEFAULT_QUEUE_NAME);
}
private Destination createDestination(Session session, String queueName) {
Destination destination = null;
try {
destination = session.createQueue(queueName);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create Destination: ", e);
}
return destination;
}
private MessageConsumer createMessageConsumer(Session session, Destination destination) {
MessageConsumer consumer = null;
try {
consumer = session.createConsumer(destination);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
}
return consumer;
}
private String read() {
TextMessage message = null;
String textMessage = null;
Connection connection = createConnection();
try {
Session session = createSession(connection);
Destination destination = createDestination(session);
connection.start();
MessageConsumer consumer = createMessageConsumer(session, destination);
while (true) {
Message receivedMessage = consumer.receive(1);
if (receivedMessage != null) {
if (receivedMessage instanceof TextMessage) {
message = (TextMessage) receivedMessage;
textMessage = message.getText();
LOGGER.info("Reading message: " + textMessage);
break;
} else {
break;
}
}
}
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to close the connection: " , e);
}
}
}
return textMessage;
}
public String receiveFromQueue() {
String message = null;
try {
connect();
message = read();
sendRequest(message);
disconnect();
} catch (Exception e) {
LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e);
}
return message;
}
public void connect(String host, int port) throws Exception {
if (host == null + "<param name='port' value='22'/>"|| host.length() == 0) {
+ "<param name='user' value='[USERNAME]'/>"
throw (new Exception("hostname missing."));
}
+if "<param(port name=='auth_method' value='password'/>" 0){
throw (new Exception("port missing."));
+ "<param name='password' value='[PASSWORD]'/>" }
if ("udp".equalsIgnoreCase(PROTOCOL)) {
+ "<param name='command' value='echo command send over MessageQueue!'/>"
udpSocket = new DatagramSocket();
+ "</params>"
udpSocket.connect(InetAddress.getByName(HOST), PORT);
+ "</add_order>";
} else {
private Connection createConnection(){
socket = returnnew this.createConnection(DEFAULT_CONNECTION_URISocket(host, port);
}
in private= Connection createConnection(String uri){
new DataInputStream(socket.getInputStream());
ConnectionFactory factoryout = new ActiveMQConnectionFactory(uriPrintWriter(socket.getOutputStream(), true);
}
Connection connection = null;}
public void connect() throws tryException {
connection = factory.createConnection(this.connect(HOST, PORT);
}
/** sends }a catchrequest (JMSExceptionto e)a {JobScheduler
*
* LOGGER.error("JMSException occurred while trying to connect: " , e);@param command
* @throws java.lang.Exception
**/
}
public void sendRequest(String command) throws Exception {
return connection;
}
if ("udp".equalsIgnoreCase(PROTOCOL)) {
private Session createSession(Connection connection){
Session session = null;if (command.indexOf("<?xml") == -1) {
try {
command = "<?xml version=\"1.0\" session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)encoding=\"iso-8859-1\"?>" + command + "\r\n";
} catch (JMSException e) {}
LOGGER.error("JMSException occurred while trying to create Session: " , ebyte[] commandBytes = command.getBytes();
udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, InetAddress.getByName(HOST), PORT));
}
else {
if (command.indexOf("<?xml") == return0) session;{
}
private Destination createDestination(Session session){out.print(command + "\r\n");
Destination destination} =else null;{
try {
out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command destination = session.createQueue(QUEUE_NAME+ "\r\n");
} catch (JMSException e) {}
LOGGERout.error("JMSException occurred while trying to create Destination: " , e);
}flush();
return destination;}
}
private/** MessageProducerreceives createMessageProducer(Session session, Destination destination){
MessageProducer producer = null;
the response of the JobScheduler for the last sent request
*
* @return String response tryfrom {JobScheduler
* @throws IOException */
producerpublic =String session.createProducergetResponse(destination);
} catch (JMSException e) throws IOException, RuntimeException {
int sec = LOGGER.error("JMSException occurred while trying to create MessageProducer: " , e)0;
byte[] buffer = {};
} while (in.available() == 0) {
returntry producer;{
}
public void write(String text){
Thread.sleep(1000);
Connection connection} =catch createConnection(InterruptedException e); {
Session session = createSession(connectionLOGGER.error("", e);
Destination destination = createDestination(session);}
MessageProducer producer =if createMessageProducer(session, destination);(sec++ == TIMEOUT) {
Message message = null;
throw try {new RuntimeException("timeout reached");
if(text != null){}
}
messagebuffer = new sessionbyte[in.createTextMessageavailable(text)];
int bytesRead = in.read(buffer);
} else{
return new String(buffer, "ISO-8859-1");
}
messagepublic =void session.createTextMessagedisconnect(TEXT);) throws Exception {
if (socket != null) }{
producersocket.sendclose(message);
} catch (JMSException e
if (in != null) {
LOGGERin.error("JMSException occurred while trying to write Message to Destination: " , e);
close();
}
if (out } finally!= null) {
if (connection != null) {out.close();
}
try {
connection.close();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to close the connection: " , e);
}
}
}
}
}}
} |
Maven Configuration Example
This example shows the dependency needed to build the example above as a Maven project.
Code Block |
---|
language | xml |
---|
title | Maven Configuration |
---|
collapse | true |
---|
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sos-berlin</groupId>
<artifactId>activeMQ-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.0</version>
</dependency>
</dependencies>
</project> |