Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Most of the implementation uses the standard JMS API of Java. The only class taken from the ActiveMQ implementation is the ActiveMQConnectionFactory class. It shouldn't be too complex to adapt the example to another message queue service by replacing it with the ConnectionFactory of that service.

Prerequisites

  • A running Message Queue server
  • A running JobScheduler
  • Maven (only needed if you want to build the example as a Maven project)

The MessageProducer

This example describes how to build a connection to an MQ server and send a message to a queue on this server. This example uses Apache ActiveMQ as the MQ server.

...

Code Block
languagejava
titleSOSConsumer
linenumberstrue
collapsetrue
package com.sos.jms.consumer;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/**
 * Example consumer that reads one text message from a message queue and forwards it as a
 * command to a JobScheduler instance.
 *
 * <p>The message is read through the standard JMS API; only the connection factory is
 * ActiveMQ specific. The command is sent over a plain TCP socket, or over UDP when
 * {@code PROTOCOL} is set to {@code "udp"}.
 *
 * <p>Instances keep the open socket and streams in fields and are not safe for concurrent use.
 */
public class SOSConsumer {
    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOST]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSConsumer.class);
    private static final String DEFAULT_QUEUE_NAME = "test_queue";
    private static final String HOST = "[JobScheduler_HOSTNAME]";
    private static final int PORT = [JobScheduler_PORT];
    private static final String PROTOCOL = "tcp";
    /** Maximum time in seconds that {@link #getResponse()} waits for data. */
    private static final int TIMEOUT = 5;
    /** XML declaration that is prepended to commands which do not carry one themselves. */
    private static final String XML_DECLARATION = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>";
    private static final String LINE_END = "\r\n";
    private Socket socket = null;
    private DatagramSocket udpSocket = null;
    private DataInputStream in = null;
    private PrintWriter out = null;

    /** Creates a JMS connection to the default broker URI, or returns {@code null} on failure. */
    private Connection createConnection() {
        return this.createConnection(DEFAULT_CONNECTION_URI);
    }

    /**
     * Creates a JMS connection to the given broker URI.
     *
     * @param uri broker URI handed to the ActiveMQ connection factory
     * @return the connection, or {@code null} if it could not be created (the cause is logged)
     */
    private Connection createConnection(String uri) {
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = null;
        try {
            connection = factory.createConnection();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to connect: ", e);
        }
        return connection;
    }

    /**
     * Creates a non-transacted, auto-acknowledging session.
     *
     * @return the session, or {@code null} if it could not be created (the cause is logged)
     */
    private Session createSession(Connection connection) {
        Session session = null;
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Session: ", e);
        }
        return session;
    }

    /** Creates the destination for the default queue, or returns {@code null} on failure. */
    private Destination createDestination(Session session) {
        return this.createDestination(session, DEFAULT_QUEUE_NAME);
    }

    /**
     * Creates the queue destination with the given name.
     *
     * @return the destination, or {@code null} if it could not be created (the cause is logged)
     */
    private Destination createDestination(Session session, String queueName) {
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Destination: ", e);
        }
        return destination;
    }

    /**
     * Creates a consumer for the given destination.
     *
     * @return the consumer, or {@code null} if it could not be created (the cause is logged)
     */
    private MessageConsumer createMessageConsumer(Session session, Destination destination) {
        MessageConsumer consumer = null;
        try {
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
        }
        return consumer;
    }

    /**
     * Waits for the next message on the default queue and returns its text.
     *
     * @return the message text, or {@code null} if the JMS objects could not be set up, the
     *         consumer was closed while waiting, or the received message is not a
     *         {@link TextMessage}
     */
    private String read() {
        Connection connection = createConnection();
        if (connection == null) {
            // createConnection() has already logged the cause.
            return null;
        }
        String textMessage = null;
        try {
            // Each factory helper returns null after logging its own failure, so stop at the
            // first missing object instead of running into a NullPointerException.
            Session session = createSession(connection);
            Destination destination = session == null ? null : createDestination(session);
            MessageConsumer consumer =
                    destination == null ? null : createMessageConsumer(session, destination);
            if (consumer != null) {
                connection.start();
                // Block until a message arrives instead of polling with a 1 ms timeout.
                Message receivedMessage = consumer.receive();
                if (receivedMessage instanceof TextMessage) {
                    textMessage = ((TextMessage) receivedMessage).getText();
                    LOGGER.info("Reading message: " + textMessage);
                } else if (receivedMessage != null) {
                    // The session auto-acknowledges, so this message is consumed and dropped.
                    LOGGER.warn("Ignoring message that is not a TextMessage: "
                            + receivedMessage.getClass().getName());
                }
            }
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
        } finally {
            try {
                connection.close();
            } catch (JMSException e) {
                LOGGER.error("JMSException occurred while trying to close the connection: " , e);
            }
        }
        return textMessage;
    }

    /**
     * Connects to the JobScheduler, reads one message from the queue and forwards it as a
     * command. Errors are logged rather than thrown, and the JobScheduler connection is always
     * closed again.
     *
     * @return the text of the message that was read, or {@code null} if none could be read
     */
    public String receiveFromQueue() {
        String message = null;
        try {
            connect();
            message = read();
            if (message != null) {
                sendRequest(message);
            } else {
                LOGGER.warn("No text message was read from the queue, nothing is sent to the JobScheduler.");
            }
        } catch (Exception e) {
            LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e);
        } finally {
            // Release the socket even if reading or sending failed.
            try {
                disconnect();
            } catch (Exception e) {
                LOGGER.error("Error occurred while disconnecting from the JobScheduler instance host:" + HOST
                        + ", port:" + PORT, e);
            }
        }
        return message;
    }

    /**
     * Opens the connection to a JobScheduler, closing a previously opened one first.
     *
     * @param host host name or address of the JobScheduler
     * @param port port of the JobScheduler
     * @throws Exception if host or port is missing or the socket cannot be opened
     */
    public void connect(String host, int port) throws Exception {
        if (host == null || host.length() == 0) {
            throw (new Exception("hostname missing."));
        }
        if (port == 0){
            throw (new Exception("port missing."));
        }
        // Avoid leaking sockets when connect is called more than once.
        disconnect();
        if ("udp".equalsIgnoreCase(PROTOCOL)) {
            udpSocket = new DatagramSocket();
            // Use the requested endpoint, not the class defaults.
            udpSocket.connect(InetAddress.getByName(host), port);
        } else {
            socket = new Socket(host, port);
            in = new DataInputStream(socket.getInputStream());
            // Encode with the charset announced in the XML declaration, not the platform default.
            out = new PrintWriter(
                    new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.ISO_8859_1), true);
        }
    }

    /**
     * Opens the connection to the configured default JobScheduler host and port.
     *
     * @throws Exception if the connection cannot be opened
     */
    public void connect() throws Exception {
        this.connect(HOST, PORT);
    }

    /**
     * Sends a request to a JobScheduler. An XML declaration is prepended when the command does
     * not start with one, and the request is terminated with CRLF.
     *
     * @param command the XML command to send
     * @throws IllegalArgumentException if {@code command} is {@code null}
     * @throws IllegalStateException if no connection has been opened with {@code connect}
     * @throws java.lang.Exception if the request cannot be sent
     **/
    public void sendRequest(String command) throws Exception {
        if (command == null) {
            throw new IllegalArgumentException("command missing.");
        }
        // Same framing for both protocols.
        String request = (command.startsWith("<?xml") ? command : XML_DECLARATION + command) + LINE_END;
        if ("udp".equalsIgnoreCase(PROTOCOL)) {
            if (udpSocket == null) {
                throw new IllegalStateException("not connected.");
            }
            byte[] commandBytes = request.getBytes(StandardCharsets.ISO_8859_1);
            // Address the packet to the connected peer; a differing address would be rejected.
            udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length, udpSocket.getInetAddress(),
                    udpSocket.getPort()));
        } else {
            if (out == null) {
                throw new IllegalStateException("not connected.");
            }
            out.print(request);
            // PrintWriter never throws on write errors; checkError() flushes and reports them.
            if (out.checkError()) {
                throw new IOException("could not send request to the JobScheduler.");
            }
        }
    }

    /**
     * Receives the response of the JobScheduler for the last sent request. Waits up to
     * {@code TIMEOUT} seconds for data to become available on the TCP connection.
     *
     * @return String response from JobScheduler
     * @throws IOException if reading from the connection fails
     * @throws RuntimeException if no TCP connection is open, the timeout is reached or the
     *         waiting thread is interrupted
     */
    public String getResponse() throws IOException, RuntimeException {
        if (in == null) {
            throw new IllegalStateException("not connected.");
        }
        int waitedSeconds = 0;
        while (in.available() == 0) {
            if (waitedSeconds >= TIMEOUT) {
                throw new RuntimeException("timeout reached");
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers instead of swallowing it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted while waiting for the response", e);
            }
            waitedSeconds++;
        }
        byte[] buffer = new byte[in.available()];
        in.readFully(buffer);
        return new String(buffer, StandardCharsets.ISO_8859_1);
    }

    /**
     * Closes the streams and sockets opened by {@code connect}. Every resource is attempted
     * even if closing an earlier one fails. Calling it when nothing is open has no effect.
     *
     * @throws Exception if closing a resource fails
     */
    public void disconnect() throws Exception {
        try {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                in.close();
            }
        } finally {
            try {
                if (socket != null) {
                    socket.close();
                }
            } finally {
                if (udpSocket != null) {
                    udpSocket.close();
                }
                out = null;
                in = null;
                socket = null;
                udpSocket = null;
            }
        }
    }
}

Maven Configuration Example

This example shows the dependency needed to build the example above as a Maven project.

Code Block
languagexml
titleMaven Configuration
collapsetrue
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Minimal POM for building the example as a Maven project. -->
	<modelVersion>4.0.0</modelVersion>


	<!-- Coordinates of the example project itself. -->
	<groupId>com.sos-berlin</groupId>
	<artifactId>activeMQ-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>


	<dependencies>
		<!-- The only declared dependency: the ActiveMQ "all" artifact, version 5.13.0.
		     NOTE(review): presumably this artifact also supplies the javax.jms and log4j
		     classes used by the example - confirm for the chosen version. -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.13.0</version>
		</dependency>
	</dependencies>
</project>