Page History
...
A zip file containing this example as a complete Maven project is available for download: js7-jms-example-js7-project.zip.
Prerequisites
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public void write(String text, String queueName, long ttl) throws Exception { Connection connection = null; Session session = null; try { ConnectionFactory factory = new ActiveMQConnectionFactory(uri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAMEqueueName); MessageProducer producer = session.createProducer(destination); // 5 sec time to live for the producer for this showcase producer.setTimeToLive(ttl); Message message = null; if(text != null){ message = session.createTextMessage(text); } else{ message = session.createTextMessage(TEXT); } producer.send(message); } catch (JMSExceptionThrowable e) { LOGGER.error("JMSException occurred while trying to write Message to Destination: " , e); } finally { throw e; } finally { if(session != null) { try { session.close(); } catch (JMSException e) { LOGGER.errorwarn("JMSException occurred while trying to close the session: " , e); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOGGER.errorwarn("JMSException occurred while trying to close the connection: " , e); } } } } |
The SOSProducer class
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package com.sos.jms.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class SOSProducer { private static final Logger LOGGER = Logger.getLogger(SOSProducer.class); private static final String QUEUE_NAME = "test_queue"; private static final String TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}"; private String uri; public SOSProducer(String uri) { this.uri = uri; } public void write(String text){ , String queueName) throws Connection connection = null;Exception { Session session = nullwrite(text, queueName, 5000L); } try { public void write(String text, String queueName, long ttl) throws Exception { Connection connection = null; Session session = null; try { ConnectionFactory factory = new ActiveMQConnectionFactory(uri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(QUEUE_NAMEqueueName); MessageProducer producer = session.createProducer(destination); Message// message5 =sec null; time to live for the producer for this showcase if(text producer.setTimeToLive(ttl); Message message = null; if(text != null){ message = session.createTextMessage(text); } else{ message = session.createTextMessage(TEXT); } producer.send(message); } catch (JMSExceptionThrowable e) { LOGGER.error("JMSException occurred while trying to write Message to Destination: " ,); throw e); } finally { if(session != null) { try { session.close(); } catch (JMSException e) { LOGGER.errorwarn("JMSException occurred while trying to close the session: " , e); } } if (connection != null) { try { connection.close(); } catch (JMSException 
e) { LOGGER.errorwarn("JMSException occurred while trying to close the connection: " , e); } } } } } |
The MessageConsumer
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public String read(String queueName) throws Exception { TextMessage message = null; String textMessage = null; Connection connection = null; Session session = null; try { ConnectionFactory factory = new ActiveMQConnectionFactory(uri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DEFAULT_QUEUE_NAMEqueueName); connection.start(); MessageConsumer consumer = session.createConsumer(destination); while (true) { Message receivedMessage = consumer.receive(1); if (receivedMessage != null) { if (receivedMessage instanceof TextMessage) { message = (TextMessage) receivedMessage; textMessage = message.getText(); LOGGER.info("Reading message: " + textMessage); break; } else { break; } } } } catch (JMSExceptionThrowable e) { LOGGER.error("JMSException occurred while trying to read from Destination: ", e); throw e; } finally { if(session != null) { try { session.close(); } catch (JMSException e) { LOGGER.errorwarn("JMSException occurred while trying to close the session: " , e); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOGGER.errorwarn("JMSException occurred while trying to close the connection: " , e); } } } return textMessage; } |
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package com.sos.jms.consumer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SOSConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(SOSConsumer.class); private static final String DEFAULT_QUEUE_NAME = "test_queue"; private String uri; public SOSConsumer (String uri) { this.uri = uri; } public String read(String queueName) throws Exception { TextMessage message = null; String textMessage = null; Connection connection = null; Session session = null; try { ConnectionFactory factory = new ActiveMQConnectionFactory(uri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(DEFAULT_QUEUE_NAMEqueueName); connection.start(); MessageConsumer consumer = session.createConsumer(destination); while (true) { Message receivedMessage = consumer.receive(1); if (receivedMessage != null) { if (receivedMessage instanceof TextMessage) { message = (TextMessage) receivedMessage; textMessage = message.getText(); LOGGER.info("Reading message: " + textMessage); break; } else { break; } } } } catch (JMSExceptionThrowable e) { LOGGER.error("JMSException occurred while trying to read from Destination: ",); throw e); } finally { if(session != null) { try { session.close(); } catch (JMSException e) { LOGGER.errorwarn("JMSException occurred while trying to close the session: " , e); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOGGER.errorwarn("JMSException occurred while trying to close the connection: " , e); } } } return textMessage; } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package com.sos.jms; import java.io.IOException; import java.io.StringReader; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Base64; import java.util.Properties; import javax.json.Json; import javax.json.JsonObject; import javax.json.JsonReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sos.commons.httpclient.SOSRestApiClient; import com.sos.jms.consumer.SOSConsumer; import com.sos.jms.producer.SOSProducer; public class JmsExecute { private static final String DEFAULT_JMS_URI = "tcp://activemq-5-15:61616"; private static final String DEFAULT_JOC_API_URL = "http://centostest_primary.sos:7446/joc/api/"; private static final String DEFAULT_JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\"" + ":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}"; private static final String DEFAULT_USERNAME = "root"; private static final String DEFAULT_PWD = "root"; private static final String APIDEFAULT_ADD_QUEUE_NAME = "test_queue"; private static final String API_ADD_ORDER = "orders/add"; private static final String API_LOGIN = "authentication/login"; private static final String API_LOGOUT = "authentication/logout"; private static final String ACCESS_TOKEN_HEADER = "X-Access-Token"; private static final String APPLICATION_JSON = "application/json"; private static final String CONTENT_TYPE = "Content-Type"; private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class); private static String jmsServerUri = null; private static String jocApiUri = null; private static String controllerId = null; private static String workflowPath = null; private static String requestBody = null; private static String username = null; private static String pwd = null; private static String queueName = null; private static Long queueTtl = null; public static void 
main(String[] args) throws URISyntaxException { for (int iSOSRestApiClient client = 0null; i < args.length; i++) try { LOGGER.info("args["+ i +"] = " + args[i]); } URL classUrl = JmsExecute.classURL classUrl = JmsExecute.class.getProtectionDomain().getCodeSource().getLocation(); Path classPath = Paths.get(classUrl.toURI()); String filename = classPath.getFileName().toString().replace(".jar", ".config"); LOGGER.info(classPath.getParent().resolve(filename).toString()); readPropertiesFile(classPath.getParent().resolve(filename)); if ("produce".equals(args[0])) { SOSProducer producer = new SOSProducer(jmsServerUri); LOGGER.info("message send to queue:"); LOGGER.info(requestBody); producer.write(requestBody, queueName, queueTtl); } else if ("consume".equals(args[0])) { SOSConsumer consumer = new SOSConsumer(jmsServerUri); String consumedMessage = null; consumedMessage = consumer.read(queueName); LOGGER.info("message received from queue:"); LOGGER.info(consumedMessage); SOSRestApiClient client =if setupHttpClient(username, pwd); (consumedMessage != null) { try {client = setupHttpClient(username, pwd); URI jocUri = URI.create(jocApiUri); LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString()); String response = client.postRestService(jocUri.resolve(API_LOGIN), null); LOGGER.info("HTTP status code: " + client.statusCode()); if (client.statusCode() == 200) { JsonReader jsonReader = null; String accessToken = null; try { jsonReader = Json.createReader(new StringReader(response)); JsonObject json = jsonReader.readObject(); accessToken = json.getString("accessToken", ""); } catch (Exception e) { throw new LOGGER.warnException("Could not determine accessToken.", e); } finally { jsonReader.close(); } client.addHeader(ACCESS_TOKEN_HEADER, accessToken); client.addHeader(CONTENT_TYPE, APPLICATION_JSON); LOGGER.info("REQUEST: " + API_ADD_ORDER); LOGGER.info("PARAMS: " + consumedMessage); String apiUrl = null; if (!API_ADD_ORDER.toLowerCase().startsWith(jocApiUri)) { 
apiUrl = jocApiUri + API_ADD_ORDER; } } LOGGER.info("resolvedUri: " + jocUri.resolve(apiUrl).toString()); response = client.postRestService(jocUri.resolve(apiUrl), consumedMessage); LOGGER.info("HTTP status code: " + client.statusCode()); response = client.postRestService(jocUri.resolve(API_LOGOUT), null); LOGGER.info("HTTP status code: " + client.statusCode()); } } } catch (Exception e) { } LOGGER.error(e.getMessage(), e);} } catch } finally(Throwable e) { client.closeHttpCliente.printStackTrace(); }System.exit(1); } finally { } private static SOSRestApiClientif setupHttpClient(String client != null) { client.closeHttpClient(); } } } private static SOSRestApiClient setupHttpClient(String username, String password) { SOSRestApiClient client = new SOSRestApiClient(); String basicAuth = Base64.getMimeEncoder().encodeToString((username + ":" + password).getBytes()); client.setBasicAuthorization(basicAuth); return client; } private static String cleanupValue(String value) { if (value.startsWith("\"")) { value = value.substring(1); } if (value.endsWith("\"")) { value = value.substring(0, value.length() - 1); } return value; } private static void readPropertiesFile (Path path) { Properties props = new Properties(); try { props.load(Files.newInputStream(path)); jmsServerUri = cleanupValue(props.getProperty("jms_url")); LOGGER.info("cfg jms_url: " + jmsServerUri); queueName = cleanupValue(props.getProperty("jms_queue_name")); propsLOGGER.load(Files.newInputStream(path)info("cfg jms_queue_name: " + queueName); jmsServerUriqueueTtl = Long.parseLong(cleanupValue(props.getProperty("jms_queue_urlname"))); LOGGER.info("cfg jms_queue_urlttl: " + jmsServerUriqueueTtl.toString()); jocApiUri = cleanupValue(props.getProperty("joc_api_url")); LOGGER.info("cfg joc_api_url: " + jocApiUri); controllerId = cleanupValue(props.getProperty("controller_id")); LOGGER.info("cfg controller_id: " + controllerId); workflowPath = cleanupValue(props.getProperty("workflow_path")); LOGGER.info("cfg 
workflow_path: " + workflowPath); username = cleanupValue(props.getProperty("username")); pwd = cleanupValue(props.getProperty("password")); requestBody = "{\"controllerId\":\"" + controllerId + "\",\"orders\":[{\"workflowPath\":\"" + workflowPath workflowPath + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}"; } catch (IOException e) { LOGGER.warn("could not read properties file, use defaults instead."); jmsServerUri = DEFAULT_JMS_URI; queueName = DEFAULT_QUEUE_NAME; jocApiUri = DEFAULT_JOC_API_URL; requestBody = DEFAULT_JOC_API_REQUEST_BODY; username = DEFAULT_USERNAME; pwd = DEFAULT_PWD; queueTtl = 5000L; } } } |
Maven Configuration Example
...
Overview
Content Tools