Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Create a connection.
  2. Create a session.
  3. Create a destination.
  4. Create a consumer.
  5. Receive a message with the consumer.
  6. Close the (MQ) connection.
  7. Open a TCP connection to a JobScheduler instance.
  8. Send the Message to the JobScheduler
  9. Close the (TCP) connection.

...

This part of the document shows examples for the last five steps, as the first four steps are similar to the examples shown above for the instantiation of the MessageProducer.

The Methods

The createMessageConsumer(Session session, Destination destination) method

This method is called with an already active Session object as well as instantiated Destination object. It instantiates a MessageConsumer object with the given session and destination.

Code Block
languagejava
titlecreateMessageConsumer(Session session, Destination destination)
linenumberstrue
collapsetrue
/**
 * Creates a MessageConsumer on the given session for the given destination.
 *
 * @param session     the session used to create the consumer
 * @param destination the destination the consumer reads from
 * @return the created consumer, or {@code null} if a JMSException occurred (the exception is logged)
 */
private MessageConsumer createMessageConsumer(Session session, Destination destination) {
    try {
        return session.createConsumer(destination);
    } catch (JMSException jmsFailure) {
        LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", jmsFailure);
        return null;
    }
}

The read(MessageConsumer consumer) method

Code Block
languagejava
titleread()
linenumberstrue
collapsetrue
/**
 * Polls the given consumer until a message arrives and returns its text.
 *
 * @param consumer the consumer to receive from
 * @return the text of the received message, or {@code null} if the received message is not a
 *         TextMessage or a JMSException occurred (the exception is logged)
 */
private String read(MessageConsumer consumer) {
    String text = null;
    try {
        // Poll with a 1 ms receive timeout until the consumer hands over a message.
        Message incoming;
        do {
            incoming = consumer.receive(1);
        } while (incoming == null);

        // Only text messages carry a payload this method can return; anything else yields null.
        if (incoming instanceof TextMessage) {
            text = ((TextMessage) incoming).getText();
            LOGGER.info("Reading message: " + text);
        }
    } catch (JMSException jmsFailure) {
        LOGGER.error("JMSException occurred while trying to read from Destination: ", jmsFailure);
    }
    return text;
}
Note

 Don't forget to clean up (call close()) after the message has been received.

The connect(String host, int port) method

This method instantiates a TCP socket connection to a JobScheduler instance with the given host:port.

Code Block
languagejava
titleconnect()
linenumberstrue
collapsetrue
/**
 * Opens the connection to a JobScheduler instance at the given host and port.
 *
 * <p>Depending on PROTOCOL either a connected UDP socket is created, or a TCP socket together
 * with its input stream and an ISO-8859-1 writer (the encoding declared in the XML header that
 * sendRequest() prepends).
 *
 * @param host host name or address of the JobScheduler instance
 * @param port port of the JobScheduler instance
 * @throws Exception if host or port is missing or the socket cannot be opened
 */
public void connect(String host, int port) throws Exception {
    if (host == null || host.length() == 0) {
        throw new IllegalArgumentException("hostname missing.");
    }
    if (port == 0) {
        throw new IllegalArgumentException("port missing.");
    }
    if ("udp".equalsIgnoreCase(PROTOCOL)) {
        udpSocket = new DatagramSocket();
        // Connect to the requested endpoint; the arguments were previously ignored in favour of HOST/PORT.
        udpSocket.connect(InetAddress.getByName(host), port);
    } else {
        socket = new Socket(host, port);
        in = new DataInputStream(socket.getInputStream());
        // Encode explicitly instead of relying on the platform default charset.
        out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), "ISO-8859-1"), true);
    }
}

The disconnect() method

This method is used to close the socket opened with the above connect() method.

Code Block
languagejava
titledisconnect()
linenumberstrue
collapsetrue
/**
 * Closes everything opened by connect(): the TCP streams, the TCP socket and the UDP socket.
 *
 * <p>Each resource is closed even if closing an earlier one fails, and the fields are reset
 * afterwards so that calling this method again is harmless.
 *
 * @throws Exception if closing a stream or the TCP socket fails
 */
public void disconnect() throws Exception {
    try {
        // Close the streams before the socket they belong to.
        if (out != null) {
            out.close();
        }
        if (in != null) {
            in.close();
        }
    } finally {
        try {
            if (socket != null) {
                socket.close();
            }
        } finally {
            // The UDP socket is opened by connect() as well and must not be leaked.
            if (udpSocket != null) {
                udpSocket.close();
            }
            out = null;
            in = null;
            socket = null;
            udpSocket = null;
        }
    }
}

The sendRequest(String command) method

This method sends a request with the given command to the connected JobScheduler instance. The command used in the example is the add_order XML snippet shown in the SOSProducer example.

Code Block
languagejava
linenumberstrue
collapsetrue
/**
 * Sends a request to the connected JobScheduler instance.
 *
 * <p>If the command carries no XML declaration, one declaring iso-8859-1 is prepended; the
 * bytes are therefore encoded as ISO-8859-1 as well.
 *
 * @param command the XML command to send
 * @throws Exception if the command is missing or sending fails
 */
public void sendRequest(String command) throws Exception {
    if (command == null) {
        throw new IllegalArgumentException("command missing.");
    }
    if ("udp".equalsIgnoreCase(PROTOCOL)) {
        if (command.indexOf("<?xml") == -1) {
            command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n";
        }
        byte[] commandBytes = command.getBytes("ISO-8859-1");
        // The socket is connected by connect(), so a packet without an address goes to that peer.
        udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length));
    } else {
        if (command.indexOf("<?xml") == 0) {
            out.print(command + "\r\n");
        } else {
            out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n");
        }
        out.flush();
        // PrintWriter never throws on write errors; it only records them.
        if (out.checkError()) {
            throw new IOException("could not send request to the JobScheduler.");
        }
    }
}

The getResponse() method

This method receives the response sent back over the TCP connection described above.

Code Block
languagejava
linenumberstrue
collapsetrue
/**
 * Receives the response of the JobScheduler for the last sent request.
 *
 * <p>Checks the input stream once per second until data is available and gives up after
 * TIMEOUT seconds. Only the bytes available at that moment are read and decoded as ISO-8859-1.
 *
 * @return the response text read from the TCP connection
 * @throws IOException if no TCP input stream is open or reading from it fails
 * @throws RuntimeException if no data arrives within TIMEOUT seconds or the wait is interrupted
 */
public String getResponse() throws IOException, RuntimeException {
    // connect() only opens the input stream for TCP connections.
    if (in == null) {
        throw new IOException("not connected.");
    }
    int sec = 0;
    while (in.available() == 0) {
        if (sec >= TIMEOUT) {
            throw new RuntimeException("timeout reached");
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting instead of silently carrying on.
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for response", e);
        }
        sec++;
    }
    byte[] buffer = new byte[in.available()];
    int bytesRead = in.read(buffer);
    // Decode only what was actually read; read() may return fewer bytes than requested, or -1.
    return new String(buffer, 0, Math.max(bytesRead, 0), "ISO-8859-1");
}

The receiveFromQueue() method

The receiveFromQueue() method uses the methods described above to connect to a Host running a JobScheduler instance, read from a message queue, send the received message to the JobScheduler instance and close the session to the JobScheduler instance.

Code Block
languagejava
titlereceiveFromQueue()
linenumberstrue
collapsetrue
/**
 * Connects to the JobScheduler instance, reads one message from the queue, forwards it to the
 * JobScheduler and disconnects again.
 *
 * <p>Any exception is logged, not propagated.
 *
 * @return the message that was read, or {@code null} if none could be read
 */
public String receiveFromQueue() {
    String message = null;
    try {
        try {
            connect();
            message = read();
            // Nothing to forward if no message text was obtained.
            if (message != null) {
                sendRequest(message);
            }
        } finally {
            // Always release the connection, also when reading or sending failed.
            disconnect();
        }
    } catch (Exception e) {
        LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e);
    }
    return message;
}

The SOSConsumer class

The code example below differs slightly from the examples above. In the class below, the read() method itself uses the other methods for instantiation; therefore it needs fewer parameters and it closes the connection itself.

Code Block
languagejava
titleSOSConsumer
linenumberstrue
collapsetrue
package com.sos.jms.consumer;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.Socket;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/**
 * Example consumer that reads one text message from a message queue and forwards it as a
 * request to a JobScheduler instance over TCP or UDP.
 *
 * <p>NOTE(review): the bracketed values in DEFAULT_CONNECTION_URI, HOST and PORT look like
 * placeholders that have to be replaced before the class compiles.
 */
public class SOSConsumer {
    private static final String DEFAULT_CONNECTION_URI = "tcp://[HOST]:61616";
    private static final Logger LOGGER = Logger.getLogger(SOSConsumer.class);
    private static final String DEFAULT_QUEUE_NAME = "test_queue";
    private static final String HOST = "[JobScheduler_HOSTNAME]";
    private static final int PORT = [JobScheduler_PORT];
    private static final String PROTOCOL = "tcp";
    /** Maximum number of seconds getResponse() waits for data. */
    private static final int TIMEOUT = 5;
    /** Charset used for requests and responses; matches the XML declaration sendRequest() prepends. */
    private static final String ENCODING = "ISO-8859-1";
    private Socket socket = null;
    private DatagramSocket udpSocket = null;
    private DataInputStream in = null;
    private PrintWriter out = null;

    /**
     * Creates a connection to the message broker at DEFAULT_CONNECTION_URI.
     *
     * @return the connection, or {@code null} if it could not be created
     */
    private Connection createConnection() {
        return this.createConnection(DEFAULT_CONNECTION_URI);
    }

    /**
     * Creates a connection to the message broker at the given URI.
     *
     * @param uri the broker URI
     * @return the connection, or {@code null} if a JMSException occurred (the exception is logged)
     */
    private Connection createConnection(String uri) {
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection connection = null;
        try {
            connection = factory.createConnection();
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to connect: ", e);
        }
        return connection;
    }

    /**
     * Creates a non-transacted, auto-acknowledging session on the given connection.
     *
     * @param connection the connection to create the session on
     * @return the session, or {@code null} if a JMSException occurred (the exception is logged)
     */
    private Session createSession(Connection connection) {
        Session session = null;
        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Session: ", e);
        }
        return session;
    }

    /**
     * Creates the queue destination named DEFAULT_QUEUE_NAME.
     *
     * @param session the session used to create the destination
     * @return the destination, or {@code null} if it could not be created
     */
    private Destination createDestination(Session session) {
        return this.createDestination(session, DEFAULT_QUEUE_NAME);
    }

    /**
     * Creates the queue destination with the given name.
     *
     * @param session   the session used to create the destination
     * @param queueName the name of the queue
     * @return the destination, or {@code null} if a JMSException occurred (the exception is logged)
     */
    private Destination createDestination(Session session, String queueName) {
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create Destination: ", e);
        }
        return destination;
    }

    /**
     * Creates a MessageConsumer on the given session for the given destination.
     *
     * @param session     the session used to create the consumer
     * @param destination the destination the consumer reads from
     * @return the consumer, or {@code null} if a JMSException occurred (the exception is logged)
     */
    private MessageConsumer createMessageConsumer(Session session, Destination destination) {
        MessageConsumer consumer = null;
        try {
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
        }
        return consumer;
    }

    /**
     * Opens a broker connection, waits for one message on the default queue and returns its text.
     * The broker connection is always closed before returning.
     *
     * @return the text of the received message, or {@code null} if the broker objects could not
     *         be created, the received message is not a TextMessage, or a JMSException occurred
     */
    private String read() {
        String textMessage = null;
        Connection connection = createConnection();
        try {
            // The create* helpers log their failure and return null; stop instead of running into an NPE.
            if (connection == null) {
                return null;
            }
            Session session = createSession(connection);
            if (session == null) {
                return null;
            }
            Destination destination = createDestination(session);
            if (destination == null) {
                return null;
            }
            connection.start();
            MessageConsumer consumer = createMessageConsumer(session, destination);
            if (consumer == null) {
                return null;
            }
            // Poll with a 1 ms receive timeout until a message arrives.
            Message receivedMessage;
            do {
                receivedMessage = consumer.receive(1);
            } while (receivedMessage == null);
            if (receivedMessage instanceof TextMessage) {
                textMessage = ((TextMessage) receivedMessage).getText();
                LOGGER.info("Reading message: " + textMessage);
            } else {
                // The message has been consumed but cannot be forwarded; make that visible.
                LOGGER.warn("Received message is not a TextMessage and will be ignored: "
                        + receivedMessage.getClass().getName());
            }
        } catch (JMSException e) {
            LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOGGER.error("JMSException occurred while trying to close the connection: " , e);
                }
            }
        }
        return textMessage;
    }

    /**
     * Connects to the JobScheduler instance, reads one message from the queue, forwards it to
     * the JobScheduler and disconnects again.
     *
     * <p>The JobScheduler connection is opened before the message is read, so a message is only
     * taken from the queue once the JobScheduler has been reached. Any exception is logged, not
     * propagated.
     *
     * @return the message that was read, or {@code null} if none could be read
     */
    public String receiveFromQueue() {
        String message = null;
        try {
            try {
                connect();
                message = read();
                // read() returns null when no message text was obtained; there is nothing to forward then.
                if (message != null) {
                    sendRequest(message);
                }
            } finally {
                // Always release the connection, also when reading or sending failed.
                disconnect();
            }
        } catch (Exception e) {
            LOGGER.error("Error occured while publishing to the JobScheduler instance host:" + HOST + ", port:" + PORT, e);
        }
        return message;
    }

    /**
     * Opens the connection to a JobScheduler instance at the given host and port.
     *
     * <p>Depending on PROTOCOL either a connected UDP socket is created, or a TCP socket
     * together with its input stream and a writer using ENCODING.
     *
     * @param host host name or address of the JobScheduler instance
     * @param port port of the JobScheduler instance
     * @throws Exception if host or port is missing or the socket cannot be opened
     */
    public void connect(String host, int port) throws Exception {
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("hostname missing.");
        }
        if (port == 0) {
            throw new IllegalArgumentException("port missing.");
        }
        if ("udp".equalsIgnoreCase(PROTOCOL)) {
            udpSocket = new DatagramSocket();
            // Connect to the requested endpoint; the arguments were previously ignored in favour of HOST/PORT.
            udpSocket.connect(InetAddress.getByName(host), port);
        } else {
            socket = new Socket(host, port);
            in = new DataInputStream(socket.getInputStream());
            // Encode explicitly instead of relying on the platform default charset.
            out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), ENCODING), true);
        }
    }

    /**
     * Opens the connection to the JobScheduler instance at HOST and PORT.
     *
     * @throws Exception if the connection cannot be opened
     */
    public void connect() throws Exception {
        this.connect(HOST, PORT);
    }

    /**
     * Sends a request to the connected JobScheduler instance.
     *
     * <p>If the command carries no XML declaration, one declaring iso-8859-1 is prepended.
     *
     * @param command the XML command to send
     * @throws Exception if the command is missing or sending fails
     */
    public void sendRequest(String command) throws Exception {
        if (command == null) {
            throw new IllegalArgumentException("command missing.");
        }
        if ("udp".equalsIgnoreCase(PROTOCOL)) {
            if (command.indexOf("<?xml") == -1) {
                command = "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n";
            }
            byte[] commandBytes = command.getBytes(ENCODING);
            // The socket is connected by connect(), so a packet without an address goes to that peer.
            udpSocket.send(new DatagramPacket(commandBytes, commandBytes.length));
        } else {
            if (command.indexOf("<?xml") == 0) {
                out.print(command + "\r\n");
            } else {
                out.print("<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>" + command + "\r\n");
            }
            out.flush();
            // PrintWriter never throws on write errors; it only records them.
            if (out.checkError()) {
                throw new IOException("could not send request to the JobScheduler.");
            }
        }
    }

    /**
     * Receives the response of the JobScheduler for the last sent request.
     *
     * <p>Checks the input stream once per second until data is available and gives up after
     * TIMEOUT seconds. Only the bytes available at that moment are read and decoded.
     *
     * @return the response text read from the TCP connection
     * @throws IOException if no TCP input stream is open or reading from it fails
     * @throws RuntimeException if no data arrives within TIMEOUT seconds or the wait is interrupted
     */
    public String getResponse() throws IOException, RuntimeException {
        // connect() only opens the input stream for TCP connections.
        if (in == null) {
            throw new IOException("not connected.");
        }
        int sec = 0;
        while (in.available() == 0) {
            if (sec >= TIMEOUT) {
                throw new RuntimeException("timeout reached");
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting instead of silently carrying on.
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted while waiting for response", e);
            }
            sec++;
        }
        byte[] buffer = new byte[in.available()];
        int bytesRead = in.read(buffer);
        // Decode only what was actually read; read() may return fewer bytes than requested, or -1.
        return new String(buffer, 0, Math.max(bytesRead, 0), ENCODING);
    }

    /**
     * Closes everything opened by connect(): the TCP streams, the TCP socket and the UDP socket.
     *
     * <p>Each resource is closed even if closing an earlier one fails, and the fields are reset
     * afterwards so that calling this method again is harmless.
     *
     * @throws Exception if closing a stream or the TCP socket fails
     */
    public void disconnect() throws Exception {
        try {
            // Close the streams before the socket they belong to.
            if (out != null) {
                out.close();
            }
            if (in != null) {
                in.close();
            }
        } finally {
            try {
                if (socket != null) {
                    socket.close();
                }
            } finally {
                // The UDP socket is opened by connect() as well and must not be leaked.
                if (udpSocket != null) {
                    udpSocket.close();
                }
                out = null;
                in = null;
                socket = null;
                udpSocket = null;
            }
        }
    }
}