Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Scope

  • Use Case
    • A Consider the situation where a number of orders are added in an arbitrary sequence and at arbitrary points in ime time to a job chain - either from a file order source watching incoming files , from Ad Hoc or from an external source such as JOC that adds ad hoc orders or from permanent orders. The orders should be forced into serialized processing based on some then be processed serially according to predefined criteria.
    • Each order comes either:
      • with a parameter that specifies a numeric sequence,
      • with a parameter that specifies a business date, e.g. for the data files that are referenced by the order and that should be loaded to a data warehouse in sequence of the business dates,
      • or without parameters and should be processed in alphabetical or numeric numerical sequence of according to its order idID.
    • Orders should be processed strictly in the required sequence - either ascending or descending. If the next an incoming order does not provide the expected value then it is should be suspended and the job chain waits for an order with the expected value to arrive. After arrival of an order with the expected value all suspended orders are should then be checked to see if they provide the expected value required for the successor order. 
  • Solution Outline
    • A single job is added to at the top beginning of a job chain and that will. This job:
      • checks
      • check for exepected values and suspend suspends incoming orders until an order with the expected value arrives.,
      • move orders moves orders that match expected values to the next job node in the job chain and restart evaluation of rechecks the expected value of any suspended orders.
  • References

Solution

  • Download expect_orders.zip
  • Extract the archive to any folder within the ./config/live folder of your JobScheduler installation.
  • The archive will extract the files to a folder expect_orders.
  • You can store the sample files to a in any folder as you like , - the solution does not make use of specific folder names or job names.

Pattern

Flowchart
job_chain [label="Job Chain\ntriggered by File Orders\nor by Ad Hoc Orders\nin arbitrary sequence",fillcolor="orange"]
job_sorterexpect [label="Job Order SorterExpect",fillcolor="lightskyblue"]
job_next_job [label="Next Job", fillcolor="lightskyblue"]

sorterexpect_ordersorder_completedvalue [shape=diamond,label="listorder ofprovides ordersexpected completed?value?\nbusiness_date=2015-11-01",fillcolor="white"]
order_suspend [label="Suspend Order",fillcolor="white"]
order_wait [label="Wait for next Order",fillcolor="white"]
order_sortmove [label="Sort Orders",Move Order to Next Job",fillcolor="white"]
order_movecalculate [label="Move Orders to Next JobCalculate next expected value\nbusiness_date=2015-11-02",fillcolor="white"]
order_recheck [label="Recheck suspended Orders",fillcolor="white"]

order_C [shape="ellipse",label="Order C\nbusiness_date=2015-11-03",fillcolor="violet"]
order_B [shape="ellipse",label="Order B\nbusiness_date=2015-11-02",fillcolor="violet"]
order_A [shape="ellipse",label="Order A\nbusiness_date=2015-11-01",fillcolor="violet"]

sortedsequenced_order_A [shape="ellipse",label="Order A\nbusiness_date=2015-11-01",fillcolor="violet"] 
sorted# sequenced_order_B [shape="ellipse",label="Order B",fillcolor="violet"]
sorted# sequenced_order_C [shape="ellipse",label="Order C",fillcolor="violet"]
  
order_A -> job_chain
order_B -> job_chain
order_C -> job_chain
job_chain -> job_sorterexpect
job_sorterexpect -> sorterexpect_ordersorder_completedvalue
sorterexpect_ordersorder_completedvalue -> order_sortcalculate [label=" yes "]
sorterexpect_ordersorder_completedvalue -> order_suspend [label=" no "]

order_suspendcalculate -> order_waitrecheck
order_recheck -> job_sorterexpect
order_sortrecheck -> order_move
order_suspend -> order_wait -> sortedjob_expect

order_move -> sequenced_order_CA
sorted# sequenced_order_C -> sortedsequenced_order_B
sorted# sequenced_order_B -> sortedsequenced_order_A
sortedsequenced_order_A -> job_next_job

Implementation

Components

  • The solution implements a job expect that can be added to at the top start of any a job chain.
    • This job implements a spooler_process() function that checks if an order matches the expected value and that suspends non-matching orders.
      • This job is configured for a single task, i.e. it executes incoming orders sequentually.
      • Having received an order with the expected value this job moves that order to the next job node in the job chain and activates any suspended orders to be re-evaluated to provide the next expected value.
        • The idle timeout is configured by <job idle_timeout="10"> with the sorter job definition.
        • With the idle timeout being expired this job will execute its spooler_exit() function and will sort and move all orders that have previously been suspended.
          • Sorting is done in alphabetical order.
          • The orders are moved to the next job chain node that follows the sorter job in the job chain.
    • The sample makes use of a job chain job_chain_exoected_orders that includes the job nodes for the expect job and a hello job. The job chain accepts Ad Hoc orders that are added by use of JOC and the job chain can easily be modified to watch for incoming files and to create one order for each file.

     

        • Expected values can be provided by incoming orders:
          • using parameters:
            • each incoming order is assumed to provide a parameter that contains the expected value. 
              • Example for use with oder parameter:
                • business_date = 2015-11-01
            • the name of the incoming orders' parameter is specified as the value of the expect job's control_order_expected_parameter parameter.
              • Example for use with expect job parameter: 
                • control_order_expected_parameter= business_date
          • without parameters: 
            • the job will expect a numeric order ID to be provided and increments the order ID to calculate the next expected value.
        • For expected values a function has to be provided that calculates the next expected value:
          • the function is specified by the expect job's control_order_expected_value_function parameter. The function has been implemented with JavaScript code that returns the next expected value according to the current value that is provided with the currentValue variable:
            • Example for numeric calculation: 
              • parseInt(currentValue) + 1
              • Explanation: the function parses the current expected numeric value and returns the incremented value.
            • Example for date calculation:
              • (new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10);
              • Explanation: the function accepts the current date value, adds one day and returns the date in ISO format, e.g. 2015-11-01
        • A default value can be provided for the expected values that is used to check the expected value of the first incoming order:
          • the default value can be specified by the expect job's control_order_expected_default_value parameter.
            • Example for use with numeric default value: 
              • control_order_expected_default_value = 1
            • Example for use with date default value: 
              • control_order_expected_default_value = 2015-11-01
          • without specification of a default value the value of the first order processed by the expect job will be used as a default.
      • This job is configured for a single task, i.e. it executes incoming orders sequentially.
      • Having received an order with the expected value this job moves that order to the next job node in the job chain and activates any suspended orders for rechecking to see if they provide the next expected value.
    • The solution automatically creates a control order that is used to store the next expected value.
      • The name of the control order is generated from the "control_order_" prefix and the state that the expect job node is assigned.
      • The control order is suspended and not processed by subsequent job nodes. Its only purpose is to carry the next expected value for use of the solution with JobScheduler Agents and cluster members.
    • The sample makes use of a job chain job_chain_expected_orders This chain includes the job nodes for the expect job and a hello job. The job chain accepts ad hoc orders that are added using JOC and can easily be modified to watch for incoming files and create an order for each file.
    • Hint: to re-use the expect job you can:
      • store the job in a central folder and reference the job in individual job chains.
      • move the job's JavaScript code to a central location and use an appropriate <include> element for individual job scripts.

     

    Code Block
    languagejs
    titleJob expect
    collapsetrue
    var jobChainPath = null;
    var jobChainNodeState = null;
    
    var orderExpectedValue = null;
    var orderExpectedDefaultValue = null;
    
    var orderExpectedValueFunction = "parseInt(currentValue) + 1";
    var controlOrderPrefix = "control_order_";
    var controlOrderID = null;
    
    function executeXML( command ) {
      var rc = false;
    
      spooler_log.debug( ".... executing xml command: " + command );
      var response = spooler.execute_xml( command );
      var xmlDOM = new Packages.sos.xml.SOSXMLXPath( new java.lang.StringBuffer( response ) );
      var errorCode = xmlDOM.selectSingleNodeValue( "//ERROR/@code" );
      var errorText = xmlDOM.selectSingleNodeValue( "//ERROR/@text" );
      if ( errorCode || errorText ) {
        spooler_log.error( ".... xml response: errorCode=" + errorCode + ", errorText=" + errorText );
      } else {
        rc = xmlDOM;
      }
    
      return rc;
    }
    
    function getOrderExpectedValue() {
      if (orderExpectedValue == null) {
        orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" );
        if ( orderDOM ) {
          orderExpectedValue = orderDOM.selectSingleNodeValue( "/spooler/answer/order/payload/params/param[@name = '" + controlOrderID + "']/@value" );
        }
      }
    
      if ( !orderExpectedValue ) {
        orderExpectedValue = orderExpectedDefaultValue;
        spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue + " (default)" );
      } else {
        spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue );
      }
    
      return orderExpectedValue;
    }
    
    function setOrderExpectedValue( expectedValue ) {
      spooler_log.info( ".... setOrderExpectedValue(): " + expectedValue );
      var rc = true;
      orderExpectedValue = expectedValue;
    
      // check existence of control order, if historic then its state does not match the job node state
      var 
    Code Block
    languagejs
    titleJob expect
    collapsetrue
    var jobChainPath = null;
    var jobChainNodeState = null;
    
    var orderExpectedValue = null;
    var orderExpectedDefaultValue = null;
    
    var orderExpectedValueFunction = "parseInt(currentValue) + 1";
    var controlOrderPrefix = "control_order_";
    var controlOrderID = null;
    
    function executeXML( command ) {
      var rc = false;
    
      spooler_log.debug( ".... executing xml command: " + command );
      var response = spooler.execute_xml( command );
      var xmlDOM = new Packages.sos.xml.SOSXMLXPath( new java.lang.StringBuffer( response ) );
      var errorCode = xmlDOM.selectSingleNodeValue( "//ERROR/@code" );
      var errorText = xmlDOM.selectSingleNodeValue( "//ERROR/@text" );
      if ( errorCode || errorText ) {
        spooler_log.error( ".... xml response: errorCode=" + errorCode + ", errorText=" + errorText );
      } else {
        rc = xmlDOM;
      }
    
      return rc;
    }
    
    function getOrderExpectedValue() {
      if (orderExpectedValue == null) {
        orderDOM = executeXML( "<show_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' what='payload'/>" );
        if ( orderDOM ) {
        var  orderExpectedValueorderState = orderDOM.selectSingleNodeValue( "/spooler/answer/order/payload/params/param[@name = '" + controlOrderID + "']/@value"@state" );
        }
      }
    
    
        if (!orderState || orderState !orderExpectedValue= jobChainNodeState) {
        orderExpectedValue  rc = orderExpectedDefaultValuefalse;
        spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue + " (default)" );}
      } else {
        spooler_log.info( ".... getOrderExpectedValue(): " + orderExpectedValue )
        rc = false;
      }
    
       return orderExpectedValue;
    }
    
    function setOrderExpectedValue( expectedValue // add or modify control order
      if (!rc) {
      spooler_log.info  orderDOM = executeXML( ".... setOrderExpectedValue(): <add_order job_chain='" + expectedValuejobChainPath );
    +  var rc = true;
      orderExpectedValue = expectedValue;
    
      // check existence of control order, if historic then its state does not match the job node state
      var"' id='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></add_order>" );
      } else {
        orderDOM = executeXML( "<show<modify_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' whatstate='payload'/>" );
    + jobChainNodeState if+ ( orderDOM ) {
        var orderState = orderDOM.selectSingleNodeValue( "/spooler/answer/order/@state"' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></modify_order>" );
      }
      if (!orderState || orderState != jobChainNodeState) {
          rc = false;
        }
      } elsespooler_job.state_text = "next expected value: " + orderExpectedValue;
    }
    
    function calculateOrderExpectedValue( currentValue ) {
      // calculate rcnext =date false;
    for a }
    date parameter
      // add or modify control order
      if (!rc) {
        orderDOM = executeXML( "<add_order job_chain='" + jobChainPath + "' id='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></add_order>" );
      } else {
        orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + controlOrderID + "' state='" + jobChainNodeState + "' suspended='yes'><params><param name='" + controlOrderID + "' value='" + orderExpectedValue + "'/></params><run_time/></modify_order>" );
      }
      spooler_job.state_text = "next expected value: " + orderExpectedValue;
    }
    
    function calculateOrderExpectedValue( currentValue ) {
      // calculate next date for a date parameter
      // var expectedValue = (new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10);
      // calculate increment for a numeric parameter or order id
      // var expectedValue = parseInt(currentValue) + 1;
    
      var expectedValue = eval( orderExpectedValueFunction );
      return expectedValue;
    }
    
    function spooler_init() {
      jobChainPath = spooler_task.order.job_chain.path;
      jobChainNodeState = spooler_task.order.job_chain_node.state;
      controlOrderID = controlOrderPrefix + spooler_task.order.job_chain_node.state;
    
      return true;
    }
    
    function spooler_process() {
      var rc = true;
    
      // control order is always suspended
      if (spooler_task.order.id == controlOrderID) { var expectedValue = (new Date( (new Date(currentValue)).setDate( (new Date(currentValue)).getDate()+1 ) )).toISOString().substring(0,10);
      // calculate increment for a numeric parameter or order id
      // var expectedValue = parseInt(currentValue) + 1;
    
      var expectedValue = eval( orderExpectedValueFunction );
      return expectedValue;
    }
    
    function spooler_init() {
      jobChainPath = spooler_task.order.job_chain.path;
      jobChainNodeState = spooler_task.order.job_chain_node.state;
      controlOrderID = controlOrderPrefix + spooler_task.order.job_chain_node.state;
    
      return true;
    }
    
    function spooler_process() {
      var rc = true;
    
      // control order is always suspended
      if (spooler_task.order.id == controlOrderID) {
        spooler_log.info( ".. control order identified, processing suspended" );
        suspendOrder();
        return rc;
      }
    
      // merge parameters from task and order
      var params = spooler.create_variable_set();
      params.merge( spooler_task.params );
      params.merge( spooler_task.order.params );
    
      // get expected value from a parameter name or from the Order ID
      spooler_log.info( ".. control parameter for expected value is looked up: " + controlOrderPrefix + "expected_parameter" );
      var expectedParameter = params.value( controlOrderPrefix + "expected_parameter" );
      if (expectedParameter) {
        var currentValue = params.value( expectedParameter );
        spooler_log.info( ".. current value is used from control order identified, processing suspended" parameter [" + expectedParameter + "]: " + currentValue );
      } else suspendOrder();{
        returnvar rc;
    currentValue  }
    = spooler_task.order.id;
      // merge parameters from task and order
      var params = spooler.create_variable_set();
      params.merge( spooler_task.params spooler_log.info( ".. current value is used from order id: " + currentValue );
      params.merge( spooler_task.order.params );}
    
      // get expected default value from a parameter name or from the Order ID
      spooler_log.info( ".. control parameter for expected value is looked up: " + controlOrderPrefix + "expected_parameter" );
      var expectedParameter order id
      if (!orderExpectedDefaultValue) {
        orderExpectedDefaultValue = params.value( controlOrderPrefix + "expected_parameterdefault_value" );
        if (expectedParameterorderExpectedDefaultValue) {
        var currentValue = params.value( expectedParameter );
        spooler_log.info( ".. currentdefault value is used from control parameter [" + expectedParametercontrolOrderPrefix + "expected_default_value]: " + currentValueorderExpectedDefaultValue );
        } else {
        var currentValue = spooler_task.order.id;
    
          spooler_log.info( ".. currentdefault value is used from order id: " + currentValuespooler_task.order.id );
      }
    
       // getorderExpectedDefaultValue expected default value from a parameter or from the order id
      if (!orderExpectedDefaultValue) {
        orderExpectedDefaultValue = spooler_task.order.id;
        }
      }
    
      // get expected value calculation function
      var expectedValueFunction = params.value( controlOrderPrefix + "expected_defaultvalue_valuefunction" );
        if (orderExpectedDefaultValueexpectedValueFunction) {
        orderExpectedValueFunction = expectedValueFunction;
        spooler_log.info( ".. defaultusing expected value is used from control parameter [" + controlOrderPrefix + "expected_default_value]function: " + orderExpectedValueFunction );
      } else {
        spooler_log.info( ".. using expected value default function: " + orderExpectedDefaultValueorderExpectedValueFunction );
      }
    
      // check } else {if order value matches expectation
      if ( getOrderExpectedValue()  spooler_log.info( ".. default value is used from order id: " + spooler_task.order.id );
          orderExpectedDefaultValue = spooler_task.order.id;
        }
      }
    
      // get expected value calculation function
      var expectedValueFunction = params.value( controlOrderPrefix + "expected_value_function"== currentValue ) {
        // after processing of the current order all suspended orders are activated
        spooler_log.info( ".. current order provides expected value: " + getOrderExpectedValue() );
        activateSuspendedOrders();
        setOrderExpectedValue( calculateOrderExpectedValue( getOrderExpectedValue() ) );
      if} (expectedValueFunction)else {
        orderExpectedValueFunction = expectedValueFunction;// suspend non-matching order
        spooler_log.info( ".. usingsuspending current order: expected value function:=" + getOrderExpectedValue() + ", current value=" + orderExpectedValueFunction );
      } else currentValue );
        suspendOrder();
      }
    
      return rc;
    }
    
    function suspendOrder() {
        spooler_log.info( ".. using expected value default function: " + orderExpectedValueFunction );
      }task.order.suspended = true;
        spooler_task.order.state = jobChainNodeState;
    }
    
    function activateSuspendedOrders() {
      var rc = true;
      var orderList = Array();
    
      // checkselect ifsuspended orderorders valueof matchesthe expectation
    current  if ( getOrderExpectedValue() == currentValue ) {job node
      var orderDOM //= after processing of the current order all suspended orders are activated
        spooler_log.info( ".. current order provides expected value: " + getOrderExpectedValue() );
        activateSuspendedOrders();
        setOrderExpectedValue( calculateOrderExpectedValue( getOrderExpectedValue() ) );
      } elseexecuteXML( "<show_job_chain job_chain='" + jobChainPath + "' what='job_chain_orders'/>" );
      var orderNodes = orderDOM.selectNodeList( "/spooler/answer/job_chain/job_chain_node[@state = '" + jobChainNodeState + "']/order_queue/order[@suspended = 'yes']" );
    
      // traverse order list and add orders to sort array
      for( orderIndex=0; orderIndex<orderNodes.getLength(); orderIndex++ ) {
        //var suspendorderNode non-matching order
        spooler_log.info( ".. suspending current order: expected value=" + getOrderExpectedValue() + ", current value=" + currentValue = orderNodes.item(orderIndex);
        var orderID = orderDOM.selectSingleNodeValue( orderNode, "@id" );
        suspendOrder();
      }
    
      return rc;
    }
    
    function suspendOrder() {
        spooler_task.order.suspended = true;if (orderID == null || controlOrderID) {
          continue;
        }
        spooler_task.order.state = jobChainNodeState;
    }
    
    function activateSuspendedOrders() {
      var rc = truelog.info( ".... suspended order found: " + orderID );
      var  orderList.push( =orderID Array();
    
      // select suspended orders}
    
     of the// currentalphabetical jobstring nodesort
      var orderDOM = executeXML( "<show_job_chain job_chain='" + jobChainPath + "' what='job_chain_orders'/>" orderList.sort(function(a, b){return (a > b) - (a < b) });
      var// orderNodesnumeric = orderDOM.selectNodeList( "/spooler/answer/job_chain/job_chain_node[@state = '" + jobChainNodeState + "']/order_queue/order[@suspended = 'yes']" sort
      // orderList.sort(function(a, b){return b - a) });
    
      // traverse order list and add orders to sort array
      for( orderIndex=0; orderIndex<orderNodes.getLength(); orderIndex++ ) {
        var orderNode = orderNodes.item(orderIndex);
        var orderID = orderDOM.selectSingleNodeValue( orderNode, "@id for(i=0; i<orderList.length; i++) {
        spooler_log.info( ".... activating order: " + orderList[i] );
        orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + orderList[i] + "' state='" + jobChainNodeState + "' suspended='no'/>" );
        if (orderID == null || controlOrderID !orderDOM ) {
          continuerc = false;
        }  
      }
    
       spooler_log.info( ".... suspended order found: " + orderID );
        orderList.push( orderID );
      }
    
      // alphabetical string sort
      orderList.sort(function(a, b){return (a > b) - (a < b) });
      // numeric sort
      // orderList.sort(function(a, b){return b - a) });
    
      for(i=0; i<orderList.length; i++) {
        spooler_log.info( ".... activating order: " + orderList[i] );
        orderDOM = executeXML( "<modify_order job_chain='" + jobChainPath + "' order='" + orderList[i] + "' state='" + jobChainNodeState + "' suspended='no'/>" );
        if ( !orderDOM ) {
          rc = false;
        }  
      }
    
      return rc;
    }

    Usage

    ...

    • Use an order id in descending alphabetical order, e.g. "cba" for the order id of the first order and "abc" for the order id of the second order.

    ...

    return rc;
    }

    Usage

    • Add the orders to the job_chain_expect_orders job chain. For your convenience the orders 1, 2 and 3 are provided with the sample.
      • Each order uses a sequence parameter with a sequential number such as 1, 2, 3.
      • The expect job uses the parameters:
        • control_order_expected_parameter with the value: sequence
        • control_order_expected_default_value with the value: 1
        • control_order_expected_value_function with the value: parseInt(currentValue) + 1
    • Start order 1 that matches the expected value (parameter sequence=1):
      • The order will be processed and then moved to the next_job job.
      • A control order named control_order_expect will be dynamically generated and suspended. This order contains the next expected value (parameter control_order_expect=2)
    • Start order 3 that does not match the expected value (parameter sequence=3)
      • The order will be suspended.
      • The control order with the expected value (parameter control_order_expect=2) will remain unchanged.
    • Start order 2 that matches the expected value (parameter sequence=2:
      • Order 2 will be processed and moved to the next_job job. 
        • The control order will be automatically modified to carry the next expected value (parameter control_order_expect=3).
      • Order 3 will be processed and then moved to the next_job job.
        • The control order will be automatically modified to carry the next expected value (parameter control_order_expect=4).
    • For use with a date sequence instead of a numeric sequence:
      • the orders 1, 2, 3 carry a second parameter business_date with values in an ISO date format. 
      • the expect job carries sample parameters (prefixed with sample_) that can be activated (renamed) to work with the business_date parameter instead of the sequence parameter of the incoming orders .

    .

    ...