...
- create a connection,
- create a session,
- create a destination,
- create a producer,
- send a message with the producer,
- close the connection.
Methods
The steps are divided into different methods to make the implementation more readable and reusable.
createConnection(String url)
This method instantiates a ConnectionFactory
object with an ActiveMQConnectionFactory
object and creates a Connection
object through the factory method createConnection()
.
write(String text)
The method is called with the message to send to the server. The message is a String
object. The method instantiates a Message
object with the text to send. The ActiveMQConnectionFactory
object has to be instantiated with the URL of the Message Queue Service.
Code Block |
---|
language | java |
---|
title | createConnectionwrite(String urltext) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public Connectionvoid createConnectionwrite(String uritext){
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
Connection connection = null;
ConnectionSession connectionsession = null;
try {
ConnectionFactory connectionfactory = new factory.createConnectionActiveMQConnectionFactory(uri);
} connection catch= factory.createConnection(JMSException e) {;
LOGGER.error("JMSException occurred while trying tosession connect: " , e= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
return connection;
} |
createSession(Connection connection)
...
Code Block |
---|
language | java |
---|
title | createSession(Connection connection) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private Session createSession(Connection connection){ Destination destination = session.createQueue(QUEUE_NAME);
Session session = null;
try {
MessageProducer producer = session.createProducer(destination);
Message sessionmessage = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)null;
} catch if(JMSException e) text != null){
LOGGER.error("JMSException occurred while trying to create Session: " , e);
message = session.createTextMessage(text);
} else{
return session;
} |
createDestination(Session session, String queueName
)
...
Code Block |
---|
language | java |
---|
title | createDestination(Session session) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public Destination createDestination(Session session, String queueName){
Destination destination = null; message = session.createTextMessage(TEXT);
}
try {
destination = session.createQueue(queueNameproducer.send(message);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to write Message createto Destination: " , e);
} finally {
return destination;
} |
createMessageProducer(Session session, Destination destination)
...
Code Block |
---|
language | java |
---|
title | createProducer(Session session, Destination destination) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private MessageProducer createMessageProducer(Session session, Destination destination){
MessageProducer producer = null;
try {
if(session != null) {
try {
producer = session.createProducerclose(destination);
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to create MessageProducer LOGGER.error("JMSException occurred while trying to close the session: " , e);
}
}
return producer;
} |
write(String text, MessageProducer producer)
...
Code Block |
---|
language | java |
---|
title | write(String text) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public void write(String text, MessageProducer producer) if (connection != null) {
try {
Message message = null;
try {
if(text != null){
connection.close();
message = session.createTextMessage(text); } catch (JMSException e) {
} else{
message = session.createTextMessage(DEFAULT_TEXT);
}LOGGER.error("JMSException occurred while trying to close the connection: " , e);
producer.send(message);
} catch (JMSException e) {}
LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e); }
}
}
} |
close()
The SOSProducer
class
The code example below shows the complete class. The code in the example includes a default TEXT consisting of a body for a JS7 /orders/add
request. This method makes sure that the connection will be closed after a message has been sent.
Code Block |
---|
language | java |
---|
title | close()SOSProducer.java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
private void close(Connection connection){
if (connection != null) {
try {
connection.close(package com.sos.jms.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
public class SOSProducer {
private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
private static final String }QUEUE_NAME catch (JMSException e) {= "test_queue";
private static final String TEXT = LOGGER.error("JMSException occurred while trying to close the connection: " , e);
"{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
private String uri;
public SOSProducer(String uri) {
this.uri = uri;
}
}
} |
The SOSProducer
class
...
...
Code Block |
---|
language | java |
---|
title | SOSProducer.java |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
public class SOSProducer {
private static final Logger LOGGER = Logger.getLogger(SOSProducer.class);
private static final String QUEUE_NAME = "test_queue";
private static final String TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}";
private String uri;
public SOSProducer(String uri) {
this.uri = uri;
}
private Connection createConnection(){
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
text){
Connection connection = null;
Session session = null;
try {
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
Message message = null;
Connection connectionif(text != null;){
try {
message connection = factorysession.createConnectioncreateTextMessage(text);
} catch (JMSException e) } else{
LOGGER.error("JMSException occurred while trying tomessage connect: " , e= session.createTextMessage(TEXT);
}
return connection;
}producer.send(message);
private} Sessioncatch createSession(ConnectionJMSException connectione) {
Session session = null;
try { LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e);
} finally {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if(session } catch (JMSException e!= null) {
LOGGER.error("JMSException occurred while trying totry create{
Session: " , e);
}
return session.close();
}
private} Destinationcatch createDestination(SessionJMSException sessione) {
Destination destination = null;
try {
destination = session.createQueue(QUEUE_NAME);
LOGGER.error("JMSException occurred while trying to close the session: " , e);
} catch (JMSException e) {}
LOGGER.error("JMSException occurred while trying to create Destination: " , e);
}
if (connection != null) }{
return destination;
}
try {
private MessageProducer createMessageProducer(Session session, Destination destination){
MessageProducer producer = nullconnection.close();
try {
} catch (JMSException producer = session.createProducer(destination);
e) {
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to close createthe MessageProducerconnection: " , e);
} }
return producer;
}
public void write(String text){ }
Connection connection = createConnection();
}
Session session = createSession(connection);
Destination destination = createDestination(session);
MessageProducer producer = createMessageProducer(session, destination);
Message message = null;
try
} |
The MessageConsumer
This section describes how to establish a connection to a MQ server and receive a message from a queue on this server. Furthermore, it shows how to connect to a JOC Cockpit instance via HTTP to send an API request:
- create a connection,
- create a session,
- create a destination.
- create a consumer,
- receive a message with the consumer,
- close the (MQ) connection,
- log in to a JOC Cockpit instance via an HTTP REST API call,
- send a ./orders/add API request to a JOC Cockpit instance,
- close the connection.
Methods
read(MessageConsumer consumer
)
The method instantiates a MessageConsumer
object to receive a message from the Message Queue Service. It extracts the value from the Message
object as a string representation via the Message
objects getText()
method.
Code Block |
---|
language | java |
---|
title | read() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public String read() {
TextMessage message = if(text != null){
null;
String textMessage = null;
Connection messageconnection = session.createTextMessage(text)null;
Session session = null;
} else{
try {
message = session.createTextMessage(TEXT ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection }
= factory.createConnection();
session = producerconnection.send(messagecreateSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
Destination destination LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e= session.createQueue(DEFAULT_QUEUE_NAME);
connection.start();
} finally {
MessageConsumer consumer if (connection != null) {= session.createConsumer(destination);
trywhile (true) {
Message receivedMessage = connectionconsumer.closereceive(1);
}if catch (JMSException e(receivedMessage != null) {
if LOGGER.error("JMSExceptionreceivedMessage occurredinstanceof whileTextMessage) trying{
to close the connection: " , e);
}
message = (TextMessage) receivedMessage;
}
}
}
} |
The MessageConsumer
This section describes how to establish a connection to a MQ server and receive a message from a queue on this server. Furthermore it describes how to establish a TCP socket connection to send the received message to a JOC Cockpit instance:
- create a connection,
- create a session,
- create a destination.
- create a consumer,
- receive a message with the consumer,
- close the (MQ) connection,
- open a TCP connection to a JOC Cockpit instance,
- send the Message to a JOC Cockpit instance,
- close the TCP connection.
This section shows examples for the last five steps as the first four are similar to the examples explained above for the instantiation of the MessageProducer
.
Methods
createMessageConsumer(Session session, Destination destination)
...
Code Block |
---|
language | java |
---|
title | createMessageConsumer(Session session, Destination destination) |
---|
linenumbers | true |
---|
collapse | true |
---|
|
/**
 * Creates a MessageConsumer for the given destination on the given session.
 *
 * @param session     the session used to create the consumer
 * @param destination the destination the consumer is created for
 * @return the created consumer, or null if a JMSException occurred (the exception is logged)
 */
private MessageConsumer createMessageConsumer(Session session, Destination destination) {
    try {
        return session.createConsumer(destination);
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to create MessageConsumer: ", e);
        // Creation failed: the caller receives null instead of an exception.
        return null;
    }
} |
read(MessageConsumer consumer
)
...
Code Block |
---|
language | java |
---|
title | read() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
/**
 * Reads a single message from the given consumer and returns its text.
 *
 * The consumer is polled with consumer.receive(1) until a non-null message is returned.
 * Only a TextMessage yields a result; any other message type ends the polling
 * without producing text.
 *
 * @param consumer the consumer to receive the message from
 * @return the text of the received TextMessage, or null if the received message was not a
 *         TextMessage or a JMSException occurred (the exception is logged)
 */
private String read(MessageConsumer consumer) {
    String textMessage = null;
    try {
        Message receivedMessage;
        // Keep polling until the consumer hands over a message.
        do {
            receivedMessage = consumer.receive(1);
        } while (receivedMessage == null);

        if (receivedMessage instanceof TextMessage) {
            TextMessage message = (TextMessage) receivedMessage;
            textMessage = message.getText();
            LOGGER.info("Reading message: " + textMessage);
        }
    } catch (JMSException e) {
        LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
    }
    return textMessage;
} |
Note |
---|
Don't forget to clean up (call close()) after the message has been received. |
receiveFromQueue()
...
Code Block |
---|
language | java |
---|
title | receiveFromQueue() |
---|
linenumbers | true |
---|
collapse | true |
---|
|
public String receiveFromQueue() {
String messagetextMessage = nullmessage.getText();
try {
connect();
LOGGER.info("Reading message: " =+ read(textMessage);
sendRequest(message);
disconnect()break;
} catch (Exception e) {
LOGGER.error("Error occurred while publishing to} theelse JOC{
Cockpit instance host:" + HOST + ", port:" + PORT, e);
}
return message;
} |
The SOSConsumer
class
...
Code Block |
---|
language | java |
---|
title | SOSConsumer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SOSConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class);
private static final String DEFAULT_QUEUE_NAME = "test_queue";
private String uri;
public SOSConsumer (String uri) { break;
}
}
}
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to read from Destination: ", e);
this.uri} =finally uri;{
}
private Connection createConnection(if(session != null) {
ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
try {
Connection connection = null;
try {
session.close();
connection = factory.createConnection();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to connectclose the session: " , e);
}
}
}
return connection;
}
private Sessionif createSession(Connection connectionconnection != null) {
Session session = null;
try {
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGEclose();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to close createthe Sessionconnection: " , e);
}
return session;}
}
private Destination createDestination(Session session)return {textMessage;
} |
The SOSConsumer
class
The below code example shows the complete class.
Code Block |
---|
language | java |
---|
title | SOSConsumer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
package com.sos.jms.consumer;
import javax.jms.Connection;
import return this.createDestination(session, DEFAULT_QUEUE_NAME);
}
private Destination createDestination(Session session, String queueName) {
Destination destination = null;
try {
destination = session.createQueue(queueName)javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SOSConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class);
private static final String DEFAULT_QUEUE_NAME = "test_queue";
private String uri;
}public catchSOSConsumer (JMSExceptionString euri) {
this.uri = uri;
}
LOGGER.error("JMSException occurred while
trying to create Destination:public ",String eread(); {
TextMessage }
message = null;
return destination;
String textMessage = }null;
private MessageConsumer createMessageConsumer(Session session,Connection Destinationconnection destination)= {null;
MessageConsumerSession consumersession = null;
try {
ConnectionFactory consumerfactory = new session.createConsumerActiveMQConnectionFactory(destinationuri);
} connection catch= (JMSException e) {
factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
LOGGER.error("JMSException occurred while trying toDestination createdestination MessageConsumer: ", e= session.createQueue(DEFAULT_QUEUE_NAME);
}
connection.start();
return consumer;
}
MessageConsumer privateconsumer String= readsession.createConsumer(destination) {;
TextMessage message = null;
while (true) {
String textMessage = null;
ConnectionMessage connectionreceivedMessage = createConnectionconsumer.receive(1);
try {
if (receivedMessage != null) {
Session session = createSession(connection);
Destinationif destination(receivedMessage =instanceof createDestination(session);TextMessage) {
connection.start();
MessageConsumer consumermessage = createMessageConsumer(session, destination)TextMessage) receivedMessage;
while (true) {
textMessage = message.getText();
Message receivedMessage = consumer.receive(1);
if (receivedMessage != null) {LOGGER.info("Reading message: " + textMessage);
if (receivedMessage instanceof TextMessage) {break;
} else {
message = (TextMessage) receivedMessage;
break;
textMessage = message.getText();
}
LOGGER.info("Reading message: " + textMessage);}
}
} catch (JMSException e) break;{
LOGGER.error("JMSException occurred while trying to read from Destination: } else {", e);
} finally {
break;
if(session != null) {
try }{
}
session.close();
}
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to readclose fromthe Destinationsession: " , e);
} finally {
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
LOGGER.error("JMSException occurred while trying to close the connection: " , e);
}
}
}
return textMessage;
}
public String receiveFromQueue() {
return read();
}
} |
Java Class with a main(String[] args) method (example JmsExecute.java)
...