Scope
- Use Case
- Consider the situation where a number of orders are added in an arbitrary sequence and at arbitrary points in time to a job chain, either from a file order source watching incoming files, from an external source such as JOC that adds ad hoc orders, or from permanent orders. The orders should then be processed serially according to predefined criteria.
- Each order comes:
  - either with a parameter that specifies a numeric sequence,
  - or with a parameter that specifies a business date, e.g. for data files that are referenced by the order and that should be loaded to a data warehouse in sequence of the business dates,
  - or without parameters and should be processed in alphabetical or numerical sequence according to its order ID.
- Orders should be processed strictly in the required sequence, either ascending or descending. If an incoming order does not provide the expected value then it should be suspended and the job chain waits for an order with the expected value to arrive. After arrival of an order with the expected value all suspended orders should then be checked to see if they provide the expected value required for the successor order.
- Solution Outline
- A single job is added at the beginning of a job chain. This job:
  - checks for expected values and suspends incoming orders until an order with the expected value arrives,
  - moves orders that match expected values to the next job node in the job chain and rechecks the expected value of any suspended orders.
- References
...
- Download expect_orders.zip
- Extract the archive to any folder within the `./config/live` folder of your JobScheduler installation.
- The archive will extract the files to a folder `expect_orders`.
- You can store the sample files in any folder you like; the solution does not make use of specific folder names or job names.
...
- The solution implements a job `expect` that can be added at the start of any job chain.
  - This job implements a `spooler_process()` function that checks if an order matches the expected value and that suspends non-matching orders.
    - Expected values can be provided by incoming orders:
      - using parameters:
        - each incoming order is assumed to provide a parameter that contains the expected value.
          - Example for use with order parameter: `business_date = 2015-11-01`
        - the name of the incoming orders' parameter is specified as the value of the `expect` job's `control_order_expected_parameter` parameter.
          - Example for use with `expect` job parameter: `control_order_expected_parameter = business_date`
      - without parameters:
        - the job will expect a numeric order ID to be provided and increments the order ID to calculate the next expected value.
    - For expected values a function has to be provided that calculates the next expected value:
      - the function is specified by the `expect` job's `control_order_expected_value_function` parameter. The function is implemented as a JavaScript expression that returns the next expected value according to the current value, which is provided in the `currentValue` variable:
        - Example for numeric calculation: `parseInt(currentValue) + 1`
          - Explanation: the function parses the current expected numeric value and returns the incremented value.
        - Example for date calculation: `(new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10);`
          - Explanation: the function accepts the current date value, adds one day and returns the date in ISO format, e.g. `2015-11-01`
    - A default value can be provided for the expected values that is used to check the expected value of the first incoming order:
      - the default value can be specified by the `expect` job's `control_order_expected_default_value` parameter.
        - Example for use with numeric default value: `control_order_expected_default_value = 1`
        - Example for use with date default value: `control_order_expected_default_value = 2015-11-01`
      - without specification of a default value the value of the first order processed by the `expect` job will be used as a default.
  - This job is configured for a single task, i.e. it executes incoming orders sequentially.
  - Having received an order with the expected value this job moves that order to the next job node in the job chain and activates any suspended orders so that they are rechecked to see if they provide the next expected value.
- The solution automatically creates a control order that is used to store the next expected value.
  - The name of the control order is generated from the "control_order_" prefix and the state that the `expect` job node is assigned.
  - The control order is suspended and not processed by subsequent job nodes. Its only purpose is to carry the next expected value for use of the solution with JobScheduler Agents and cluster members.
- The sample makes use of a job chain `job_chain_expected_orders`. This chain includes the job nodes for the `expect` job and a `hello` job. The job chain accepts ad hoc orders that are added using JOC and the job chain can easily be modified to watch for incoming files and to create an order for each file.
- Hint: to re-use the `expect` job you can:
  - store the job in a central folder and reference the job in individual job chains.
  - move the job's JavaScript code to a central location and use an appropriate `<include>` element for the individual job scripts.
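The expected value functions described above can be tried outside JobScheduler. The following is a minimal sketch, assuming a plain JavaScript engine such as Node.js; the helper `nextExpectedValue` and the variable names are hypothetical and only mimic how the job evaluates the configured expression with `currentValue` in scope. The date expression shown here is a variation that uses the UTC accessors, on the assumption that date-only ISO strings are parsed as UTC, so that the result does not depend on the local time zone of the machine.

```javascript
// Hypothetical helper (not part of the sample): evaluates an expression the
// same way the job does, with the variable currentValue in scope.
function nextExpectedValue(expression, currentValue) {
    return eval(expression);
}

// Expression from the documentation for numeric sequences.
var numericFunction = "parseInt(currentValue) + 1";

// Assumed variation for date sequences: adds one day using UTC accessors.
var utcDateFunction =
    "(function (d) {" +
    "  d.setUTCDate(d.getUTCDate() + 1);" +
    "  return d.toISOString().substring(0, 10);" +
    "})(new Date(currentValue))";

var nextNumber = nextExpectedValue(numericFunction, "1");
var nextDate = nextExpectedValue(utcDateFunction, "2015-11-01");
var nextMonth = nextExpectedValue(utcDateFunction, "2015-11-30");

console.log(nextNumber, nextDate, nextMonth);
```

Either expression could be assigned to the `control_order_expected_value_function` parameter; whether the UTC variation is needed depends on the time zone in which the job runs.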
```javascript
var jobChainPath = null;
var jobChainNodeState = null;
var orderExpectedValue = null;
var orderExpectedDefaultValue = null;
var orderExpectedValueFunction = "parseInt(currentValue) + 1";
var controlOrderPrefix = "control_order_";
var controlOrderID = null;

// executes an XML command and returns the response DOM or false in case of errors
function executeXML( command ) {
    var rc = false;
    spooler_log.debug( ".... executing xml command: " + command );
    var response = spooler.execute_xml( command );
    var xmlDOM = new Packages.sos.xml.SOSXMLXPath( new java.lang.StringBuffer( response ) );
    var errorCode = xmlDOM.selectSingleNodeValue( "//ERROR/@code" );
    var errorText = xmlDOM.selectSingleNodeValue( "//ERROR/@text" );
    if ( errorCode || errorText ) {
        spooler_log.error( ".... xml response: errorCode=" + errorCode + ", errorText=" + errorText );
    } else {
        rc = xmlDOM;
    }
    return rc;
}

// returns the expected value from the control order or the default value
function getOrderExpectedValue() {
    if (orderExpectedValue == null) {
        var orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" );
        if ( orderDOM ) {
            orderExpectedValue = orderDOM.selectSingleNodeValue( "/spooler/answer/order/payload/params/param[@name = '" + controlOrderID + "']/@value" );
        }
    }
    if ( !orderExpectedValue ) {
        orderExpectedValue = orderExpectedDefaultValue;
        spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue + " (default)" );
    } else {
        spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue );
    }
    return orderExpectedValue;
}

// stores the next expected value in the control order
function setOrderExpectedValue( expectedValue ) {
    spooler_log.info( ".... setOrderExpectedValue(): " + expectedValue );
    var rc = true;
    orderExpectedValue = expectedValue;

    // check existence of control order, if historic then its state does not match the job node state
    var orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" );
    if ( orderDOM ) {
        var orderState = orderDOM.selectSingleNodeValue( "/spooler/answer/order/@state" );
        if (!orderState || orderState != jobChainNodeState) {
            rc = false;
        }
    } else {
        rc = false;
    }

    // add or modify control order
    if (!rc) {
        orderDOM = executeXML( "<add_order job_chain='" + jobChainPath + "' id='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></add_order>" );
    } else {
        orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></modify_order>" );
    }
    spooler_job.state_text = "next expected value: " + orderExpectedValue;
}

// evaluates the configured function, the variable currentValue is used by the evaluated expression
function calculateOrderExpectedValue( currentValue ) {
    // calculate next date for a date parameter
    // var expectedValue = (new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10);
    // calculate increment for a numeric parameter or order id
    // var expectedValue = parseInt(currentValue) + 1;
    var expectedValue = eval( orderExpectedValueFunction );
    return expectedValue;
}

function spooler_init() {
    jobChainPath = spooler_task.order.job_chain.path;
    jobChainNodeState = spooler_task.order.job_chain_node.state;
    controlOrderID = controlOrderPrefix + spooler_task.order.job_chain_node.state;
    return true;
}

function spooler_process() {
    var rc = true;
    var currentValue = null;

    // control order is always suspended
    if (spooler_task.order.id == controlOrderID) {
        spooler_log.info( ".. control order identified, processing suspended" );
        suspendOrder();
        return rc;
    }

    // merge parameters from task and order
    var params = spooler.create_variable_set();
    params.merge( spooler_task.params );
    params.merge( spooler_task.order.params );

    // get expected value from a parameter name or from the order id
    spooler_log.info( ".. control parameter for expected value is looked up: " + controlOrderPrefix + "expected_parameter" );
    var expectedParameter = params.value( controlOrderPrefix + "expected_parameter" );
    if (expectedParameter) {
        currentValue = params.value( expectedParameter );
        spooler_log.info( ".. current value is used from control parameter [" + expectedParameter + "]: " + currentValue );
    } else {
        currentValue = spooler_task.order.id;
        spooler_log.info( ".. current value is used from order id: " + currentValue );
    }

    // get expected default value from a parameter or from the order id
    if (!orderExpectedDefaultValue) {
        orderExpectedDefaultValue = params.value( controlOrderPrefix + "expected_default_value" );
        if (orderExpectedDefaultValue) {
            spooler_log.info( ".. default value is used from control parameter [" + controlOrderPrefix + "expected_default_value]: " + orderExpectedDefaultValue );
        } else {
            spooler_log.info( ".. default value is used from order id: " + spooler_task.order.id );
            orderExpectedDefaultValue = spooler_task.order.id;
        }
    }

    // get expected value calculation function
    var expectedValueFunction = params.value( controlOrderPrefix + "expected_value_function" );
    if (expectedValueFunction) {
        orderExpectedValueFunction = expectedValueFunction;
        spooler_log.info( ".. using expected value function: " + orderExpectedValueFunction );
    } else {
        spooler_log.info( ".. using expected value default function: " + orderExpectedValueFunction );
    }

    // check if order value matches expectation
    if ( getOrderExpectedValue() == currentValue ) {
        // after processing of the current order all suspended orders are activated
        spooler_log.info( ".. current order provides expected value: " + getOrderExpectedValue() );
        activateSuspendedOrders();
        setOrderExpectedValue( calculateOrderExpectedValue( getOrderExpectedValue() ) );
    } else {
        // suspend non-matching order
        spooler_log.info( ".. suspending current order: expected value=" + getOrderExpectedValue() + ", current value=" + currentValue );
        suspendOrder();
    }
    return rc;
}

function suspendOrder() {
    spooler_task.order.suspended = true;
    spooler_task.order.state = jobChainNodeState;
}

function activateSuspendedOrders() {
    var rc = true;
    var orderList = [];

    // select suspended orders of the current job node
    var orderDOM = executeXML( "<show_job_chain job_chain='" + jobChainPath + "' what='job_chain_orders'/>" );
    if ( !orderDOM ) {
        return false;
    }
    var orderNodes = orderDOM.selectNodeList( "/spooler/answer/job_chain/job_chain_node[@state = '" + jobChainNodeState + "']/order_queue/order[@suspended = 'yes']" );

    // traverse order list and add orders to sort array, the control order is skipped
    for( var orderIndex=0; orderIndex<orderNodes.getLength(); orderIndex++ ) {
        var orderNode = orderNodes.item(orderIndex);
        var orderID = orderDOM.selectSingleNodeValue( orderNode, "@id" );
        if (orderID == null || orderID == controlOrderID) {
            continue;
        }
        spooler_log.info( ".... suspended order found: " + orderID );
        orderList.push( orderID );
    }

    // alphabetical string sort
    orderList.sort(function(a, b){ return (a > b) - (a < b); });
    // numeric sort
    // orderList.sort(function(a, b){ return b - a; });

    for( var i=0; i<orderList.length; i++ ) {
        spooler_log.info( ".... activating order: " + orderList[i] );
        orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + orderList[i] + "' state='" + jobChainNodeState + "' suspended='no'/>" );
        if ( !orderDOM ) {
            rc = false;
        }
    }
    return rc;
}
```
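The job code sorts the suspended order IDs alphabetically before activating them and carries a commented-out numeric alternative. The difference between the comparators is easiest to see with IDs of different lengths. This is a minimal sketch with made-up order IDs that runs in any plain JavaScript engine; it does not use the JobScheduler API.

```javascript
// Made-up order IDs, as strings, the way they are read from the XML response.
var ids = ["10", "9", "2"];

// Alphabetical string sort, as used by the job code by default.
var alphabetical = ids.slice().sort(function (a, b) {
    return (a > b) - (a < b);
});

// Numeric sort in ascending sequence.
var numericAscending = ids.slice().sort(function (a, b) {
    return a - b;
});

// Numeric sort in descending sequence.
var numericDescending = ids.slice().sort(function (a, b) {
    return b - a;
});

console.log(alphabetical);      // [ '10', '2', '9' ]
console.log(numericAscending);  // [ '2', '9', '10' ]
console.log(numericDescending); // [ '10', '9', '2' ]
```

For numeric order IDs without leading zeros the alphabetical comparator places 10 before 2, so a numeric comparator is likely the better choice when orders are sequenced by a numeric ID.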
Usage
- Add the orders to the `job_chain_expect_orders` job chain. For your convenience the orders 1, 2 and 3 are provided with the sample.
  - Each order uses a `sequence` parameter with a sequential number such as 1, 2, 3.
  - The `expect` job uses the parameters:
    - `control_order_expected_parameter` with the value: `sequence`
    - `control_order_expected_default_value` with the value: `1`
    - `control_order_expected_value_function` with the value: `parseInt(currentValue) + 1`
- Start order 1 that matches the expected value (parameter `sequence=1`):
  - The order will be processed and then moved to the `next_job` job.
  - A control order named `control_order_expect` will be dynamically generated and suspended. This order contains the next expected value (parameter `control_order_expect=2`).
- Start order 3 that does not match the expected value (parameter `sequence=3`):
  - The order will be suspended.
  - The control order with the expected value (parameter `control_order_expect=2`) will remain unchanged.
- Start order 2 that matches the expected value (parameter `sequence=2`):
  - Order 2 will be processed and moved to the `next_job` job.
    - The control order will be automatically modified to carry the next expected value (parameter `control_order_expect=3`).
  - Order 3 will be activated, processed and then moved to the `next_job` job.
    - The control order will be automatically modified to carry the next expected value (parameter `control_order_expect=4`).
- For use with a date sequence instead of a numeric sequence:
  - the orders 1, 2, 3 carry a second parameter `business_date` with values in an ISO date format.
  - the `expect` job carries sample parameters (prefixed with `sample_`) that can be activated (renamed) to work with the `business_date` parameter instead of the `sequence` parameter of the incoming orders.
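The walkthrough above can be reproduced with a small in-memory model. This is a sketch only: `createExpectGate` and its members are hypothetical names, the variable `expected` stands in for the control order, and plain arrays stand in for the order queue of the job node. It does not call the JobScheduler API and only illustrates the suspend and recheck logic.

```javascript
// Hypothetical in-memory model of the expect job (not JobScheduler API).
function createExpectGate(defaultValue, nextValue) {
    var expected = defaultValue; // plays the role of the control order
    var suspended = [];          // orders waiting at the expect job node
    var processed = [];          // orders moved to the next job node

    function submit(value) {
        if (String(value) !== String(expected)) {
            suspended.push(value);       // non-matching order is suspended
            return;
        }
        processed.push(value);           // matching order moves on
        expected = nextValue(expected);  // store the next expected value
        var waiting = suspended;         // activate suspended orders ...
        suspended = [];
        waiting.forEach(submit);         // ... so that they are rechecked
    }

    return {
        submit: submit,
        processed: processed,
        expectedValue: function () { return expected; },
        suspendedOrders: function () { return suspended.slice(); }
    };
}

var gate = createExpectGate(1, function (currentValue) {
    return parseInt(currentValue, 10) + 1;
});

gate.submit("1"); // matches, next expected value is 2
gate.submit("3"); // does not match, order is suspended
var suspendedAfterOrder3 = gate.suspendedOrders();
gate.submit("2"); // matches, order 3 is rechecked and processed as well

console.log(gate.processed, gate.expectedValue());
```

After the three calls the model has processed the orders in the sequence 1, 2, 3 and expects the value 4 next, which corresponds to the `control_order_expect=4` state described in the steps above.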
...