...
Most of the implementation was done with the standard jms implementation of Java. The only class from the active MQ implementation is the ActiveMQConnectionFactory
class. It shouldn´t be too complex, to change the implementation with the ConnectionFactory
of your desired message queue service.
Prerequisites
- A running Message Queue server
- A running JobScheduler
- Maven (only needed if you want to build the example as a maven project)
The MessageProducer
This example describes how to build a connection to an MQ server and send a message to a queue on this server. This example uses Apache Active MQ as the MQ server.
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package com.sos.jms.consumer; import java.io.DataInputStream; import java.io.IOException; import java.io.PrintWriter; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.Socket; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class SOSConsumer { private static final String DEFAULT_CONNECTION_URI = "tcp://[HOST]:61616"; private static final Logger LOGGER = Logger.getLogger(SOSConsumer.class); private static final String DEFAULT_QUEUE_NAME = "test_queue"; private static final String HOST = "[JobScheduler_HOSTNAME]"; private static final int PORT = [JobScheduler_PORT]; private static final String PROTOCOL = "tcp"; private static final int TIMEOUT = 5; private Socket socket = null; private DatagramSocket udpSocket = null; private DataInputStream in = null; private PrintWriter out = null; private Connection createConnection() { return this.createConnection(DEFAULT_CONNECTION_URI); } private Connection createConnection(String uri) { ConnectionFactory factory = new ActiveMQConnectionFactory(uri); Connection connection = null; try { connection = factory.createConnection(); } catch (JMSException e) { LOGGER.error("JMSException occurred while trying to connect: ", e); } return connection; } private Session createSession(Connection connection) { Session session = null; try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { LOGGER.error("JMSException occurred while trying to create Session: ", e); } return session; } private Destination createDestination(Session session) { return this.createDestination(session, DEFAULT_QUEUE_NAME); } private Destination createDestination(Session session, String queueName) { Destination destination = null; try { destination = session.createQueue(queueName); } catch (JMSException e) { LOGGER.error("JMSException occurred while trying to create Destination: ", e); } return destination; } private MessageConsumer createMessageConsumer(Session session, Destination destination) { MessageConsumer consumer = null; try { consumer = session.createConsumer(destination); } catch (JMSException e) { LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e); } return consumer; } private String read() { TextMessage message = null; String textMessage = null; Connection connection = createConnection(); try { Session session = createSession(connection); Destination destination = createDestination(session); connection.start(); MessageConsumer consumer = createMessageConsumer(session, destination); while (true) { Message receivedMessage = consumer.receive(1); if (receivedMessage != null) { if (receivedMessage instanceof TextMessage) { message = (TextMessage) receivedMessage; textMessage = message.getText(); LOGGER.info("Reading message: " + textMessage); break; } else { break; } } } } catch (JMSException e) { LOGGER.error("JMSException occurred while trying to read from Destination: ", e); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { LOGGER.error("JMSException occurred while trying to close the connection: " , e); } } } return textMessage; } public String receiveFromQueue() { String message = null; try { connect(); message = read(); sendRequest(message); disconnect(); } catch (Exception e) { LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e); } return message; } public void connect(String host, int port) throws Exception { if (host == null || host.length() == 0) { throw (new Exception("hostname missing.")); } if (port == 0){ throw (new Exception("port missing.")); } if ("udp".equalsIgnoreCase(PROTOCOL)) { udpSocket = new DatagramSocket(); udpSocket.connect(InetAddress.getByName(HOST), PORT); } else { socket = new Socket(host, port); in = new DataInputStream(socket.getInputStream()); out = new PrintWriter(socket.getOutputStream(), true); } } public void connect() throws Exception { this.connect(HOST, PORT); } /** sends a request to a JobScheduler * * @param command * @throws java.lang.Exception **/ public void sendRequest(String command) throws Exception { if ("udp".equalsIgnoreCase(PROTOCOL)) { if (command.indexOf("<?xml") == -1) { command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n"; } byte[] commandBytes = command.getBytes(); udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, InetAddress.getByName(HOST), PORT)); } else { if (command.indexOf("<?xml") == 0) { out.print(command + "\r\n"); } else { out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n"); } out.flush(); } } /** receives the response of the JobScheduler for the last sent request * * @return String response from JobScheduler * @throws IOException */ public String getResponse() throws IOException, RuntimeException { int sec = 0; byte[] buffer = {}; while (in.available() == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { LOGGER.error("", e); } if (sec++ == TIMEOUT) { throw new RuntimeException("timeout reached"); } } buffer = new byte[in.available()]; int bytesRead = in.read(buffer); return new String(buffer, "ISO-8859-1"); } public void disconnect() throws Exception { if (socket != null) { socket.close(); } if (in != null) { in.close(); } if (out != null) { out.close(); } } } |
Maven Configuration Example
This example shows the needed dependency to build the example above as a maven project.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sos-berlin</groupId>
<artifactId>activeMQ-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.0</version>
</dependency>
</dependencies>
</project> |