Introduction

Consider the situation where files belonging to different categories (i.e. files grouped by similar characteristics such as type or source) can be processed in parallel, but all the files of any one category have to be processed sequentially.

...

Our recommended approach for this situation would be:

Graphviz

digraph {

graph [rankdir=TB]
node [shape=Mrecord,style="filled",fillcolor=lightblue]
node [fontsize=10]
ranksep=0.3
	subgraph cluster_0 {

		style=filled;
		color=lightgrey;
		node [style=filled,color=white];
		label = "JobScheduler";
		labeljust = "l";
		pad = 0.1;

FileOrderSource[label= "{<f0> FileOrder\nSource|<f1>Identify\ Category|<f2>Is\ Lock\ Set|{<f3>Yes|<f4>No}}"];
Setback[label= "{<f0>Setback\nWait}"];
StartProcessing[label= "{<f0>Set\ Lock|Start\ Processing\ File}"];
EndProcessing[label= "{<f0>End\ Processing\ File|<f1>Release\ Lock}"];

FileOrderSource:f3 -> Setback ->  FileOrderSource:f2;
FileOrderSource:f4 -> StartProcessing -> EndProcessing [weight=4];
	}

File_Transfer ->  FileOrderSource;
}
  • The most important feature of this solution is that it only requires one job chain and one set of jobs to separate the processing of the different categories of files, thereby simplifying maintenance of the jobs and job chain.
  • Use a single 'load_file' File Order Source directory to which all files are delivered.
  • JobScheduler would use regular expressions to identify the files arriving in this directory on the basis of their names, timestamps or file extensions and forward them for processing accordingly.
  • JobScheduler would then set a 'lock' for the subsidiary whose file is being processed to prevent further files from this subsidiary being processed as long as processing continues.

    Should a 'new' file from this subsidiary arrive whilst its predecessor is being processed, the job 'receiving' the new file will be 'set back' by JobScheduler as long as the lock for the subsidiary is set.
  • This lock would be released by JobScheduler once processing of the 'first' file has been completed.
  • The job 'receiving' the new file will now be able to forward the new file for processing. A condensed sketch of this lock-and-setback logic is shown directly below; the full scripts follow in 'The Solution in Detail'.
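
The following is a minimal sketch of the pattern described above, reduced to the lock handling alone. It only uses API calls that also appear in the full aquire_lock script listed in "The Solution in Detail" below; the category-to-lock mapping used here (file name prefix 'a' maps to BERLIN_PROC, anything else to MUNICH_PROC) is a simplification for illustration and not the demo's actual configuration.

Code Block
languagejavascript

function spooler_process() {

  try {

     // Determine the file name from the order parameter set by the File Order Source
     var parameters = spooler_task.order().params();
     var filePath   = "" + String(parameters.value("scheduler_file_path"));
     var fileParts  = filePath.split("\\");
     var fileName   = fileParts[fileParts.length-1];

     // Simplified category-to-lock mapping (illustrative only)
     var lockName = fileName.match("^a") ? "BERLIN_PROC" : "MUNICH_PROC";

     if (!spooler.locks().lock_or_null( lockName )) {
        // The lock does not exist yet: create it and try to hold it for this task
        var lock = spooler.locks().create_lock();
        lock.set_name( lockName );
        spooler.locks().add_lock( lock );
        if (!spooler_task.try_hold_lock( lockName )) {
           spooler_task.call_me_again_when_locks_available();
        }
     } else {
        // A file of the same category is already being processed: set the order back
        spooler_task.order().setback();
     }
     return true;

  } catch (e) {
    spooler_log.warn("error occurred: " + String(e));
    return false;
  }
}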

The Solution in Detail

  • JobScheduler starts processing as soon as a file matching the regular expression is found in the directory.

    This directory is set in the "File Order Sources" area in the "Steps/Nodes" view of the "load_files" job chain as shown in the screenshot below:

    (Screenshot: "File Order Sources" settings in the "Steps/Nodes" view of the "load_files" job chain)

  • JobScheduler's aquire_lock job matches files using regular expressions and determines the file's category - for example, Berlin or Munich.

    The regular expressions are also defined in the "File Order Sources" area shown in the screenshot. Note that for simplicity the regular expressions match the prefixes "a" and "b" in the file names and not directly "Berlin" or "Munich". The {{aquire_lock}} job uses a Rhino JavaScript script to try to acquire the lock and either wait if the lock is not available or proceed to the next node if it is. This script is listed below:

Code Block
languagejavascript

function spooler_process() {

  try {

     // Read the file path delivered by the File Order Source
     var parameters = spooler_task.order().params();
     var filePath = "" + String(parameters.value("scheduler_file_path"));
     spooler_log.info( " scheduler_file_path : " + filePath );

     var param1 = "" + String(parameters.value("param1"));
     spooler_log.info( " param1 : " + param1 );

     // Extract the file name from the path
     var fileParts = filePath.split("\\");
     var fileName  = fileParts[fileParts.length-1];
     spooler_log.info( "fileName : " + fileName );

     // Determine the category lock name from the file name prefix
     var lockName;

     if(fileName.match("^a[A-Za-z0-9_]*\\.csv$")) {
        lockName = "BERLIN_PROC";
        spooler_log.info( "File matched with Berlin lock_name : " + lockName );
     }

     if(fileName.match("^b[A-Za-z0-9_]*\\.csv$")) {
        lockName = "MUNICH_PROC";
        spooler_log.info( "File matched with Munich lock_name : " + lockName );
     }

     // Hand the file name and lock name on to the subsequent job nodes
     spooler_task.order().params().set_value("file_spec",fileName);
     spooler_task.order().params().set_value("lock_name",lockName);

     if (!spooler.locks().lock_or_null( lockName )) {
         // The lock does not exist yet: create it and try to hold it
         var lock = spooler.locks().create_lock();
         lock.set_name( lockName );
         spooler.locks().add_lock( lock );
         if (spooler_task.try_hold_lock( lockName )) {
             return true;
         } else {
             spooler_task.call_me_again_when_locks_available();
         }
     } else {
         // The lock already exists: a file of this category is currently being processed
         spooler_task.order().setback();
         spooler_log.info( " lock is already acquired, lock_name : " + lockName + " , setback , will retry after some time");
     }

     return true;

  } catch (e) {

    spooler_log.warn("error occurred : " + String(e));
    return false;

  }
}

  • Once aquire_lock has found the matching category, it will try to set a semaphore (flag) using JobScheduler's built-in LOCK mechanism.
  • Only one instance of each LOCK is allowed, as can be seen from the 'Max Non Exclusive' parameter shown in the screenshot of JobScheduler's JOE interface below:

    (Screenshot: lock configuration with the 'Max Non Exclusive' parameter in JOE)

  • Once a LOCK has been assigned to a file from a category (either Berlin or Munich), all subsequent files for this category have to wait with a setback until the LOCK has been freed.
  • The same mechanism is applied to files from the other categories. As long as no file of a given category is being processed and the corresponding LOCK has therefore not been set, files from that category are free to be processed, even while the other category's LOCK is held.

    This can be seen in the following screenshot of the JOC interface showing the progression of file orders along the {{load_files}} job chain:

    (Screenshot: file orders progressing along the "load_files" job chain in JOC)

  • Once processing has finished, JobScheduler will move the file from the 'in' folder to either the 'done' (on success) or 'failed' (on error) folder.
  • After the input file has been moved to the correct target directory, the release_lock job will be called, which will remove the lock/semaphore from JobScheduler and allow the next file from the same category to be processed.

...

Code Block
languagejavascript

function spooler_process() {

  try {

     var parameters = spooler_task.order().params();

     var filePath = "" + String(parameters.value("scheduler_file_path"));
     spooler_log.info( " scheduler_file_path : " + filePath );
     var lockName = "" + String(parameters.value( "lock_name" ));
     spooler_log.info( "  lock_name : " +  lockName );

/*
     var fileParts = filePath.split("\\");
     var fileName  = fileParts[fileParts.length-1];
     spooler_log.info( "fileName : " + fileName );

     if(fileName.match("^a[A-Za-z0-9_]*\.csv$")) {
        var lockName = "BERLIN_PROC";
        var lock_name = "BERLIN_PROC";
        spooler_log.info( "File matched with berlin lock_name : "+ lockName  );
     }

     if(fileName.match("^b[A-Za-z0-9_]*\.csv$")) {
        var lockName = "MUNICH_PROC";
        spooler_log.info( "File matched with berlin lock_name : "+ lockName  );
     }
*/
     // Remove the lock that was created by the aquire_lock job
     if (spooler.locks().lock_or_null( lockName )) {
        spooler.locks().lock( lockName ).remove();
     }
     return true;

  } catch (e) {

    spooler_log.warn("error occurred: " + String(e));
    return false;

  }

}
  • A job to force the release of a lock is also required for the situation in which the processing of a job fails without the corresponding lock being released.

    The script for this job to release the 'Berlin' lock would look something like:

Code Block
languagejavascript

function spooler_process() {

  try {

    var lock_name = "BERLIN_PROC";

    // Remove the 'Berlin' lock left behind by a failed run
    spooler.locks().lock( lock_name ).remove();

    return true;

  } catch (e) {

    spooler_log.warn("error occurred: " + String(e));
    return false;

  }

}

Limitations of this solution

  • The limitation of this approach will become apparent if more than one file arrives from a subsidiary at once.

    This is because there is no guarantee that files arriving together will be processed in any particular order. This situation typically occurs after an unplanned loss of a file transfer connection. After the connection has been restored, there is a) no guarantee that the files arriving as a batch will be written to the file system in a particular order and b) no way for JobScheduler to know how many files will arrive as a batch.

  • One solution here would be to wait until a steady state in the incoming directory has been reached (i.e. no new file has been added and the size of all files has remained constant over a suitable period of time) before starting to identify files. JobScheduler could then order files according to their names before forwarding them for processing. A sketch of such a steady-state check is shown after the note below.

    The downside of this second approach is that it delays the start of processing, as JobScheduler has to wait for a steady state to be reached before it can assume that all members of a batch of files have arrived.
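
The following is a minimal sketch of how such a steady-state check could be implemented as a Rhino JavaScript job. The job, the 'in_dir' parameter and the 'dir_snapshot' helper parameter are assumptions for illustration and are not part of the demo; the script also assumes that the Rhino engine has access to the standard java.io.File class. It takes a snapshot of the file names and sizes in the incoming directory, compares it with the snapshot taken on the previous run and sets the order back until two consecutive snapshots are identical; the sorted entries could then be used to forward the files in a predictable sequence.

Code Block
languagejavascript

function spooler_process() {

  try {

     var parameters = spooler_task.order().params();

     // Hypothetical parameter naming the directory to be watched
     var dirPath = "" + String(parameters.value("in_dir"));

     var dir   = new java.io.File(dirPath);
     var files = dir.listFiles();                 // null if the directory does not exist
     if (files == null) {
        spooler_log.warn("directory not found: " + dirPath);
        return false;
     }

     // Build a snapshot of "name:size" entries, ordered by file name
     var entries = [];
     for (var i = 0; i < files.length; i++) {
        if (files[i].isFile()) {
           entries.push(String(files[i].getName()) + ":" + files[i].length());
        }
     }
     entries.sort();
     var snapshot = entries.join("|");

     // Compare with the snapshot taken on the previous run of this node
     var previous = "" + String(parameters.value("dir_snapshot"));
     parameters.set_value("dir_snapshot", snapshot);

     if (snapshot !== previous) {
        // The directory is still changing: set the order back and try again later
        spooler_log.info("directory not yet in a steady state, setback");
        spooler_task.order().setback();
        return true;
     }

     // Two identical snapshots: the batch is complete and already ordered by name
     spooler_log.info("steady state reached: " + snapshot);
     return true;

  } catch (e) {

    spooler_log.warn("error occurred: " + String(e));
    return false;

  }
}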

Solution Demo

A demonstration of this solution is available for download from:

...

  • Unpack the zip file to a local directory
  • Copy the 'SQLLoaderProc' folder to your JobScheduler 'live' folder.

    TODO - ADD LINK TO DOKU
  • Copy the 'Data' folder to a suitable local location.

    The default location for this folder, which is specified in the configurations in the demo jobs, is {{C:\sandbox}}. Note that the following paths have to be modified if the location of the 'Data' folder is changed:

    • The 'load_files' File Order Source directories in the load_files.job_chain.xml job chain object
    • The 'source_file' and 'target_file' paths specified as parameters in the move_file_suc.job.xml and move_file_error.job.xml objects

Running the demo

  • Just copy files from the 'Data/__test-files' folder to the 'in' folder:
    • JobScheduler will automatically start processing within a few seconds
    • Once processing has been completed the file(s) added to the 'in' folder will be moved to the 'done' or 'failed' folders, depending on whether processing was successful or not.
  • DO NOT attempt to start an order for the job chain. This will only cause an error in the aquire_lock job.

How does the Demo Work?

The demo 'load_files' job chain:

Graphviz

digraph "Example: File Transfer using JadeJob" {

graph [rankdir=TB]
node [shape=box,style="rounded,filled",fillcolor=aquamarine]
node [fontsize=10]
ranksep=0.3

FolderIn_a ->  start
FolderIn_b ->  start

start  -> aquire_lock  -> load_file_process  -> move_file_suc  -> suc_release_lock  -> end_suc  -> success
move_file_err -> error_release_lock -> end_err -> error

}

The demo job chain ('load_files') is shown in the diagram to the right.

TODO - ADD SCREENSHOT OF JOB CHAIN IN JOE

The 'FolderIn_a' and 'FolderIn_b' symbols represent the 'File Order Source' directory monitored by the 'load_files' job chain. The remaining jobs in the diagram correspond to the individual chain nodes shown in the job chain's 'Steps/Nodes' form. Note that the secondary job chain shows the behaviour in the event of an error.

  • JobScheduler starts processing as soon as a file matching the regular expression is found in the File Order Source directory.
  • JobScheduler's aquire_lock job matches the file with a regular expression and determines the file's category, i.e. Berlin or Munich.
  • Once aquire_lock has found the matching category, it tries to set a semaphore (flag) using JobScheduler's built-in LOCK mechanism.
  • Only one instance of each LOCK is allowed, so once a LOCK has been assigned to the first file of the Berlin category, the next Berlin file has to wait with a setback until the LOCK is free.
  • The same mechanism is repeated for files from the Munich category, but as long as the Munich LOCK (or semaphore/flag) has not been set, a file from the Munich category will be allowed to be processed.
  • Once processing has finished, JobScheduler will move the file from the 'in' folder to either the 'done' (on success) or 'failed' (on error) folder.
  • After the input file has been moved to the correct target directory, the release_lock job will be called, which removes the lock/semaphore from JobScheduler and allows the next file from the same category to be processed.

...