Table of Contents |
---|
Scope
- Use Case
- A number of orders are added in an arbitrary sequence and at arbitrary points in ime to a job chain - either from a file order source watching incoming files, from Ad Hoc orders or from permanent orders. The orders should be forced into serialized processing based on some predefined criteria.
- Each order comes either
- with a parameter that specifies a numeric sequence,
- with a parameter that specifies a business date, e.g. for the data that are referenced by the order and that should be loaded to a data warehouse in sequence of the business dates,
- or without parameters and should be processed in alphabetical or numeric sequence of its order id.
- Orders should be processed strictly in sequence - either ascending or descending. If the next order does not provide the expected value then it is suspended and the job chain waits for an order with the expected value to arrive. After arrival of an order with the expected value all suspended orders are checked if they provide the expected value required for the successor order.
- Solution Outline
- A single job is added to the top of a job chain and that will
- check for exepected values and suspend incoming orders until an order with the expected value arrives.
- move orders that match expected values to the next job node in the job chain and restart evaluation of suspended orders.
- A single job is added to the top of a job chain and that will
- References
Solution
- Download expect_orders.zip
- Extract the archive to any folder within the
./config/live
folder of your JobScheduler installation. - The archive will extract the files to a folder
expect_orders.
- You can store the sample files to a any folder as you like, the solution does not make use of specific folder names or job names.
Pattern
Flowchart |
---|
job_chain [label="Job Chain\ntriggered by File Orders\nor by Ad Hoc Orders",fillcolor="orange"] job_sorter [label="Job Order Sorter",fillcolor="lightskyblue"] job_next_job [label="Next Job", fillcolor="lightskyblue"] sorter_orders_completed [shape=diamond,label="list of orders completed?",fillcolor="white"] order_suspend [label="Suspend Order",fillcolor="white"] order_wait [label="Wait for next Order",fillcolor="white"] order_sort [label="Sort Orders",fillcolor="white"] order_move [label="Move Orders to Next Job",fillcolor="white"] order_C [shape="ellipse",label="Order C",fillcolor="violet"] order_B [shape="ellipse",label="Order B",fillcolor="violet"] order_A [shape="ellipse",label="Order A",fillcolor="violet"] sorted_order_A [shape="ellipse",label="Order A",fillcolor="violet"] sorted_order_B [shape="ellipse",label="Order B",fillcolor="violet"] sorted_order_C [shape="ellipse",label="Order C",fillcolor="violet"] order_A -> job_chain order_B -> job_chain order_C -> job_chain job_chain -> job_sorter job_sorter -> sorter_orders_completed sorter_orders_completed -> order_sort [label=" yes "] sorter_orders_completed -> order_suspend [label=" no "] order_suspend -> order_wait -> job_sorter order_sort -> order_move -> sorted_order_C sorted_order_C -> sorted_order_B sorted_order_B -> sorted_order_A sorted_order_A -> job_next_job |
Implementation
Components
- The solution implements a job
expect
that can be added to the top of any job chain.- This job implements a
spooler_process()
function that checks if an order matches the expected value and that suspends non-matching orders. - This job is configured for a single task, i.e. it executes incoming orders sequentually.
- Having received an order with the expected value this job moves that order to the next job node in the job chain and activates any suspended orders to be re-evaluated to provide the next expected value.
- The idle timeout is configured by
<job idle_timeout="10">
with thesorter
job definition. - With the idle timeout being expired this job will execute its
spooler_exit()
function and will sort and move all orders that have previously been suspended.- Sorting is done in alphabetical order.
- The orders are moved to the next job chain node that follows the
sorter
job in the job chain.
- The idle timeout is configured by
- This job implements a
- The sample makes use of a job chain
job_chain_exoected_orders
that includes the job nodes for theexpect
job and ahello
job. The job chain accepts Ad Hoc orders that are added by use of JOC and the job chain can easily be modified to watch for incoming files and to create one order for each file.
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
var jobChainPath = null; var jobChainNodeState = null; var orderExpectedValue = null; var orderExpectedDefaultValue = null; var orderExpectedValueFunction = "parseInt(currentValue) + 1"; var controlOrderPrefix = "control_order_"; var controlOrderID = null; function executeXML( command ) { var rc = false; spooler_log.debug( ".... executing xml command: " + command ); var response = spooler.execute_xml( command ); var xmlDOM = new Packages.sos.xml.SOSXMLXPath( new java.lang.StringBuffer( response ) ); var errorCode = xmlDOM.selectSingleNodeValue( "//ERROR/@code" ); var errorText = xmlDOM.selectSingleNodeValue( "//ERROR/@text" ); if ( errorCode || errorText ) { spooler_log.error( ".... xml response: errorCode=" + errorCode + ", errorText=" + errorText ); } else { rc = xmlDOM; } return rc; } function getOrderExpectedValue() { if (orderExpectedValue == null) { orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" ); if ( orderDOM ) { orderExpectedValue = orderDOM.selectSingleNodeValue( "/spooler/answer/order/payload/params/param[@name = '" + controlOrderID + "']/@value" ); } } if ( !orderExpectedValue ) { orderExpectedValue = orderExpectedDefaultValue; spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue + " (default)" ); } else { spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue ); } return orderExpectedValue; } function setOrderExpectedValue( expectedValue ) { spooler_log.info( ".... setOrderExpectedValue(): " + expectedValue ); var rc = true; orderExpectedValue = expectedValue; // check existence of control order, if historic then its state does not match the job node state var orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" ); if ( orderDOM ) { var orderState = orderDOM.selectSingleNodeValue( "/spooler/answer/order/@state" ); if (!orderState || orderState != jobChainNodeState) { rc = false; } } else { rc = false; } // add or modify control order if (!rc) { orderDOM = executeXML( "<add_order job_chain='" + jobChainPath + "' id='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></add_order>" ); } else { orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></modify_order>" ); } spooler_job.state_text = "next expected value: " + orderExpectedValue; } function calculateOrderExpectedValue( currentValue ) { // calculate next date for a date parameter // var expectedValue = (new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10); // calculate increment for a numeric parameter or order id // var expectedValue = parseInt(currentValue) + 1; var expectedValue = eval( orderExpectedValueFunction ); return expectedValue; } function spooler_init() { jobChainPath = spooler_task.order.job_chain.path; jobChainNodeState = spooler_task.order.job_chain_node.state; controlOrderID = controlOrderPrefix + spooler_task.order.job_chain_node.state; return true; } function spooler_process() { var rc = true; // control order is always suspended if (spooler_task.order.id == controlOrderID) { spooler_log.info( ".. control order identified, processing suspended" ); suspendOrder(); return rc; } // merge parameters from task and order var params = spooler.create_variable_set(); params.merge( spooler_task.params ); params.merge( spooler_task.order.params ); // get expected value from a parameter name or from the Order ID spooler_log.info( ".. control parameter for expected value is looked up: " + controlOrderPrefix + "expected_parameter" ); var expectedParameter = params.value( controlOrderPrefix + "expected_parameter" ); if (expectedParameter) { var currentValue = params.value( expectedParameter ); spooler_log.info( ".. current value is used from control parameter [" + expectedParameter + "]: " + currentValue ); } else { var currentValue = spooler_task.order.id; spooler_log.info( ".. current value is used from order id: " + currentValue ); } // get expected default value from a parameter or from the order id if (!orderExpectedDefaultValue) { orderExpectedDefaultValue = params.value( controlOrderPrefix + "expected_default_value" ); if (orderExpectedDefaultValue) { spooler_log.info( ".. default value is used from control parameter [" + controlOrderPrefix + "expected_default_value]: " + orderExpectedDefaultValue ); } else { spooler_log.info( ".. default value is used from order id: " + spooler_task.order.id ); orderExpectedDefaultValue = spooler_task.order.id; } } // get expected value calculation function var expectedValueFunction = params.value( controlOrderPrefix + "expected_value_function" ); if (expectedValueFunction) { orderExpectedValueFunction = expectedValueFunction; spooler_log.info( ".. using expected value function: " + orderExpectedValueFunction ); } else { spooler_log.info( ".. using expected value default function: " + orderExpectedValueFunction ); } // check if order value matches expectation if ( getOrderExpectedValue() == currentValue ) { // after processing of the current order all suspended orders are activated spooler_log.info( ".. current order provides expected value: " + getOrderExpectedValue() ); activateSuspendedOrders(); setOrderExpectedValue( calculateOrderExpectedValue( getOrderExpectedValue() ) ); } else { // suspend non-matching order spooler_log.info( ".. suspending current order: expected value=" + getOrderExpectedValue() + ", current value=" + currentValue ); suspendOrder(); } return rc; } function suspendOrder() { spooler_task.order.suspended = true; spooler_task.order.state = jobChainNodeState; } function activateSuspendedOrders() { var rc = true; var orderList = Array(); // select suspended orders of the current job node var orderDOM = executeXML( "<show_job_chain job_chain='" + jobChainPath + "' what='job_chain_orders'/>" ); var orderNodes = orderDOM.selectNodeList( "/spooler/answer/job_chain/job_chain_node[@state = '" + jobChainNodeState + "']/order_queue/order[@suspended = 'yes']" ); // traverse order list and add orders to sort array for( orderIndex=0; orderIndex<orderNodes.getLength(); orderIndex++ ) { var orderNode = orderNodes.item(orderIndex); var orderID = orderDOM.selectSingleNodeValue( orderNode, "@id" ); if (orderID == null || controlOrderID) { continue; } spooler_log.info( ".... suspended order found: " + orderID ); orderList.push( orderID ); } // alphabetical string sort orderList.sort(function(a, b){return (a > b) - (a < b) }); // numeric sort // orderList.sort(function(a, b){return b - a) }); for(i=0; i<orderList.length; i++) { spooler_log.info( ".... activating order: " + orderList[i] ); orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + orderList[i] + "' state='" + jobChainNodeState + "' suspended='no'/>" ); if ( !orderDOM ) { rc = false; } } return rc; } |
Usage
- Add two orders to the
job_chain_expect_orders
job chain.- Use an order id in descending alphabetical order, e.g. "cba" for the order id of the first order and "abc" for the order id of the second order.
- Both orders will be suspended in the first node of the job chain.
- After an idle timeout of 10s both orders are moved to the next job node in the job chain.
- This time the orders are processed in ascending alphabetical order
...