Page History
...
The code example below shows the complete class. The code includes creates a default TEXT message consisting of a the body for a JS7 /orders/add
API request.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package com.sos.jms.producer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; public class SOSProducer { private static final Logger LOGGER = Logger.getLogger(SOSProducer.class); private static final String TEXT = "{\"controllerId\":\"testsuite\",\"orders\":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}"; private String uri; public SOSProducer(String uri) { this.uri = uri; } public void write(String text, String queueName) throws Exception { write(text, queueName, 5000L); } public void write(String text, String queueName, long ttl) throws Exception { Connection connection = null; Session session = null; try { ConnectionFactory factory = new ActiveMQConnectionFactory(uri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); // 5 sec time to live for the producer for this showcase producer.setTimeToLive(ttl); Message message = null; if(text != null){ message = session.createTextMessage(text); } else{ message = session.createTextMessage(TEXT); } producer.send(message); } catch (Throwable e) { LOGGER.error("JMSException occurred while trying to write Message to Destination: "); throw e; } finally { if(session != null) { try { session.close(); } catch (JMSException e) { LOGGER.warn("JMSException occurred while trying to close the session: ", e); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { LOGGER.warn("JMSException occurred while trying to close the connection: ", e); } } } } } |
...
This section describes how to establish a connection to a MQ server Message Queue Service and receive a message from a queue on this server. Furthermore it shows how to connect to a JOC Cockpit instance via HTTP to send an api API request:
- create a connection,
- create a session,
- create a destination.
- create a consumer,
- receive a message with the consumer,
- close the (MQ) connection,
- login to a JOC Cockpit instance via a HTTP REST API call,
- send a ./orders/add API request to a JOC Cockpit instance,
- close the connection.
...
The example class uses the SOSRestApiClient to create the HTTP connection. The SOSRestApiClient is based on the org.apache.httpcomponents::httpclient. Users can use their own HTTP client implementation.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package com.sos.jms; import java.io.IOException; import java.io.StringReader; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Base64; import java.util.Properties; import javax.json.Json; import javax.json.JsonObject; import javax.json.JsonReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.sos.commons.httpclient.SOSRestApiClient; import com.sos.jms.consumer.SOSConsumer; import com.sos.jms.producer.SOSProducer; public class JmsExecute { private static final String DEFAULT_JMS_URI = "tcp://activemq-5-15:61616"; private static final String DEFAULT_JOC_API_URL = "http://centostest_primary.sos:7446/joc/api/"; private static final String DEFAULT_JOC_API_REQUEST_BODY = "{\"controllerId\":\"testsuite\",\"orders\"" + ":[{\"workflowPath\":\"/JS7Demo/01_HelloWorld/jdHelloWorld\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}"; private static final String DEFAULT_USERNAME = "root"; private static final String DEFAULT_PWD = "root"; private static final String DEFAULT_QUEUE_NAME = "test_queue"; private static final String API_ADD_ORDER = "orders/add"; private static final String API_LOGIN = "authentication/login"; private static final String API_LOGOUT = "authentication/logout"; private static final String ACCESS_TOKEN_HEADER = "X-Access-Token"; private static final String APPLICATION_JSON = "application/json"; private static final String CONTENT_TYPE = "Content-Type"; private static final Logger LOGGER = LoggerFactory.getLogger(JmsExecute.class); private static String jmsServerUri = null; private static String jocApiUri = null; private static String controllerId = null; private static String workflowPath = null; private static String requestBody = null; private static String username = null; private static String pwd = null; private static String queueName = null; private static Long queueTtl = null; public static void main(String[] args) throws URISyntaxException { SOSRestApiClient client = null; try { URL classUrl = JmsExecute.class.getProtectionDomain().getCodeSource().getLocation(); Path classPath = Paths.get(classUrl.toURI()); String filename = classPath.getFileName().toString().replace(".jar", ".config"); LOGGER.info(classPath.getParent().resolve(filename).toString()); readPropertiesFile(classPath.getParent().resolve(filename)); if ("produce".equals(args[0])) { SOSProducer producer = new SOSProducer(jmsServerUri); LOGGER.info("message send to queue:"); LOGGER.info(requestBody); producer.write(requestBody, queueName, queueTtl); } else if ("consume".equals(args[0])) { SOSConsumer consumer = new SOSConsumer(jmsServerUri); String consumedMessage = null; consumedMessage = consumer.read(queueName); LOGGER.info("message received from queue:"); LOGGER.info(consumedMessage); if (consumedMessage != null) { client = setupHttpClient(username, pwd); URI jocUri = URI.create(jocApiUri); LOGGER.info("send login to: " + jocUri.resolve(API_LOGIN).toString()); String response = client.postRestService(jocUri.resolve(API_LOGIN), null); LOGGER.info("HTTP status code: " + client.statusCode()); if (client.statusCode() == 200) { JsonReader jsonReader = null; String accessToken = null; try { jsonReader = Json.createReader(new StringReader(response)); JsonObject json = jsonReader.readObject(); accessToken = json.getString("accessToken", ""); } catch (Exception e) { throw new Exception("Could not determine accessToken.", e); } finally { jsonReader.close(); } client.addHeader(ACCESS_TOKEN_HEADER, accessToken); client.addHeader(CONTENT_TYPE, APPLICATION_JSON); LOGGER.info("REQUEST: " + API_ADD_ORDER); LOGGER.info("PARAMS: " + consumedMessage); String apiUrl = null; if (!API_ADD_ORDER.toLowerCase().startsWith(jocApiUri)) { apiUrl = jocApiUri + API_ADD_ORDER; } LOGGER.info("resolvedUri: " + jocUri.resolve(apiUrl).toString()); response = client.postRestService(jocUri.resolve(apiUrl), consumedMessage); LOGGER.info("HTTP status code: " + client.statusCode()); response = client.postRestService(jocUri.resolve(API_LOGOUT), null); LOGGER.info("HTTP status code: " + client.statusCode()); } } } } catch (Throwable e) { e.printStackTrace(); System.exit(1); } finally { if (client != null) { client.closeHttpClient(); } } } private static SOSRestApiClient setupHttpClient(String username, String password) { SOSRestApiClient client = new SOSRestApiClient(); String basicAuth = Base64.getMimeEncoder().encodeToString((username + ":" + password).getBytes()); client.setBasicAuthorization(basicAuth); return client; } private static String cleanupValue(String value) { value = value.trim(); if (value.startsWith("\"")) { value = value.substring(1); } if (value.endsWith("\"")) { value = value.substring(0, value.length() - 1); } return value; } private static void readPropertiesFile(Path path) { Properties props = new Properties(); try { props.load(Files.newInputStream(path)); jmsServerUri = cleanupValue(props.getProperty("jms_url")); LOGGER.info("cfg jms_url: " + jmsServerUri); queueName = cleanupValue(props.getProperty("jms_queue_name")); LOGGER.info("cfg jms_queue_name: " + queueName); queueTtl = Long.parseLong(cleanupValue(props.getProperty("jms_queue_name"))); LOGGER.info("cfg jms_queue_ttl: " + queueTtl.toString()); jocApiUri = cleanupValue(props.getProperty("joc_api_url")); LOGGER.info("cfg joc_api_url: " + jocApiUri); controllerId = cleanupValue(props.getProperty("controller_id")); LOGGER.info("cfg controller_id: " + controllerId); workflowPath = cleanupValue(props.getProperty("workflow_path")); LOGGER.info("cfg workflow_path: " + workflowPath); username = cleanupValue(props.getProperty("username")); pwd = cleanupValue(props.getProperty("password")); requestBody = "{\"controllerId\":\"" + controllerId + "\",\"orders\":[{\"workflowPath\":\"" + workflowPath + "\",\"scheduledFor\":\"now\"}],\"auditLog\":{}}"; } catch (IOException e) { LOGGER.warn("could not read properties file, use defaults instead."); jmsServerUri = DEFAULT_JMS_URI; queueName = DEFAULT_QUEUE_NAME; jocApiUri = DEFAULT_JOC_API_URL; requestBody = DEFAULT_JOC_API_REQUEST_BODY; username = DEFAULT_USERNAME; pwd = DEFAULT_PWD; queueTtl = 5000L; } } } |
...