Status: page under construction

Introduction

This article describes how to implement Java jobs that communicate over a Message Queue (MQ). These jobs include a job for publishing and one for receiving and executing JobScheduler API commands.

The document explains which classes and methods have to be created and their purposes. This example was developed with the use of Apache ActiveMQ.

...

The example covers the specifics required to achieve the use case described below. It covers neither the complete job API nor the full range of JMS features.

For the coverage of the JobScheduler job API refer to the XML part of the API Documentation.

...

  • How to implement a Java job using the job API
  • How to implement a Producer Job
  • How to implement a Consumer Job
  • How to execute an XML fragment using the job API
  • How to deploy the jobs to the JobScheduler
  • How to configure the JobScheduler Jobs with JOE using the developed classes

Download

The classes described in this example can be downloaded here.

The job configuration (made with JOE) can be downloaded here.

Prerequisites

To write a Java job for the JobScheduler the following dependency is needed.

  • engine-job-api.jar
    • The library is hosted on Maven Central. Please download the jar and add it to the classpath of the Java project. 
    • If the Java project is already a Maven project, simply add the following dependency to the project configuration.

...

    • Make sure to use the correct version suitable for the JobScheduler version in use.

For this example the activemq-all-5.13.0.jar library is used.

  • Either download the jar file and add it to the classpath
  • or, in the case of a Maven project, add the following dependency to the project configuration.

...

If the Java job is an order job, it has to return true when the job ends without an error, so that the order can continue with the next task.

If the Java job is a standalone job, it has to return false for the JobScheduler to be able to recognize that the task has finished.

This can be achieved by using the return value spooler_job.order_queue() != null. It determines if an order is in the queue of the job (true) or not (false).
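
The rule above can be sketched without the JobScheduler library as follows. The class OrderQueueCheck and its method are hypothetical stand-ins for illustration only. In a real job the expression spooler_job.order_queue() != null would be returned directly from spooler_process(), and the sketch assumes that order_queue() yields null for a standalone job.

```java
// Sketch only: OrderQueueCheck is a hypothetical stand-in and not part of
// the JobScheduler job API.
class OrderQueueCheck {

    /**
     * Mirrors the expression spooler_job.order_queue() != null.
     *
     * @param orderQueue the job's order queue, assumed to be null for a standalone job
     * @return the value spooler_process() should return when no error occurred
     */
    static boolean processResult(Object orderQueue) {
        // Order job: true lets the order continue with the next task.
        // Standalone job: false tells the JobScheduler that the task has finished.
        return orderQueue != null;
    }
}
```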

...

Because the Java jobs run API methods, we can directly use the logging feature of the JobScheduler. As shown in the detailed method examples below, logging is done by using the spooler_log object.

Make sure to throw the caught exception in order to hand it over to the spooler_process() method. This is needed for a job running in a job chain to determine that it has to fail if an error occurs.

Initialization of the MQ Connection

Both Java jobs need some methods to initialize a connection to an MQ Server. This section of the document shows how to implement them. The implementation is based on the JMS implementation used and shipped by ActiveMQ.

To keep this example simple these methods are put in both Java job classes. To prevent duplicates in a real project's source code, it is better to put these methods in a single class and extend the class in the two job classes.

...

In summary, the AUTO_ACKNOWLEDGE mode results in the message being dequeued when one consumer has read the message from the MQ server, whereas the CLIENT_ACKNOWLEDGE mode puts the responsibility on the client.

When the CLIENT_ACKNOWLEDGE mode is set, the message will stay present for all consumers to read until a consumer acknowledges the message. Only then will the message be dequeued and no longer be available for further consumers.

...

This is a helper method which creates a connection URI based on the job parameters. It simply uses a StringBuilder to create a string in the format [protocol]://[hostname]:[port].
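
A minimal sketch of such a helper is shown here. The method name createConnectionUrl comes from this article, while the wrapper class ConnectionUrlSketch and the three String parameters are assumptions made for illustration. The actual job reads these values from its job parameters.

```java
// Sketch only: the class name and the parameter list are assumptions.
class ConnectionUrlSketch {

    /** Builds a URI in the format [protocol]://[hostname]:[port]. */
    static String createConnectionUrl(String protocol, String hostName, String port) {
        StringBuilder url = new StringBuilder();
        url.append(protocol).append("://").append(hostName).append(":").append(port);
        return url.toString();
    }
}
```

With the sample values "tcp", "localhost" and "61616" the helper returns tcp://localhost:61616.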

...

There are three more methods needed for the producer Java job to run:

  • createMessageProducer(Session session, Destination destination)
  • write(String text, String connectionUrl, String queueName)
  • execute()

...

This method is called with an already active Session object as well as an instantiated Destination object. It instantiates a MessageProducer object with the given session and destination.

...

The method is called with the message to be sent to the server, the connection URL to the server and the queue name to be used for publishing. The message is a String object. This method makes use of the methods described above. It instantiates all objects needed for the connection and sends a Message object. Although this method may throw an error, it still calls the MessageProducer's send(Message message) method in a try..catch..finally block. That is because the finally block is used to close the connection in the end.
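
The try..catch..finally shape described above can be sketched without the JMS library. The Channel interface and the LOG list below are hypothetical stand-ins for the JMS connection with its MessageProducer and for spooler_log. The real write(..) method takes the connection URL and the queue name and creates these objects itself.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: Channel and LOG are hypothetical stand-ins, not JMS or
// JobScheduler types.
class WriteSketch {

    /** Stand-in for the JMS connection together with its MessageProducer. */
    interface Channel {
        void send(String text) throws Exception;
        void close();
    }

    /** Stand-in for spooler_log: collects the logged lines. */
    static final List<String> LOG = new ArrayList<>();

    static void write(String text, Channel channel) throws Exception {
        try {
            channel.send(text);
        } catch (Exception e) {
            LOG.add("error occurred while sending: " + e.getMessage());
            // Rethrow so that the caller, ultimately spooler_process(), sees the failure.
            throw e;
        } finally {
            // Runs after a successful send and after a failed one.
            channel.close();
        }
    }
}
```

Rethrowing inside the catch block keeps the error visible to the job, while the finally block still closes the connection in both cases.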

...

This is the core method of the Producer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method and subsequently calls the write(..) method to send the message to the MQ server.

...

The method is called with an already instantiated Connection object, the name of the queue to read from and a flag to determine if this is the last consumer. If it is the last consumer for the messages in the specified queue then the received message will be acknowledged. This method makes use of the methods described above. It instantiates all objects needed for the connection and reads a Message object from the given queue. It extracts the value from the Message object as a string representation via the Message object's getText() method. We have to make sure that connection.start() is called before the MessageConsumer object is instantiated.

...

This method calls the executeXml(String message) method mentioned above for each given target job chain. It replaces the job_chain argument of the (XML) message with the given job chain names from the job parameter.

Code Block: executeXmlForAllTargets(String message)

private void executeXmlForAllTargets(String message) {
    // Several target job chains are configured and the message adds an order:
    // execute one adjusted copy of the message per target job chain.
    if (targetJobChains.contains(DELIMITER) && message.contains("add_order")) {
        String[] jobChainNames = targetJobChains.split("[" + DELIMITER + "]");
        for (String name : jobChainNames) {
            spooler_log.debug9("add_order XML will be adjusted for JobChain: " + name);
            // Replace the job_chain attribute with the current target name.
            executeXml(message.replaceFirst("job_chain='[^']*'", "job_chain='" + name + "'"));
        }
    } else {
        // Otherwise execute the message unchanged.
        executeXml(message);
    }
}
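
The attribute replacement can be tried outside the JobScheduler with the following sketch. The class TargetChainSketch, the method adjustForTargets and the delimiter value ";" are assumptions for illustration. Instead of calling executeXml(..), the sketch returns the adjusted messages so they can be inspected. It also wraps the replacement in Matcher.quoteReplacement, a small hardening step that the method above does not use, so that a job chain name containing $ or \ is inserted literally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;

// Sketch only: class name, method name and the DELIMITER value are assumptions.
class TargetChainSketch {

    static final String DELIMITER = ";"; // assumed value, not taken from the article

    /** Returns one message per target job chain, or the unchanged message. */
    static List<String> adjustForTargets(String message, String targetJobChains) {
        List<String> result = new ArrayList<>();
        if (targetJobChains.contains(DELIMITER) && message.contains("add_order")) {
            for (String name : targetJobChains.split("[" + DELIMITER + "]")) {
                // quoteReplacement keeps '$' and '\' in the name from being read as group references.
                String replacement = Matcher.quoteReplacement("job_chain='" + name + "'");
                result.add(message.replaceFirst("job_chain='[^']*'", replacement));
            }
        } else {
            result.add(message);
        }
        return result;
    }
}
```

For the sample message <add_order job_chain='chain_a' at='now'/> and the targets chain_b;chain_c, the sketch yields two messages, one with job_chain='chain_b' and one with job_chain='chain_c'. A single target name without the delimiter leaves the message unchanged, which matches the branch logic of the method above.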

...

This is the core method of the Consumer Java job. It reads the parameters from the task and order, does some logging at debug level, calls the createConnectionUrl(..) method and subsequently calls the read(..) method. The received message as well as the name(s) of the target job chain(s) are stored globally in the class for further processing.

...

The last thing to do for the Consumer Job is to call the execute() method as well as the executeXmlForAllTargets(String message) method in spooler_process(), as shown in the complete code example below.

...

To be able to run the new Java jobs, simply deploy (copy) the created jar library to the user_lib folder in your $SCHEDULER_HOME.

...

Create a new Job and write the class name including the package structure in the Classname field. Let's call the job XML_Producer after the example's use case.

Configure the parameters for the job as shown below.

...

Create a new Job and write the class name including the package structure in the Classname field. Let's call the job XML_Consumer after the example's use case.

Configure the parameters for the job as shown below.

The targetJobChainName parameter is only needed if you want to change the configuration as shown in the message parameter of the producer job.

 

Create the

...

Job Chain with JOE

This job chain configuration has been written to show the processing of the job, therefore both jobs are put in the same job chain. In a production environment it is more likely that both jobs would reside in different job chains, that there is more than one consumer, etc.

Create a job chain which looks like the example below.

...