
WORK IN PROGRESS!

Introduction

Consider the situation where files belonging to different categories (i.e. groups of files with similar characteristics such as type or source) can be processed in parallel, but all the files of any one category have to be processed sequentially.

For example, the centralised data processing of a company receives stock movement reports from subsidiaries at regular intervals. Whilst the reports from different subsidiaries can be processed simultaneously, the reports from each subsidiary have to be processed sequentially.

The 'Best Practice' Solution

Our recommended approach for this situation would be:

Graphviz
digraph {

graph [rankdir=TB]
node [shape=Mrecord,style="filled",fillcolor=lightblue]
node [fontsize=10]
ranksep=0.3
	subgraph cluster_0 {

		style=filled;
		color=lightgrey;
		node [style=filled,color=white];
		label = "JobScheduler";
		labeljust = "l";
		pad = 0.1;

FileOrderSource[label= "{<f0> FileOrder&#92;nSource|<f1>Identify&#92; Category|<f2>Is&#92; Lock&#92; Set|{<f3>Yes|<f4>No}}"];
Setback[label= "{<f0>Setback&#92;nWait}"];
StartProcessing[label= "{<f0>Set&#92; Lock|Start&#92; Processing&#92; File}"];
EndProcessing[label= "{<f0>End&#92; Processing&#92; File|<f1>Release&#92; Lock}"];

FileOrderSource:f3 -> Setback ->  FileOrderSource:f2;
FileOrderSource:f4 -> StartProcessing -> EndProcessing [weight=4];
	}

File_Transfer ->  FileOrderSource;
}
  • The most important feature of this solution is that it only requires one job chain and one set of jobs to separate the processing of different categories of files, thereby simplifying maintenance of the jobs and job chain.
  • Use a single 'load_file' File Order Source directory to which all files are delivered.
  • JobScheduler would use regular expressions to identify the files arriving in this directory on the basis of their names, timestamps or file extensions and forward them for processing accordingly.
  • JobScheduler would then set a 'lock' for the subsidiary whose file is being processed to prevent further files from this subsidiary being processed as long as processing continues.

    Should a 'new' file from this subsidiary arrive whilst its predecessor is being processed, the job 'receiving' the new file will be 'set back' by JobScheduler as long as the lock for the subsidiary is set.

  • This lock would be released by JobScheduler once processing of the 'first' file has been completed.
  • The job 'receiving' the new file will now be able to forward the new file for processing.
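
The interplay of lock, setback and release described above can be illustrated with a small simulation. This is only a sketch in plain JavaScript that does not use the JobScheduler API; the event format, the category names and the file names are assumptions made for the illustration.

```javascript
// Illustrative simulation only: plain JavaScript, no JobScheduler API involved.
// Event format, category names and file names are hypothetical.

// Categories whose lock is currently held.
var heldLocks = {};

// Try to take the lock for a category; returns false if it is already held.
function tryAcquire(category) {
  if (heldLocks[category]) {
    return false; // a predecessor is still being processed: set back
  }
  heldLocks[category] = true;
  return true;
}

// Release the lock once processing of the file has been completed.
function release(category) {
  delete heldLocks[category];
}

// Walk through a sequence of events and record what happens to each file.
function simulate(events) {
  var log = [];
  events.forEach(function (event) {
    if (event.type === "arrive") {
      var ok = tryAcquire(event.category);
      log.push(event.file + (ok ? ": processing" : ": set back"));
    } else if (event.type === "done") {
      release(event.category);
      log.push(event.file + ": lock released");
    }
  });
  return log;
}

var result = simulate([
  { type: "arrive", category: "BERLIN", file: "a_001.csv" },
  { type: "arrive", category: "MUNICH", file: "b_001.csv" }, // other category: not blocked
  { type: "arrive", category: "BERLIN", file: "a_002.csv" }, // same category: has to wait
  { type: "done",   category: "BERLIN", file: "a_001.csv" },
  { type: "arrive", category: "BERLIN", file: "a_002.csv" }  // retry after the setback
]);
```

The second Berlin file is set back while the first is being processed, whereas the Munich file is not affected by the Berlin lock.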

The Solution in Detail

  • JobScheduler starts as soon as a file matching a regular expression is found in the directory.

    This directory is set in the "File Order Sources" area in the "Steps/Nodes" view of the "load_files" job chain as shown in the screenshot below:

    [Screenshot: "File Order Sources" area in the "Steps/Nodes" view of the "load_files" job chain]

  • JobScheduler's aquire_lock job matches files using regular expressions and determines each file's category, for example, Berlin or Munich.

    The regular expressions are also defined in the "File Order Sources" area shown in the screenshot. Note that for simplicity the regular expressions match the prefixes "a" and "b" in the file names and not directly "Berlin" or "Munich". The aquire_lock job uses a Rhino JavaScript script to try to acquire the lock and then either waits, if the lock is not available, or proceeds to the next node if it is.
    This script is listed below:

    Code Block
    languagejavascript
    function spooler_process() {

      try {
        var parameters = spooler_task.order().params();
        var filePath = "" + String(parameters.value("scheduler_file_path"));
        spooler_log.info( " scheduler_file_path : " + filePath );

        var param1 = "" + String(parameters.value("param1"));
        spooler_log.info( " param1 : " + param1 );

        // Take the last path element; both Windows and Unix separators are accepted
        var fileParts = filePath.split(/[\\\/]/);
        var fileName  = fileParts[fileParts.length-1];
        spooler_log.info( "fileName : " + fileName );

        // Determine the category (and with it the lock name) from the file name prefix.
        // Regular expression literals are used so that "\." matches a literal dot.
        var lockName = null;
        if (/^a[A-Za-z0-9_]*\.csv$/.test(fileName)) {
          lockName = "BERLIN_PROC";
          spooler_log.info( "File matched with berlin lock_name : " + lockName );
        }

        if (/^b[A-Za-z0-9_]*\.csv$/.test(fileName)) {
          lockName = "MUNICH_PROC";
          spooler_log.info( "File matched with munich lock_name : " + lockName );
        }

        // Assumption: a file that matches no category is treated as an error
        if (lockName === null) {
          spooler_log.warn( "no category found for file : " + fileName );
          return false;
        }

        spooler_task.order().params().set_value("file_spec",fileName);
        spooler_task.order().params().set_value("lock_name",lockName);

        if (!spooler.locks().lock_or_null( lockName )) {
          // No lock exists for this category: create it and try to hold it
          var lock = spooler.locks().create_lock();
          lock.set_name( lockName );
          spooler.locks().add_lock( lock );
          if (spooler_task.try_hold_lock( lockName )) {
            return true;
          } else {
            spooler_task.call_me_again_when_locks_available();
          }
        } else {
          // The lock exists, i.e. a file of this category is currently being processed
          spooler_task.order().setback();
          spooler_log.info( " lock is already acquired , lock_name : "+ lockName +" , setback , will retry after some time");
        }

        return true;

      } catch (e) {

        spooler_log.warn("error occurred    : " + String(e));
        return false;

      }
    }
    
  • Once aquire_lock finds the matching category, it will try to set a semaphore (flag) using JobScheduler's inbuilt LOCK mechanism.
  • Only one instance of each LOCK is allowed, as can be seen in the Max Non Exclusive parameter shown in the screenshot of JobScheduler's JOE interface below:

    [Screenshot: lock configuration in JOE showing the Max Non Exclusive parameter]

  • Once a LOCK has been assigned to a file from a category (either Berlin or Munich), all subsequent files for this category have to wait with a setback until the LOCK has been released.
  • The same mechanism will be repeated for files from other categories. As long as no file of a given category is being processed, and the corresponding LOCK has therefore not been set, the way is free for a file from that category to be processed.

    This can be seen in the following screenshot of the JOC interface showing the progression of file orders along the load_files job chain:

    [Screenshot: JOC interface showing file orders progressing along the load_files job chain]

  • Once processing has finished, JobScheduler will move the file from the 'in' folder to either the 'done' folder (on success) or the 'failed' folder (on error).
  • After the input file has been moved to the correct target directory, the release_lock job will be called. This job removes the lock/semaphore from JobScheduler and allows the next file from the same category to be processed.
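
The category detection carried out by the aquire_lock job could also be expressed as a lookup table, which may be easier to extend when further subsidiaries are added. The following is a minimal sketch in plain JavaScript, not part of the demo: the names CATEGORY_RULES, baseName and lockNameFor are hypothetical, while the patterns and lock names are those used on this page. Note that regular expression literals are used, because in a string such as "^a[A-Za-z0-9_]*\.csv$" the sequence "\." is reduced to "." and would then match any character.

```javascript
// Hypothetical lookup table: one entry per category.
// Patterns and lock names follow the examples on this page.
var CATEGORY_RULES = [
  { pattern: /^a[A-Za-z0-9_]*\.csv$/, lockName: "BERLIN_PROC" },
  { pattern: /^b[A-Za-z0-9_]*\.csv$/, lockName: "MUNICH_PROC" }
];

// Return the last path element; accepts Windows and Unix separators.
function baseName(filePath) {
  var parts = filePath.split(/[\\\/]/);
  return parts[parts.length - 1];
}

// Return the lock name for a file, or null if no category matches.
function lockNameFor(filePath) {
  var fileName = baseName(filePath);
  for (var i = 0; i < CATEGORY_RULES.length; i++) {
    if (CATEGORY_RULES[i].pattern.test(fileName)) {
      return CATEGORY_RULES[i].lockName;
    }
  }
  return null;
}
```

Returning null for an unmatched file leaves it to the caller to decide whether such a file should be treated as an error or ignored.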

The following screenshot from JobScheduler's JOE interface shows the nodes in the load_files job chain and their related states:

[Screenshot: nodes of the load_files job chain and their states in JOE]

The release_lock job uses a Rhino JavaScript script similar to that of the aquire_lock job, as can be seen in the following listing:

Code Block
languagejavascript
function spooler_process() {

  try {
    var parameters = spooler_task.order().params();

    var filePath = "" + String(parameters.value("scheduler_file_path"));
    spooler_log.info( " scheduler_file_path : " + filePath );
    // The lock name was stored in the order parameters by the aquire_lock job
    var lockName = "" + String(parameters.value( "lock_name" ));
    spooler_log.info( "  lock_name : " +  lockName );

    // Remove the lock, if it still exists, so that the next file of this category can be processed
    if (spooler.locks().lock_or_null( lockName )) {
      spooler.locks().lock( lockName ).remove();
    }
    return true;

  } catch (e) {

    spooler_log.warn("error occurred: " + String(e));
    return false;

  }
}
  • A job that forces the release of a lock is also required for the situation in which processing fails without the corresponding lock being released.

  • The script for this job to release the 'Berlin' lock would look something like:

Code Block
languagejavascript
function spooler_process() {

  try {
    var lock_name = "BERLIN_PROC";
    spooler.locks().lock(lock_name).remove();
    return true;
  } catch (e) {

    spooler_log.warn("error occurred: " + String(e));
    return false;

  }
}
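
If locks for several categories have to be cleaned up, the same idea could be wrapped in a helper that only removes the locks that actually exist. The sketch below is an assumption and not part of the demo: the function name releaseLocks is hypothetical, and the locks object is passed in as a parameter so that the function can be tried out without a running JobScheduler. It only relies on the lock_or_null(), lock() and remove() calls already used in the scripts above.

```javascript
// Hypothetical helper: remove every lock in lockNames that currently exists.
// "locks" is expected to behave like the object returned by spooler.locks().
function releaseLocks(locks, lockNames) {
  var released = [];
  for (var i = 0; i < lockNames.length; i++) {
    var name = lockNames[i];
    // Only remove locks that exist, so that a missing lock is not an error
    if (locks.lock_or_null(name)) {
      locks.lock(name).remove();
      released.push(name);
    }
  }
  return released; // names of the locks that were actually removed
}
```

Inside a job, this might be called as releaseLocks(spooler.locks(), ["BERLIN_PROC", "MUNICH_PROC"]), with the returned names written to the log.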

Limitations of this solution

  • The limitation of this approach becomes apparent if more than one file arrives from a subsidiary at once.

    This is because there is no guarantee that files arriving together will be processed in any given order. This situation typically occurs after an unplanned loss of a file transfer connection. After the connection has been restored, there is a) no guarantee that the files arriving as a batch will be written to the file system in a particular order and b) no way for JobScheduler to know how many files will arrive as a batch.


  • One solution here would be to wait until a steady state in the incoming directory has been reached (i.e. no new file has been added and the size of all files remains constant over a suitable period of time) before starting to identify files. JobScheduler could then order files according to their names before forwarding them for processing.

    The downside of this approach is that it delays the start of processing, because JobScheduler has to wait for a steady state in order to check whether all members of a batch of files have arrived.
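
The steady state check and the ordering by name could look something like the following. This is a minimal sketch in plain JavaScript under stated assumptions: the function names isSteady and orderByName are hypothetical, and a directory snapshot is assumed to be an array of objects with a name and a size property. How the snapshots are taken, and how long to wait between them, is left open.

```javascript
// A snapshot is assumed to be an array of { name, size } objects describing
// the incoming directory at one point in time (hypothetical format).

// Returns true if both snapshots contain the same files with the same sizes.
function isSteady(previous, current) {
  if (previous.length !== current.length) {
    return false; // a file has been added or removed
  }
  var sizes = {};
  previous.forEach(function (file) { sizes[file.name] = file.size; });
  return current.every(function (file) {
    return Object.prototype.hasOwnProperty.call(sizes, file.name) &&
           sizes[file.name] === file.size;
  });
}

// Returns the file names in ascending order without changing the snapshot.
function orderByName(snapshot) {
  return snapshot.map(function (file) { return file.name; }).sort();
}
```

Files would only be forwarded once isSteady returns true for two consecutive snapshots. Ordering by name only reflects the intended processing order if the file names sort correctly as text, for example because they contain zero-padded sequence numbers or timestamps.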

Solution Demo


A demonstration of this solution is available for download from:

Demo Installation

  • Unpack the zip file to a local directory
  • Copy the 'SQLLoaderProc' folder to your JobScheduler 'live' folder.

    No Format
    
     ... TODO - ADD LINK TO DOKU
    
  • Copy the 'Data' folder to a suitable local location.

    The default location for this folder, which is specified in the configurations in the demo jobs, is:
        C:\sandbox

  • Note that the following paths have to be modified if the location of the 'Data' folder is changed:
    • The 'load_files' File Order Source directories in the load_files.job_chain.xml job chain object
    • The 'source_file' and 'target_file' paths specified as parameters in the move_file_suc.job.xml and move_file_error.job.xml objects

Running the demo

  • Just copy files from the 'Data/__test-files' folder to the 'in' folder:
    • JobScheduler will automatically start processing within a few seconds
    • Once processing has been completed the file(s) added to the 'in' folder will be moved to the 'done' or 'failed' folders, depending on whether processing was successful or not.
  • DO NOT attempt to start an order for the job chain. This will only cause an error in the aquire_lock job.

How does the Demo Work?

BoxTitle=The demo 'load_files' job chain

BoxContent

Graphviz
digraph "Example: File Transfer using JadeJob"{

graph [rankdir=TB]
node [shape=box,style="rounded,filled",fillcolor=aquamarine]
node [fontsize=10]
ranksep=0.3

FolderIn_a ->  start
FolderIn_b ->  start

start  -> aquire_lock  -> load_file_process  -> move_file_suc  -> suc_release_lock  -> end_suc  -> success
move_file_err -> error_release_lock -> end_err -> error

}

The demo job chain ('load_files') is shown in the diagram to the right.


TODO - ADD SCREENSHOT OF JOB CHAIN IN JOE

The 'FolderIn_a' and 'FolderIn_b' symbols represent the 'File Order Source' directory monitored by the 'load_files' job chain. The remaining symbols in the diagram show the individual chain nodes listed in the job chain 'Steps/Nodes' form. Note that the secondary chain shows the behaviour in the event of an error.

  • JobScheduler starts as soon as a file matching a regular expression is found in the File Order Source directory.
  • JobScheduler's aquire_lock job matches the file against the regular expressions and determines the file's category, i.e. Berlin or Munich.
  • Once aquire_lock has found the matching category, it tries to set a semaphore (flag) using JobScheduler's inbuilt LOCK mechanism.
  • Only one instance of each LOCK is allowed: once the LOCK has been assigned to the first file of the Berlin category, the next file of this category has to wait (setback) until the LOCK is free.
  • The same mechanism is repeated for files from the Munich category. Since the LOCK (semaphore) is not set for Munich, a file from the Munich category will be allowed to be processed.
  • Once processing has finished, JobScheduler will move the file from the 'in' folder to either the 'done' folder (on success) or the 'failed' folder (on error).
  • After the input file has been moved to the correct target directory, the release_lock job will be called. This job removes the lock/semaphore from JobScheduler, so that the next file from the same category can be processed.


See also
