...
Consider the situation where files from different categories (i.e. files with similar characteristics such as type or source) can be processed in parallel, but all the files of any one category have to be processed sequentially.
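This is a general illustration of the requirement itself, not of how JobScheduler implements it. The rule "parallel across categories, sequential within a category" can be sketched in JavaScript with one promise chain per category. The category names, file names and the `enqueue`/`load` helpers are hypothetical.

```javascript
// General illustration only (not JobScheduler): one promise chain per category.
// Category names, file names and helpers are hypothetical.
const tails = new Map(); // category -> promise that settles when its last task settles

function enqueue(category, task) {
  const previous = tails.get(category) || Promise.resolve();
  // Start `task` only after every earlier task of the same category has settled.
  const current = previous.then(() => task());
  // Keep the chain usable even if this task fails.
  tails.set(category, current.catch(() => {}));
  return current;
}

// Usage: the two Berlin files run one after the other, the Munich file runs alongside.
const events = [];
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const load = (name, ms) => async () => {
  events.push("start " + name);
  await sleep(ms);
  events.push("end " + name);
};

const allDone = Promise.all([
  enqueue("BERLIN", load("a_001.csv", 30)),
  enqueue("BERLIN", load("a_002.csv", 10)),
  enqueue("MUNICH", load("b_001.csv", 10)),
]);
```

Different categories never wait for each other here, while a second file of the same category only starts once the previous one has finished.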
...
Graphviz |
---|
digraph {
  graph [rankdir=TB]
  node [shape=Mrecord,style="filled",fillcolor=lightblue]
  node [fontsize=10]
  ranksep=0.3
  subgraph cluster_0 {
    style=filled; color=lightgrey;
    node [style=filled,color=white];
    label = "JobScheduler"; labeljust = "rl"; pad = 0.1;
    FileOrderSource [label="{<f0> FileOrder\nSource|<f1>Identify\ Category|<f2>Is\ Lock\ Set|{<f3>Yes|<f4>No}}"];
    Setback [label="{<f0>Setback\nWait}"];
    StartProcessing [label="{<f0>Set\ Lock|Start\ Processing\ File}"];
    EndProcessing [label="{<f0>End\ Processing\ File|<f1>Release\ Lock}"];
    FileOrderSource:f3 -> Setback -> FileOrderSource:f2;
    FileOrderSource:f4 -> StartProcessing -> EndProcessing [weight=4];
  }
  File_Transfer -> FileOrderSource;
}
|
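The decision flow in the diagram above can be sketched in plain JavaScript. This is a minimal sketch, assuming an in-memory `Set` stands in for JobScheduler's LOCK objects; the lock names BERLIN_PROC and MUNICH_PROC follow the aquire_lock script, while `identifyCategory`, `tryStart` and `endProcessing` are hypothetical names.

```javascript
// Sketch of the diagram's flow. A Set stands in for JobScheduler's LOCK
// objects; this is an assumption for illustration only.
const heldLocks = new Set();

// "Identify Category": map a file name prefix to a lock name.
const categories = [
  { pattern: /^a[A-Za-z0-9_]*\.csv$/, lockName: "BERLIN_PROC" },
  { pattern: /^b[A-Za-z0-9_]*\.csv$/, lockName: "MUNICH_PROC" },
];

function identifyCategory(fileName) {
  const match = categories.find((c) => c.pattern.test(fileName));
  return match ? match.lockName : null;
}

// "Is Lock Set?": Yes -> setback and retry later; No -> set lock, start processing.
function tryStart(fileName) {
  const lockName = identifyCategory(fileName);
  // Hypothetical result for names matching neither pattern.
  if (lockName === null) return { action: "no_category", lockName };
  if (heldLocks.has(lockName)) return { action: "setback", lockName };
  heldLocks.add(lockName);
  return { action: "start", lockName };
}

// "End Processing File" -> "Release Lock".
function endProcessing(lockName) {
  heldLocks.delete(lockName);
}
```

Calling `tryStart("a_001.csv")` and then `tryStart("a_002.csv")` yields "start" followed by "setback", while `tryStart("b_001.csv")` still yields "start"; after `endProcessing("BERLIN_PROC")` the second Berlin file can start.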
...
- JobScheduler starts processing as soon as a file matching the regular expression is found in the directory.
This directory is set in the "File Order Sources" area in the "Steps/Nodes" view of the "load_files" job chain. TODO: SCREENSHOT
- JobScheduler's aquire_lock job matches the file against regular expressions and determines the file's category, for example Berlin or Munich.
The regular expressions are also defined in the "File Order Sources" area shown in the screenshot. Note that the regular expressions match the prefixes "a" and "b" in the file names and not "Berlin" or "Munich" directly. The aquire_lock job uses a script written in JavaScript (Rhino) to try to acquire the lock and to wait if the lock is not available. This script is listed below.
Code Block | ||
---|---|---|
| ||
function spooler_process() {
  try {
    var parameters = spooler_task.order().params();
    var filePath = String(parameters.value("scheduler_file_path"));
    spooler_log.info("scheduler_file_path : " + filePath);

    // Extract the file name (handles both "\" and "/" path separators)
    var fileParts = filePath.split(/[\\\/]/);
    var fileName = fileParts[fileParts.length - 1];
    spooler_log.info("fileName : " + fileName);

    // Determine the category (and therefore the lock name) from the file name prefix.
    // Regex literals keep "\." a literal dot; inside a string "\." would become ".".
    var lockName = null;
    if (/^a[A-Za-z0-9_]*\.csv$/.test(fileName)) {
      lockName = "BERLIN_PROC";
      spooler_log.info("File matched with berlin, lock_name : " + lockName);
    } else if (/^b[A-Za-z0-9_]*\.csv$/.test(fileName)) {
      lockName = "MUNICH_PROC";
      spooler_log.info("File matched with munich, lock_name : " + lockName);
    } else {
      spooler_log.warn("No category matched for file : " + fileName);
      return false;
    }

    // Pass file name and lock name on to the following jobs (e.g. release_lock)
    spooler_task.order().params().set_value("file_spec", fileName);
    spooler_task.order().params().set_value("lock_name", lockName);

    if (!spooler.locks().lock_or_null(lockName)) {
      // No lock exists for this category yet: create it and try to hold it
      var lock = spooler.locks().create_lock();
      lock.set_name(lockName);
      spooler.locks().add_lock(lock);
      if (!spooler_task.try_hold_lock(lockName)) {
        spooler_task.call_me_again_when_locks_available();
      }
    } else {
      // Another file of this category is being processed: set the order back and retry later
      spooler_task.order().setback();
      spooler_log.info("lock is already acquired, lock_name : " + lockName + ", setback, will retry after some time");
    }
    return true;
  } catch (e) {
    spooler_log.warn("error occurred : " + String(e));
    return false;
  }
}
|
- Once aquire_lock has found the matching category, it tries to set a semaphore (flag) using JobScheduler's built-in LOCK mechanism.
- Only one instance of each LOCK is allowed, as can be seen in the screenshot below. Once a LOCK has been assigned to the first file of a category (either Berlin or Munich), the next file of this category has to wait with a setback until the LOCK is free.
TODO - ADD SCREENSHOT
- The same mechanism is repeated for files of the other categories. As long as no file of a given category is being processed, and the corresponding LOCK has therefore not been set, a file of that category is free to be processed. For example, while a Berlin file holds the BERLIN_PROC lock, a Munich file can still be processed because the MUNICH_PROC lock has not been set.
- Once processing has finished, JobScheduler moves the file from the 'in' folder to either the 'done' folder (on success) or the 'failed' folder (on error).
- After the input file has been moved to the correct target directory, JobScheduler calls the release_lock job, which removes the lock (semaphore) from JobScheduler and allows the next file of the same category to be processed.
Code Block | ||
---|---|---|
| ||
function spooler_process() {
  try {
    var parameters = spooler_task.order().params();
    var filePath = String(parameters.value("scheduler_file_path"));
    spooler_log.info("scheduler_file_path : " + filePath);

    // lock_name was set as an order parameter by the aquire_lock job
    var lockName = String(parameters.value("lock_name"));
    spooler_log.info("lock_name : " + lockName);

    // Remove the category lock so that the next file of this category can be processed
    if (lockName !== "" && spooler.locks().lock_or_null(lockName)) {
      spooler.locks().lock(lockName).remove();
    }
    return true;
  } catch (e) {
    spooler_log.warn("error occurred: " + String(e));
    return false;
  }
}
|
- A force-release_lock job is also required for situations where processing fails without the corresponding lock being released.
The script for this job to release the 'Berlin' lock would look something like:
Code Block | ||
---|---|---|
| ||
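function spooler_process() {
  try {
    var lock_name = "BERLIN_PROC";
    // lock_or_null() returns null instead of failing when the lock does not exist
    var lock = spooler.locks().lock_or_null(lock_name);
    if (lock) {
      lock.remove();
      spooler_log.info("lock removed : " + lock_name);
    }
    return true;
  } catch (e) {
    spooler_log.warn("error occurred: " + String(e));
    return false;
  }
}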
|
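Writing the category patterns as string literals, for example `fileName.match("^a[A-Za-z0-9_]*\.csv$")`, has a subtle pitfall: in a JavaScript string `"\."` is simply `"."`, so the resulting pattern accepts any character before "csv", not only a literal dot. The following Node.js sketch shows the difference and a helper for extracting the file name from `scheduler_file_path` with either separator; `fromString`, `fromLiteral` and `baseName` are hypothetical names.

```javascript
// In a string literal "\." is just ".", so the dot is no longer escaped:
// the pattern below is effectively ^a[A-Za-z0-9_]*.csv$
const fromString = new RegExp("^a[A-Za-z0-9_]*\.csv$");
// A regex literal keeps the escape, so only a literal "." matches.
const fromLiteral = /^a[A-Za-z0-9_]*\.csv$/;

// File paths may use "\" (Windows) or "/" separators; keep the last segment.
function baseName(filePath) {
  const parts = String(filePath).split(/[\\\/]/);
  return parts[parts.length - 1];
}
```

With these definitions `fromString.test("a_001xcsv")` is true while `fromLiteral.test("a_001xcsv")` is false, and `baseName("C:\\sandbox\\in\\a_001.csv")` gives `"a_001.csv"`. An equivalent alternative is to double the backslash inside the string (`"\\."`).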
Limitations of this solution
- The limitation of this approach becomes apparent if more than one file arrives from a subsidiary at once: there is no guarantee that the files will be processed in their intended order.
This is because there is no guarantee that files arriving together will be processed in any given order. This situation typically occurs after an unplanned loss of a file transfer connection. After the connection has been restored, there is a) no guarantee that the files arriving as a batch will be written to the file system in a particular order, and b) no way for JobScheduler to know how many files will arrive as a batch.
- One approach would be to wait until a steady state in the incoming directory has been reached (i.e. no new file has been added and the size of all files has remained constant over a suitable period of time) before starting to identify files. JobScheduler could then sort the files according to their names before forwarding them for processing.
The downside of this approach is that it delays the start of processing, because it is necessary to wait for a steady state to confirm that all members of a batch of files have arrived.
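The steady-state idea can be sketched in Node.js. This is a sketch of the idea only, not a JobScheduler feature: the snapshot format (an object mapping file name to size in bytes), the helper names `snapshotDir`, `isSteady` and `batchInOrder`, and comparing two snapshots taken a suitable interval apart are all assumptions.

```javascript
const fs = require("fs");
const path = require("path");

// Snapshot {fileName: sizeInBytes} of the files in `dir` whose names match `pattern`.
function snapshotDir(dir, pattern) {
  const snapshot = {};
  for (const name of fs.readdirSync(dir)) {
    if (pattern.test(name)) {
      snapshot[name] = fs.statSync(path.join(dir, name)).size;
    }
  }
  return snapshot;
}

// Steady state: no file added or removed, and every file kept its size.
function isSteady(previous, current) {
  const names = Object.keys(current);
  if (names.length !== Object.keys(previous).length) return false;
  return names.every((name) => previous[name] === current[name]);
}

// Once steady, forward the batch sorted by file name.
function batchInOrder(snapshot) {
  return Object.keys(snapshot).sort();
}
```

A caller would take a snapshot, wait for the chosen interval, take another and repeat until `isSteady(previous, current)` returns true, then forward `batchInOrder(current)`. Because `sort()` compares names character by character, sequence numbers in the file names need to be zero-padded (`a_009.csv` before `a_010.csv`) for this order to be the intended one.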
Solution Demo
A demonstration of this solution is available for download from:
...
- Copy the 'SQLLoaderProc' folder to your JobScheduler 'live' folder.
...
- Copy the 'Data' folder to a suitable local location.
The default location for this folder, which is specified in the configurations of the demo jobs, is: {{C:\sandbox}}. This location is used in this FAQ. Note that the following paths have to be modified if the location of the 'Data' folder is changed:
- The 'load_files' File Order Source directories in the load_files.job_chain.xml job chain object
- The 'source_file' and 'target_file' paths specified as parameters in the move_file_suc.job.xml and move_file_error.job.xml objects
Running the demo
- Copy files from the 'Data/__test-files' folder to the 'in' folder; JobScheduler will automatically start processing them within a few seconds.
How does the Demo Work?
The demo job chain ('load_files') is shown in the diagram to the right.
Note that the secondary job chain shown on the right of the diagram is only relevant if an error occurs.
The 'FolderIn_a' and 'FolderIn_b' symbols represent the 'File Order Source' directories monitored by the job chain.
- JobScheduler starts processing as soon as a file matching the regular expression is found in the 'in' directory.
- JobScheduler's aquire_lock job matches the file against the regular expressions and determines the file's category, i.e. Berlin or Munich.
- Once aquire_lock has found the matching category, it tries to set a semaphore (flag) using JobScheduler's built-in LOCK mechanism.
- Only one instance of each LOCK is allowed: once the LOCK has been assigned to the first file of the Berlin category, the next Berlin file has to wait with a setback until the LOCK is free.
- The same mechanism is repeated for files of the Munich category, but since the Munich LOCK (semaphore) has not been set, a Munich file is allowed to be processed.
- Once processing has finished, JobScheduler moves the file from the 'in' folder to either the 'done' folder (on success) or the 'failed' folder (on error).
- After the input file has been moved to the correct target directory, the release_lock job is called, which removes the lock (semaphore) from JobScheduler so that the next file of the same category can be processed.
Graphical Representation of the Solution
Graphviz |
---|
digraph "Example: File Transfer using JadeJob" {
graph [rankdir=TB]
node [shape=box,style="rounded,filled",fillcolor=aquamarine]
node [fontsize=10]
ranksep=0.3
FolderIn_a -> start
FolderIn_b -> start
start -> aquire_lock -> load_file_process -> move_file_suc -> suc_release_lock -> end_suc -> success
move_file_err -> error_release_lock -> end_err -> error
}
|
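The job chain in the diagram can also be written down as a transition table, which makes it easy to check that both the success path and the error path pass through a release_lock node. The table and the `route()` helper are a hypothetical sketch, not the demo's job chain XML. That load_file_process continues with move_file_err on error is an assumption inferred from the description of the error path; the diagram itself does not draw that edge.

```javascript
// Hypothetical transition table for the load_files job chain in the diagram.
// ASSUMPTION: load_file_process continues with move_file_err on error.
const chain = {
  start: { next: "aquire_lock" },
  aquire_lock: { next: "load_file_process" },
  load_file_process: { next: "move_file_suc", error: "move_file_err" },
  move_file_suc: { next: "suc_release_lock" },
  suc_release_lock: { next: "end_suc" },
  move_file_err: { next: "error_release_lock" },
  error_release_lock: { next: "end_err" },
};

// Follow the chain from "start"; states in `failingStates` take their error branch.
function route(failingStates = new Set()) {
  const visited = [];
  let state = "start";
  while (Object.prototype.hasOwnProperty.call(chain, state)) {
    visited.push(state);
    const node = chain[state];
    state = failingStates.has(state) && node.error ? node.error : node.next;
  }
  visited.push(state); // end node: end_suc or end_err
  return visited;
}
```

`route()` ends in end_suc via suc_release_lock, and `route(new Set(["load_file_process"]))` ends in end_err via error_release_lock. A failure that leaves the chain before either release node is reached is the case the force-release_lock job covers.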
See also: