Introduction

This article describes how to implement a JS7 Workflow with two shell jobs that communicate over a Message Queue (MQ). One job publishes JS7 JOC API calls to the queue, the other receives and executes them.

The document explains which classes and methods have to be created and what their purposes are. This example was developed using Apache ActiveMQ.

Goal of the Example

The example covers the specifics required to achieve the use case described below. It covers neither the complete JS7 JOC API nor the complete possibilities of JMS.

For the coverage of the JS7 JOC API refer to the JOC API Documentation.

For the coverage of the JMS API refer to the Oracle Java Documentation.

The Use Case

The use case consists of the following steps:

  1. Run a shell Job that runs a Java class on an Agent, which sends a JSON body to a message queue (MQ).
  2. Run a shell Job that runs a Java class on an Agent, which receives a JSON body from the MQ.
  3. Execute a JOC API call with the given JSON body.
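The third step can be sketched as three HTTP requests: login, the actual call, and logout. The following is a minimal sketch, not the code of the downloadable example. The class name `JocApiRequests`, the endpoint paths, the `X-Access-Token` header and the base URL are assumptions made for illustration and should be checked against the JOC API Documentation. The sketch only builds the requests with the JDK's `java.net.http` package and does not send them.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper that builds the requests for a login / call / logout sequence. */
class JocApiRequests {

    // Assumed endpoint paths; verify them against the JOC API Documentation.
    static final String LOGIN_PATH = "/joc/api/authentication/login";
    static final String LOGOUT_PATH = "/joc/api/authentication/logout";

    /** Login request using HTTP Basic authentication and an empty body. */
    static HttpRequest login(String baseUrl, String user, String password) {
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + LOGIN_PATH))
                .header("Authorization", "Basic " + credentials)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** API call that posts the JSON body received from the queue. */
    static HttpRequest call(String baseUrl, String apiPath, String accessToken, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + apiPath))
                .header("X-Access-Token", accessToken) // assumed token header name
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** Logout request that invalidates the access token. */
    static HttpRequest logout(String baseUrl, String accessToken) {
        return HttpRequest.newBuilder(URI.create(baseUrl + LOGOUT_PATH))
                .header("X-Access-Token", accessToken)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        // Base URL and credentials are placeholders.
        HttpRequest request = login("http://localhost:4446", "root", "root");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Each request would be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Under the assumptions above, the access token for the second and third request is taken from the login response.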

What does this example explain?

The steps described in this article are as follows:

  • How to implement a Java job class using the job API
    • How to implement a Producer Job (a SOSProducer to create and send a message to the queue)
    • How to implement a Consumer Job (a SOSConsumer to receive a message from the queue)
    • How to execute an XML fragment using the job API
    • How to deploy the jobs to the JobScheduler
    • How to configure the JobScheduler Jobs with JOE using the developed classes
  • How to login to the JS7 JOC API, send a request and logout again

    Download

    The library with the classes described in this example can be downloaded here.

    The workflow configuration can be downloaded here.

    Prerequisites

    The following dependency is needed to write a Java job for the JobScheduler.

    • Either download the jar file engine-job-api.jar and add it to the classpath
    • or in case of a Maven project add the following dependency to the project configuration.
    Code Block
    languagexml
    titleMaven dependency for engine-job-api
    collapsetrue
    <dependency>
        <groupId>com.sos-berlin.jobscheduler.engine</groupId>
        <artifactId>engine-job-api</artifactId>
        <version>1.10.3</version>
    </dependency>
      • Make sure to use the correct version suitable for the JobScheduler version in use.

    For this example the activemq-all-5.15.0.jar library is used.

    • The library is hosted on Maven Central. Download the jar and add it to the classpath of the Java project.
    • If the Java project already is a Maven project, simply add the following dependency to the project configuration.
    Code Block
    languagexml
    titleMaven dependency for activemq-all
    collapsetrue
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.0</version>
    </dependency>

    The Workflow

    The example Workflow has two Jobs which both run the main method of the JmsExample class, one with the argument produce and one with the argument consume.

    Code Block
    languagejs
    linenumberstrue
    collapsetrue
    {
      "version": "1.1.0",
      "timeZone": "Europe/Berlin",
      "instructions": [
        {
          "TYPE": "Execute.Named",
          "jobName": "producer_job",
          "label": "job_produce"
        },
        {
          "TYPE": "Execute.Named",
          "jobName": "consumer_job",
          "label": "job_consume"
        }
      ],
      "jobs": {
        "producer_job": {
          "agentName": "standaloneAgent",
          "executable": {
            "TYPE": "ShellScriptExecutable",
            "script": "java -jar C:/sp/devel/js7/JMS/jms-example-js7.jar produce",
            "v1Compatible": false
          },
          "skipIfNoAdmissionForOrderDay": false,
          "parallelism": 1,
          "graceTimeout": 15,
          "failOnErrWritten": false,
          "warnOnErrWritten": false,
          "title": "produce"
        },
        "consumer_job": {
          "agentName": "standaloneAgent",
          "executable": {
            "TYPE": "ShellScriptExecutable",
            "script": "java -jar C:/sp/devel/js7/JMS/jms-example-js7.jar consume",
            "v1Compatible": false
          },
          "skipIfNoAdmissionForOrderDay": false,
          "parallelism": 1,
          "graceTimeout": 15,
          "failOnErrWritten": false,
          "warnOnErrWritten": false,
          "title": "consumer"
        }
      }
    }

    The Basic Structure of an API Job Java Class

    A Java Job using the JobScheduler API has to extend the Job_impl class.

    It has to override the method spooler_process().

    Code Block
    languagejava
    titleBase Structure of an API Job
    collapsetrue
    public class CLASSNAME extends Job_impl {

        @Override
        public boolean spooler_process() throws Exception {
            // This is the place where the work is done!
            return spooler_job.order_queue() != null;
        }
    }

    The Return Value of spooler_process()

    If the Java job is an order job, it has to return true so that the order can continue with the next task if the job ends without an error.

    If the Java job is a standalone job, it has to return false so that the JobScheduler recognizes that the task has finished.

    Both cases are covered by returning spooler_job.order_queue() != null. The expression determines whether the job has an order queue, i.e. is an order job (true), or not (false).

    Logging

    Because the Java Jobs run API methods, we can directly use the logging feature of the JobScheduler. As shown in the detailed method examples below, logging is done using the spooler_log object.

    Make sure to rethrow the caught exception in order to hand it over to the spooler_process() method. This is needed for a job running in a job chain to determine that it has to fail if an error occurs.

    Initialization of the MQ Connection

    Both Java jobs need methods to initialize a connection to an MQ Server. This section of the document shows how to implement them. The implementation is based on the JMS implementation used and shipped by ActiveMQ.

    To keep this example simple, these methods are put in both Java job classes. To prevent duplicates in a real project's source code, it is better to put these methods in a single class and extend that class in the two job classes.

    The createConnection() method

    This method instantiates a ConnectionFactory object with an ActiveMQConnectionFactory object and creates a Connection object through the factory method createConnection().

    The ActiveMQConnectionFactory object has to be instantiated with the URL of the MQ server.

    Code Block
    languagejava
    titlecreateConnection(String uri)
    linenumberstrue
    collapsetrue
    public Connection createConnection(String uri) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
        Connection jmsConnection = null;
        try {
            jmsConnection = factory.createConnection();
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to connect: ");
            // rethrow so that spooler_process() can fail the job
            throw e;
        }
        return jmsConnection;
    }

    The createSession(Connection connection) method

    The method is called with an already instantiated Connection object and instantiates a Session object through the Connection object's createSession(boolean transacted, int acknowledgeMode) method.

    The acknowledgeMode has, among others, the options AUTO_ACKNOWLEDGE and CLIENT_ACKNOWLEDGE.

    In summary, the AUTO_ACKNOWLEDGE mode results in the message being dequeued when one consumer has read the message from the MQ server, whereas the CLIENT_ACKNOWLEDGE mode puts the responsibility on the client.

    When the CLIENT_ACKNOWLEDGE mode is set, the message will stay present for all consumers to read until a consumer acknowledges the message. Only then will the message be dequeued and no longer be available to further consumers.

    Code Block
    languagejava
    titlecreateSession(Connection connection)
    linenumberstrue
    collapsetrue
    private Session createSession(Connection connection) throws JMSException {
        Session session = null;
        try {
            // not transacted; the client acknowledges messages explicitly
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Session: ");
            throw e;
        }
        return session;
    }

    The createDestination(Session session, String queueName) method

    This method creates a Destination object. It is called with an active Session object and the name of the queue to write to. The Destination object is created through the createQueue(String name) method of the Session object.

    Code Block
    languagejava
    titlecreateDestination(Session session, String queueName)
    linenumberstrue
    collapsetrue
    private Destination createDestination(Session session, String queueName) throws JMSException {
        Destination destination = null;
        try {
            destination = session.createQueue(queueName);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create Destination: ");
            throw e;
        }
        return destination;
    }

    The createConnectionUrl (String protocol, String hostName, String port) method

    This is a helper method which creates a connection URI based on the job parameters. It simply uses a StringBuilder to create a string in the format [protocol]://[hostname]:[port].

    Code Block
    languagejava
    titlecreateConnectionUrl (String protocol, String hostName, String port)
    linenumberstrue
    collapsetrue
    private String createConnectionUrl (String protocol, String hostName, String port){
        StringBuilder strb = new StringBuilder();
        strb.append(protocol).append("://").append(hostName).append(":").append(port);
        return strb.toString();
    }

    Implementation of the Producer Job

    There are three more methods needed for the producer Java job to run:

    • createMessageProducer(Session session, Destination destination)
    • write(String text, String connectionUrl, String queueName)
    • execute()

    The createMessageProducer(Session session, Destination destination) method

    This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageProducer object with the given session and destination.

    Code Block
    languagejava
    titlecreateMessageProducer(Session session, Destination destination)
    linenumberstrue
    collapsetrue
    public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException {
        MessageProducer producer = null;
        try {
            producer = session.createProducer(destination);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
            throw e;
        }
        return producer;
    }

    The write(String text, String connectionUrl, String queueName) method

    The method is called with the message to be sent to the server, the connection URL of the server and the name of the queue to publish to. The message is a String object, from which the method instantiates a Message object.

    This method makes use of the methods described above. It instantiates all objects needed for the connection and sends the Message object. Although the method rethrows any error that occurs, it calls the MessageProducer's send(Message message) method in a try..catch..finally block, because the finally block is used to close the connection in the end.

    Code Block
    languagejava
    titlewrite(String text, String connectionUrl, String queueName)
    linenumberstrue
    collapsetrue
    public void write(String text, String connectionUrl, String queueName) throws JMSException {
        Connection connection = createConnection(connectionUrl);
        Session session = createSession(connection);
        Destination destination = createDestination(session, queueName);
        MessageProducer producer = createMessageProducer(session, destination);
        Message message = null;
        try {
            message = session.createTextMessage(text);
            producer.send(message);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
            throw e;
        } finally {
            // always close the connection, even if sending failed
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
                    throw e;
                }
            }
        }
    }

    The execute() method

    This is the core method of the Producer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method and subsequently calls the write(..) method to send the message to the MQ server.

    Code Block
    languagejava
    titleexecute()
    linenumberstrue
    collapsetrue
    private void execute() throws Exception {
        Variable_set params = spooler_task.params();
        params.merge(spooler_task.order().params());
        spooler_log.debug9(params.xml());
        String protocol = spooler_task.params().value("MQ_Protocol");
        spooler_log.debug9("Received protocol: " + protocol);
        String messageHost = spooler_task.params().value("MQ_Host");
        spooler_log.debug9("Received MQ Host: " + messageHost);
        String messagePort = spooler_task.params().value("MQ_Port");
        spooler_log.debug9("Received MQ port: " + messagePort);
        String queueName = spooler_task.params().value("MQ_QueueName");
        spooler_log.debug9("Received Queue name: " + queueName);
        String message = spooler_task.params().value("message");
        spooler_log.debug9("Received message: " + message);
        // fall back to the defaults if protocol or queue name are not set
        if(protocol == null || (protocol != null && protocol.isEmpty())){
            protocol = DEFAULT_PROTOCOL;
        }
        if(queueName == null || (queueName != null && queueName.isEmpty())){
            queueName = DEFAULT_QUEUE_NAME;
        }
        String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
        if(message != null && !message.isEmpty()){
            write(message, connectionUrl, queueName);
        } else {
            spooler_log.error("Message is empty, nothing to send to message server");
        }
    }
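The parameter handling of execute() can be tried out without a JobScheduler or an MQ server. The following is a minimal sketch in plain Java, assuming the job parameters are available as a Map. The class name `MqParameters`, the helper `valueOrDefault` and the host and port values are hypothetical and only serve the illustration. The default values and the URL format are those used in this article.

```java
import java.util.Map;

/** Sketch: resolve MQ job parameters with defaults and build the broker URL. */
class MqParameters {

    static final String DEFAULT_PROTOCOL = "tcp";
    static final String DEFAULT_QUEUE_NAME = "JobChainQueue";

    /** Returns the value for the key, or the fallback if the value is missing or empty. */
    static String valueOrDefault(Map<String, String> params, String key, String fallback) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /** Builds a URL in the format [protocol]://[hostname]:[port]. */
    static String createConnectionUrl(String protocol, String hostName, String port) {
        return protocol + "://" + hostName + ":" + port;
    }

    public static void main(String[] args) {
        // Example values; MQ_Protocol is empty and MQ_QueueName is missing on purpose.
        Map<String, String> params = Map.of(
                "MQ_Host", "localhost",
                "MQ_Port", "61616",
                "MQ_Protocol", "");
        String protocol = valueOrDefault(params, "MQ_Protocol", DEFAULT_PROTOCOL);
        String queueName = valueOrDefault(params, "MQ_QueueName", DEFAULT_QUEUE_NAME);
        String url = createConnectionUrl(protocol, params.get("MQ_Host"), params.get("MQ_Port"));
        System.out.println(url + " -> " + queueName); // prints "tcp://localhost:61616 -> JobChainQueue"
    }
}
```

Note that the host and port have no defaults in the article's code either, so a missing MQ_Host or MQ_Port results in a URL containing the text "null".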

    Putting the Bricks Together

    The last thing to do is to call the execute() method in spooler_process() as shown in the complete code example below.

    Code Block
    languagejava
    titleComplete Source of the MessageProducerExampleJob
    linenumberstrue
    collapsetrue
    package com.sos.jitl.messaging;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import sos.spooler.Job_impl;
    import sos.spooler.Variable_set;
    
    public class MessageProducerExampleJob extends Job_impl {
    
        private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
        private static final String DEFAULT_PROTOCOL = "tcp";
    
        @Override
        public boolean spooler_process() throws Exception {
            try {
                execute();
            } catch (Exception e) {
                spooler_log.error("Error occurred in spooler_process() of MessageProducerExampleJob: ");
                throw e;
            }
            return  spooler_job.order_queue() != null;
        }
    
        private void execute() throws Exception {
            Variable_set params = spooler_task.params();
            params.merge(spooler_task.order().params());
            spooler_log.debug9(params.xml());
            String protocol = spooler_task.params().value("MQ_Protocol");
            spooler_log.debug9("Received protocol: " + protocol);
            String messageHost = spooler_task.params().value("MQ_Host");
            spooler_log.debug9("Received MQ Host: " + messageHost);
            String messagePort = spooler_task.params().value("MQ_Port");
            spooler_log.debug9("Received MQ port: " + messagePort);
            String queueName = spooler_task.params().value("MQ_QueueName");
            spooler_log.debug9("Received Queue name: " + queueName);
            String message = spooler_task.params().value("message");
            spooler_log.debug9("Received message: " + message);
            if(protocol == null || (protocol != null && protocol.isEmpty())){
                protocol = DEFAULT_PROTOCOL;
            }
            if(queueName == null || (queueName != null && queueName.isEmpty())){
                queueName = DEFAULT_QUEUE_NAME;
            }
            String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
            if(message != null && !message.isEmpty()){
                write(message, connectionUrl, queueName);
            } else {
                spooler_log.error("Message is empty, nothing to send to message server");
            }
        }
    
        private String createConnectionUrl (String protocol, String hostName, String port){
            StringBuilder strb = new StringBuilder();
            strb.append(protocol).append("://").append(hostName).append(":").append(port);
            return strb.toString();
        }
    
        private Connection createConnection(String uri) throws JMSException{
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            Connection jmsConnection = null;
            try {
                jmsConnection = factory.createConnection();
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to connect: ");
                throw e;
            }
            return jmsConnection;
        }
        
        private Session createSession(Connection connection) throws JMSException{
            Session session = null;
            try {
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to create Session: ");
                throw e;
            }
            return session;
        }
        
        private Destination createDestination(Session session, String queueName) throws JMSException{
            Destination destination = null;
            try {
                destination = session.createQueue(queueName);
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to create Destination: ");
                throw e;
            }
            return destination;
        }
        
        public MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException{
            MessageProducer producer = null;
            try {
                producer = session.createProducer(destination);
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to create MessageProducer: ");
                throw e;
            }
            return producer;
        }
        
        public void write(String text, String connectionUrl, String queueName) throws JMSException{
            Connection connection = createConnection(connectionUrl);
            Session session = createSession(connection);
            Destination destination = createDestination(session, queueName);
            MessageProducer producer = createMessageProducer(session, destination);
            Message message = null;
            try {
                message = session.createTextMessage(text);
                producer.send(message);
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred in ProducerJob while trying to write Message to Destination: ");
                throw e;
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        spooler_log.error("JMSException occurred in ProducerJob while trying to close the connection: ");
                        throw e;
                    }
                }
            }
        }
        
    }

    Implementation of the Consumer Job

    As seen above in the Producer Job example, there are also additional methods needed for the consumer Java job to run.

    • createMessageConsumer(Session session, Destination destination)
    • read(Connection connection, String queueName, Boolean closeMessage)
    • executeXml(String message)
    • executeXmlForAllTargets(String message)
    • execute()

    The createMessageConsumer(Session session, Destination destination) method

    This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageConsumer object with the given session and destination.

    Code Block
    languagejava
    titlecreateMessageConsumer(Session session, Destination destination)
    linenumberstrue
    collapsetrue
    private MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
        MessageConsumer consumer = null;
        try {
            consumer = session.createConsumer(destination);
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to create MessageConsumer: ");
            throw e;
        }
        return consumer;
    }

    The read(Connection connection, String queueName, Boolean closeMessage) method

    The method is called with an already instantiated Connection object, the name of the queue to read from and a flag that determines whether this is the last consumer. If it is the last consumer for the messages in the specified queue, the received message is acknowledged.

    This method makes use of the methods described above. It instantiates all objects needed for the connection and reads a Message object from the given queue. It extracts the value from the message as a string via the TextMessage object's getText() method. We have to make sure that connection.start() is called before the MessageConsumer object is instantiated.

    Code Block
    languagejava
    titleread(Connection connection, String queueName, Boolean closeMessage)
    linenumberstrue
    collapsetrue
    private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
        String messageText = null;
        try {
            Session session = createSession(connection);
            Destination destination = createDestination(session, queueName);
            connection.start();
            MessageConsumer consumer = createMessageConsumer(session, destination);
            Message receivedMessage = null;
            while (true) {
                receivedMessage = consumer.receive(1);
                if (receivedMessage != null) {
                    if (receivedMessage instanceof TextMessage) {
                        TextMessage message = (TextMessage) receivedMessage;
                        messageText = message.getText();
                        break;
                    } else {
                        break;
                    }
                }
            }
            if(closeMessage){
                receivedMessage.acknowledge();
            }
        } catch (JMSException e) {
            spooler_log.error("JMSException occurred while trying to read from Destination: ");
            throw e;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    spooler_log.error("JMSException occurred while trying to close the connection: ");
                }
            }
        }
        return messageText;
    }
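The read(..) method above polls until a message arrives, so it never returns when the queue stays empty. A variant with an upper time limit can be sketched as follows. This is a sketch under assumptions, not part of the example: the `Receiver` interface is a hypothetical stand-in for the `receive(long timeout)` method of a MessageConsumer, which returns null when the timeout expires, and a `LinkedBlockingQueue` plays the role of the MQ so that the sketch runs without a server.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Sketch: poll for a message, but give up after a maximum waiting time. */
class BoundedReceive {

    /** Hypothetical stand-in for MessageConsumer.receive(long): null means "nothing arrived in time". */
    interface Receiver {
        String receive(long timeoutMillis) throws InterruptedException;
    }

    /** Polls in steps of pollMillis and returns null if no message arrived within maxWaitMillis. */
    static String receiveWithDeadline(Receiver receiver, long pollMillis, long maxWaitMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (deadline - System.nanoTime() > 0) {
            String message = receiver.receive(pollMillis);
            if (message != null) {
                return message;
            }
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Receiver receiver = timeout -> queue.poll(timeout, TimeUnit.MILLISECONDS);

        queue.add("<add_order job_chain='example'/>");
        System.out.println(receiveWithDeadline(receiver, 10, 200)); // the queued message
        System.out.println(receiveWithDeadline(receiver, 10, 200)); // null, the queue is empty
    }
}
```

A caller that gets null back has to decide whether an empty queue is an error for the job or simply means that there is nothing to do.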

    The executeXml(String message) method

    This method takes the received message and executes it through the spooler.execute_xml(String xml) API method, logging additional debug information.

    Code Block
    languagejava
    titleexecuteXml(String message)
    linenumberstrue
    collapsetrue
    private void executeXml(String message) {
        spooler_log.debug9("execute XML started");
        String answer = spooler.execute_xml(message);
        spooler_log.debug9("Return value of executeXML: " + answer);
        spooler_log.debug9("execute XML finished");
    }

    The executeXmlForAllTargets(String message) method

    This method calls the executeXml(String message) method mentioned above for each given target job chain. It replaces the job_chain argument of the (XML) message with the given job chain names from the job parameter.

    Code Block
    languagejava
    titleexecuteXmlForAllTargets(String message)
    linenumberstrue
    collapsetrue
    private void executeXmlForAllTargets(String message) {
        if (targetJobChains.contains(DELIMITER) && message.contains("add_order")) {
            String [] jobChainNames = targetJobChains.split("[" + DELIMITER + "]");
            for (String name : jobChainNames){
                spooler_log.debug9("add_order XML will be adjusted for JobChain: " + name);
                executeXml(message.replaceFirst("job_chain='[^']*'", "job_chain='" + name + "'"));
            }            
        }else{
            executeXml(message);
        }
    }
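The string handling of executeXmlForAllTargets(..) can be checked in isolation. The following minimal sketch returns the adjusted commands as a list instead of executing them. The class name `JobChainTargets`, the method name and the job chain names are hypothetical. Unlike the method above, the sketch passes the replacement through `Matcher.quoteReplacement`, so that a `$` or `\` in a job chain name is taken literally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;

/** Sketch: create one add_order command per target job chain. */
class JobChainTargets {

    static final String DELIMITER = ",";

    /** Returns the message once per target job chain, or unchanged if there is a single target. */
    static List<String> commandsForTargets(String message, String targetJobChains) {
        List<String> commands = new ArrayList<>();
        if (targetJobChains != null && targetJobChains.contains(DELIMITER)
                && message.contains("add_order")) {
            for (String name : targetJobChains.split(DELIMITER)) {
                // replace the value of the first single-quoted job_chain attribute
                String replacement = Matcher.quoteReplacement("job_chain='" + name + "'");
                commands.add(message.replaceFirst("job_chain='[^']*'", replacement));
            }
        } else {
            commands.add(message);
        }
        return commands;
    }

    public static void main(String[] args) {
        String message = "<add_order job_chain='placeholder' at='now'/>";
        for (String command : commandsForTargets(message, "chain_a,chain_b")) {
            System.out.println(command);
        }
        // prints:
        // <add_order job_chain='chain_a' at='now'/>
        // <add_order job_chain='chain_b' at='now'/>
    }
}
```

The pattern only matches attributes written with single quotes. A message that uses job_chain="..." with double quotes is passed on unchanged for every target.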

    The execute() method

    This is the core method of the Consumer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method and subsequently calls the read(..) method. The received message as well as the name(s) of the target job chain(s) are stored in fields of the class for further processing.

    Code Block
    languagejava
    titleexecute()
    linenumberstrue
    collapsetrue
    private void execute() throws Exception {
        Variable_set params = spooler_task.params();
        params.merge(spooler_task.order().params());
        spooler_log.debug9(params.xml());
        String protocol = params.value("MQ_Protocol");
        spooler_log.debug9("Received protocol: " + protocol);
        if(protocol == null || (protocol != null && protocol.isEmpty())){
            protocol = DEFAULT_PROTOCOL;
        }
        String messageHost = params.value("MQ_Host");
        spooler_log.debug9("Received MQ Host: " + messageHost);
        String messagePort = params.value("MQ_Port");
        spooler_log.debug9("Received MQ port: " + messagePort);
        String queueName = params.value("MQ_QueueName");
        spooler_log.debug9("Received Queue name: " + queueName);
        Boolean lastConsumer = Boolean.valueOf(params.value("lastConsumer"));
        spooler_log.debug9("Received lastConsumer: " + lastConsumer.toString());
        if(queueName == null || (queueName != null && queueName.isEmpty())){
            queueName = DEFAULT_QUEUE_NAME;
        }
        targetJobChains = params.value("targetJobChainName");
        spooler_log.debug9("Received targetJobChains: " + targetJobChains);
        String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
        Connection jmsConnection = createConnection(connectionUrl);
        messageXml = read(jmsConnection, queueName, lastConsumer);
        spooler_log.debug9("Received message: " + messageXml);
    }
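
The fallback handling and the URL assembly used by execute() can be tried out in isolation. The following is a minimal sketch under stated assumptions: the class name ConsumerSettingsSketch and the helper valueOrDefault(..) are hypothetical, and the host "localhost" and port "61616" are example values only.

```java
class ConsumerSettingsSketch {

    static final String DEFAULT_PROTOCOL = "tcp";
    static final String DEFAULT_QUEUE_NAME = "JobChainQueue";

    // Hypothetical helper: returns the fallback if the parameter is missing or empty.
    static String valueOrDefault(String value, String fallback) {
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    // Same assembly as createConnectionUrl(..) in the job: protocol://host:port
    static String createConnectionUrl(String protocol, String hostName, String port) {
        return protocol + "://" + hostName + ":" + port;
    }

    public static void main(String[] args) {
        // An empty or missing parameter falls back to the default.
        String protocol = valueOrDefault("", DEFAULT_PROTOCOL);
        String queueName = valueOrDefault(null, DEFAULT_QUEUE_NAME);
        System.out.println(createConnectionUrl(protocol, "localhost", "61616"));
        System.out.println(queueName);
    }
}
```

Note that execute() applies defaults to MQ_Protocol and MQ_QueueName only. MQ_Host and MQ_Port have no fallback, so the job relies on both parameters being set.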

    Putting the Bricks Together

    The last step for the Consumer Job is to call the execute() method and then the executeXmlForAllTargets(String message) method in spooler_process(), as shown in the complete code example below.

    Code Block (Java): Complete Source of the MessageConsumerExampleJob
    package com.sos.jitl.messaging;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import sos.spooler.Job_impl;
    import sos.spooler.Variable_set;
    
    public class MessageConsumerExampleJob extends Job_impl {
    
        private static final String DEFAULT_QUEUE_NAME = "JobChainQueue";
        private static final String DEFAULT_PROTOCOL = "tcp";
        private static final String DELIMITER = ",";
        private String messageXml; 
        private String targetJobChains;
    
        @Override
        public boolean spooler_process() throws Exception {
            try {
                execute();
                executeXmlForAllTargets(messageXml);
            } catch (Exception e) {
                spooler_log.error("Error occurred in spooler_process() of MessageConsumerExampleJob: " + e.getMessage());
                throw e;
            }
            return spooler_job.order_queue() != null;
        }
    
        private void execute() throws Exception {
            Variable_set params = spooler_task.params();
            params.merge(spooler_task.order().params());
            spooler_log.debug9(params.xml());
            String protocol = params.value("MQ_Protocol");
            spooler_log.debug9("Received protocol: " + protocol);
            if(protocol == null || (protocol != null && protocol.isEmpty())){
                protocol = DEFAULT_PROTOCOL;
            }
            String messageHost = params.value("MQ_Host");
            spooler_log.debug9("Received MQ Host: " + messageHost);
            String messagePort = params.value("MQ_Port");
            spooler_log.debug9("Received MQ port: " + messagePort);
            String queueName = params.value("MQ_QueueName");
            spooler_log.debug9("Received Queue name: " + queueName);
            Boolean lastConsumer = Boolean.valueOf(params.value("lastConsumer"));
            spooler_log.debug9("Received lastConsumer: " + lastConsumer.toString());
            if(queueName == null || (queueName != null && queueName.isEmpty())){
                queueName = DEFAULT_QUEUE_NAME;
            }
            targetJobChains = params.value("targetJobChainName");
            spooler_log.debug9("Received targetJobChains: " + targetJobChains);
            String connectionUrl = createConnectionUrl(protocol, messageHost, messagePort);
            Connection jmsConnection = createConnection(connectionUrl);
            messageXml = read(jmsConnection, queueName, lastConsumer);
            spooler_log.debug9("Received message: " + messageXml);
        }
        
        private String createConnectionUrl (String protocol, String hostName, String port){
            StringBuilder strb = new StringBuilder();
            strb.append(protocol).append("://").append(hostName).append(":").append(port);
            return strb.toString();
        }
    
        private Connection createConnection(String uri) throws JMSException{
            ConnectionFactory factory = new ActiveMQConnectionFactory(uri);
            Connection jmsConnection = null;
            try {
                jmsConnection = factory.createConnection();
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to connect: ");
                throw e;
            }
            return jmsConnection;
        }
        
        private Session createSession(Connection connection) throws JMSException{
            Session session = null;
            try {
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to create Session: ");
                throw e;
            }
            return session;
        }
        
        private Destination createDestination(Session session, String queueName) throws JMSException{
            Destination destination = null;
            try {
                destination = session.createQueue(queueName);
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to create Destination: ");
                throw e;
            }
            return destination;
        }
        
        private MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
            MessageConsumer consumer = null;
            try {
                consumer = session.createConsumer(destination);
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to create MessageConsumer: ");
                throw e;
            }
            return consumer;
        }
    
        private String read(Connection connection, String queueName, Boolean closeMessage) throws JMSException {
            String messageText = null;
            try {
                Session session = createSession(connection);
                Destination destination = createDestination(session, queueName);
                connection.start();
                MessageConsumer consumer = createMessageConsumer(session, destination);
                Message receivedMessage = null;
                while (true) {
                    receivedMessage = consumer.receive(1);
                    if (receivedMessage != null) {
                        if (receivedMessage instanceof TextMessage) {
                            TextMessage message = (TextMessage) receivedMessage;
                            messageText = message.getText();
                            break;
                        } else {
                            break;
                        }
                    }
                }
                if(closeMessage){
                    receivedMessage.acknowledge();
                }
            } catch (JMSException e) {
                spooler_log.error("JMSException occurred while trying to read from Destination: ");
                throw e;
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        spooler_log.error("JMSException occurred while trying to close the connection: ");
                    }
                }
             }
            return messageText;
        }
    
        private void executeXml(String message) {
            spooler_log.debug9("execute XML started");
            String answer = spooler.execute_xml(message);
            spooler_log.debug9("Return value of executeXML: " + answer);
            spooler_log.debug9("order sent");
        }
        
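        private void executeXmlForAllTargets(String message) {
            // Nothing to execute if no text message was received.
            if (message == null) {
                spooler_log.debug9("No message text received, nothing to execute.");
                return;
            }
            // Send the add_order command once per target job chain if several names are configured.
            if (targetJobChains != null && targetJobChains.contains(DELIMITER) && message.contains("add_order")) {
                String[] jobChainNames = targetJobChains.split("[" + DELIMITER + "]");
                for (String name : jobChainNames) {
                    spooler_log.debug9("add_order XML will be adjusted for JobChain: " + name);
                    executeXml(message.replaceFirst("job_chain='[^']*'", "job_chain='" + name + "'"));
                }
            } else {
                executeXml(message);
            }
        }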
    }
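
The read(..) method above polls with receive(1) in an unbounded loop, so the task waits until a message arrives. Where an upper limit for the waiting time is preferred, the loop could be bounded by a deadline. The following is a minimal sketch of that idea and not the implementation used by the job: the Receiver interface is a hypothetical, simplified stand-in for javax.jms.MessageConsumer#receive(long) that returns the message text directly, and the class name, method name and timing values are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BoundedReceiveSketch {

    // Hypothetical stand-in for javax.jms.MessageConsumer#receive(long timeout);
    // returns null if no message arrived within the timeout.
    interface Receiver {
        String receive(long timeoutMillis);
    }

    // Polls until a message arrives or the overall waiting time is used up.
    // Returns null if no message was received in time.
    static String receiveWithin(Receiver receiver, long pollMillis, long maxWaitMillis) {
        long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
        while (deadline - System.nanoTime() > 0) {
            String text = receiver.receive(pollMillis);
            if (text != null) {
                return text;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // An in-memory queue simulates the message queue for this sketch.
        Queue<String> queue = new ArrayDeque<>();
        queue.add("<add_order job_chain='/test/chain_a' at='now'/>");
        Receiver receiver = timeoutMillis -> queue.poll();

        // The first call returns the queued message.
        System.out.println(receiveWithin(receiver, 1, 50));
        // The second call finds the queue empty and returns null once the 50 ms are used up.
        System.out.println(receiveWithin(receiver, 1, 50));
    }
}
```

A caller that adopts this approach has to handle the null result, for example by skipping the command execution when no message was received.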

    Deploy the Java Classes to the JobScheduler

    To run the new Java jobs, copy the created jar library to the user_lib folder in your $SCHEDULER_HOME.

    Configuring the Jobs in JOE

    This section shows the creation of an example job chain to run both jobs.

    Create the Producer Job in JOE

    Create a new Job and enter the class name, including the package structure, in the Classname field. Let's call the job XML_Producer after the example's use case.

    ...

    Configure the parameters for the job as shown below.

    Create the Consumer Job in JOE

    Create a new Job and enter the class name, including the package structure, in the Classname field. Let's call the job XML_Consumer after the example's use case.

    ...

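    The targetJobChainName parameter is only needed if the order should be added to job chains other than the one named in the message parameter of the Producer job. Multiple job chain names are separated by commas. Note that executeXmlForAllTargets(..) only adjusts the command if the value contains at least one comma, i.e. more than one job chain name.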


    Create the Job Chain with JOE

    This job chain configuration has been written to show the processing of the jobs, therefore both jobs are put in the same job chain. In a production environment it is more likely that the jobs would reside in different job chains, that there would be more than one consumer, etc.

    ...