Introduction
Consider the situation where files belonging to different categories (i.e. files sharing characteristics such as type or source) can be processed in parallel, but all the files of any one category have to be processed sequentially.
...
Our recommended approach for this situation would be:
Graphviz

digraph {
  graph [rankdir=TB]
  node [shape=Mrecord, style="filled", fillcolor=lightblue]
  node [fontsize=10]
  ranksep=0.3
  subgraph cluster_0 {
    style=filled;
    color=lightgrey;
    node [style=filled, color=white];
    label = "JobScheduler";
    labeljust = "l";
    pad = 0.1;
    FileOrderSource [label="{<f0> FileOrder\nSource|<f1> Identify Category|<f2> Is Lock Set|{<f3> Yes|<f4> No}}"];
    Setback [label="{<f0> Setback\nWait}"];
    StartProcessing [label="{<f0> Set Lock|Start Processing File}"];
    EndProcessing [label="{<f0> End Processing File|<f1> Release Lock}"];
    FileOrderSource:f3 -> Setback -> FileOrderSource:f2;
    FileOrderSource:f4 -> StartProcessing -> EndProcessing [weight=4];
  }
  File_Transfer -> FileOrderSource;
}
- The most important feature of this solution is that it only requires one job chain and one set of jobs to separate the processing of different categories of files, thereby simplifying maintenance of the jobs and job chain.
- Use a single 'load_files' File Order Source directory to which all files are delivered.
- JobScheduler would use regular expressions to identify the files arriving in this directory on the basis of their names, timestamps or file extensions and forward them for processing accordingly.
- JobScheduler would then set a 'lock' for the subsidiary whose file is being processed to prevent further files from this subsidiary being processed as long as processing continues.
Should a 'new' file from this subsidiary arrive whilst its predecessor is being processed, the job 'receiving' the new file will be 'set back' by JobScheduler as long as the lock for the subsidiary is set.
- This lock would be released by JobScheduler once processing of the 'first' file has been completed.
- The job 'receiving' the new file will now be able to forward the new file for processing.
The Solution in Detail
- JobScheduler starts processing as soon as a file matching a regular expression is found in the directory.
This directory is set in the "File Order Sources" area in the "Steps/Nodes" view of the "load_files" job chain as shown in the screenshot below:
- JobScheduler's aquire_lock job matches files using regular expressions and determines the file's category - for example, Berlin or Munich.
The regular expressions are also defined in the "File Order Sources" area shown in the screenshot. Note that for simplicity the regular expressions match the prefixes "a" and "b" in the file names and not directly "Berlin" or "Munich".
The aquire_lock job uses Rhino JavaScript to try to acquire the lock and either wait if the lock is not available or proceed to the next node if it is. This script is listed below:
Code Block language javascript

function spooler_process()
{
    try
    {
        var parameters = spooler_task.order().params();
        var filePath   = "" + String(parameters.value("scheduler_file_path"));
        spooler_log.info( " scheduler_file_path : " + filePath );

        // var param1 = "" + String(parameters.value("param1"));
        // spooler_log.info( " param1 : " + param1 );

        // extract the file name from the full path
        var fileParts = filePath.split("\\");
        var fileName  = fileParts[fileParts.length-1];
        spooler_log.info( "fileName : " + fileName );

        // determine the category (and therefore the lock name) from the file name prefix
        var lockName;
        if (fileName.match("^a[A-Za-z0-9_]*\\.csv$"))
        {
            lockName = "BERLIN_PROC";
            spooler_log.info( "File matched with berlin lock_name : " + lockName );
        }
        if (fileName.match("^b[A-Za-z0-9_]*\\.csv$"))
        {
            lockName = "MUNICH_PROC";
            spooler_log.info( "File matched with munich lock_name : " + lockName );
        }

        // pass the file name and lock name on to the subsequent nodes
        spooler_task.order().params().set_value("file_spec", fileName);
        spooler_task.order().params().set_value("lock_name", lockName);

        if (!spooler.locks().lock_or_null( lockName ))
        {
            // the lock does not yet exist: create it and try to hold it
            var lock = spooler.locks().create_lock();
            lock.set_name( lockName );
            spooler.locks().add_lock( lock );
            if (spooler_task.try_hold_lock( lockName ))
            {
                return true;
            }
            else
            {
                spooler_task.call_me_again_when_locks_available();
            }
        }
        else
        {
            // the lock already exists, i.e. a file of this category is being processed: set the order back
            spooler_task.order().setback();
            spooler_log.info( " lock is already acquired , lock_name : " + lockName + " , setback , will retry after some time");
        }
        return true;
    }
    catch (e)
    {
        spooler_log.warn("error occurred : " + String(e));
        return false;
    }
}
- Once aquire_lock finds the matching category, it will try to set a semaphore (flag) using JobScheduler's inbuilt LOCK mechanism. Only one instance of each LOCK is allowed, as can be seen in the Max Non Exclusive parameter shown in the screenshot of JobScheduler's JOE interface below:
- Once a LOCK has been assigned to a file from a category (either Berlin or Munich), all subsequent files for this category have to wait with a setback until the LOCK has been freed.
The same mechanism applies to files from other categories: as long as no file of a given category is being processed, and therefore the corresponding LOCK has not been set, a file from the other category is free to be processed.
This can be seen in the following screenshot of the JOC interface showing the progression of file orders along the load_files job chain:
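As a side note on the design: a lock object only exists while a file of its category is being processed, because aquire_lock creates it and release_lock removes it again. A minimal helper along these lines (purely an illustration, not one of the demo jobs) could therefore test whether a category is currently busy:

Code Block language javascript

// Illustration only (not part of the demo): a category counts as "busy" while its lock
// object exists, since release_lock removes the lock object when processing has finished.
function isCategoryBusy( lockName )
{
    return spooler.locks().lock_or_null( lockName ) != null;
}

// Example: isCategoryBusy( "BERLIN_PROC" ) returns true while a Berlin file is being processed.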
- Once processing is finished, depending upon success or error, JobScheduler will move the file from the 'in' folder to either the 'done' (on success) or 'failed' (on error) folder.
- After moving the input file to the correct target directory, JobScheduler will call the release_lock job, which will remove the lock/semaphore and allow the next file from the same category to be processed.
...
Code Block language javascript

function spooler_process()
{
    try
    {
        var parameters = spooler_task.order().params();
        var filePath   = "" + String(parameters.value("scheduler_file_path"));
        spooler_log.info( " scheduler_file_path : " + filePath );

        // the lock name is passed on as an order parameter by the aquire_lock job
        var lockName = "" + String(parameters.value( "lock_name" ));
        spooler_log.info( " lock_name : " + lockName );

        /* the category matching is no longer needed here, as the lock name is passed as an order parameter
        var fileParts = filePath.split("\\");
        var fileName = fileParts[fileParts.length-1];
        spooler_log.info( "fileName : " + fileName );
        if (fileName.match("^a[A-Za-z0-9_]*\.csv$"))
        {
            var lockName = "BERLIN_PROC";
            spooler_log.info( "File matched with berlin lock_name : " + lockName );
        }
        if (fileName.match("^b[A-Za-z0-9_]*\.csv$"))
        {
            var lockName = "MUNICH_PROC";
            spooler_log.info( "File matched with berlin lock_name : " + lockName );
        }
        */

        // remove the lock so that the next file of this category can be processed
        if (spooler.locks().lock_or_null( lockName ))
        {
            spooler.locks().lock( lockName ).remove();
        }
        return true;
    }
    catch (e)
    {
        spooler_log.warn("error occurred: " + String(e));
        return false;
    }
}
A job to force release_lock is also required for the situation where the processing of a job fails without the corresponding lock being released. The script for this job to release the 'Berlin' lock would look something like:
Code Block language javascript

function spooler_process()
{
    try
    {
        var lock_name = "BERLIN_PROC";
        spooler.locks().lock( lock_name ).remove();
        return true;
    }
    catch (e)
    {
        spooler_log.warn("error occurred: " + String(e));
        return false;
    }
}
Limitations of this solution
The limitation of this approach will become apparent if more than one file arrives from a subsidiary at once.
This is because there is no guarantee that files arriving together will be processed in any given order. This situation typically occurs after an unplanned loss of a file transfer connection. After the connection has been restored, there is a) no guarantee that the files arriving as a batch will be written to the file system in a particular order and b) no way for JobScheduler to know how many files will arrive as a batch.
One solution here would be to wait until a steady state in the incoming directory has been reached (i.e. no new file has been added and the size of all files remains constant over a suitable period of time) before starting to identify files. JobScheduler could then order files according to their names before forwarding them for processing.
The downside of this second approach is that it introduces a delay before processing starts, due to the need to wait and check whether all members of a batch of files have arrived by checking for a steady state.
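The following is a minimal sketch of how such a steady-state check could look in a Rhino/JavaScript job. It is not part of the demo; the helper names and the example directory path are made up for illustration:

Code Block language javascript

// Minimal sketch (not part of the demo): consider the incoming directory "steady" when
// neither the set of file names nor the file sizes have changed between two checks.
// The helper names and the example path are made up for illustration.
function directorySnapshot( dirPath )
{
    var snapshot = {};
    var files = new java.io.File( dirPath ).listFiles();
    for (var i = 0; files != null && i < files.length; i++)
    {
        snapshot[ String(files[i].getName()) ] = files[i].length();
    }
    return snapshot;
}

function isSteady( dirPath, waitSeconds )
{
    var before = directorySnapshot( dirPath );
    java.lang.Thread.sleep( waitSeconds * 1000 );
    var after  = directorySnapshot( dirPath );

    for (var name in after)
    {
        // a new file has appeared or a file is still growing: not steady yet
        if (!(name in before) || before[name] != after[name]) return false;
    }
    return true;
}

// Example: a pre-processing job could set the order back until the directory is steady,
// and only then sort the waiting files by name and forward them for processing.
// if (!isSteady( "C:\\sandbox\\in", 30 )) spooler_task.order().setback();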
Solution Demo
A demonstration of this solution is available for download from:
...
- Unpack the zip file to a local directory
- Copy the 'SQLLoaderProc' folder to your JobScheduler 'live' folder.
TODO - ADD LINK TO DOKU
- Copy the 'Data' folder to a suitable local location.
The default location for this folder, which is specified in the configurations of the demo jobs, is: C:\sandbox
Note that the following paths have to be modified if the location of the 'Data' folder is changed:
- The 'load_files' File Order Source directories in the load_files.job_chain.xml job chain object
- The 'source_file' and 'target_file' paths specified as parameters in the move_file_suc.job.xml and move_file_error.job.xml objects
Running the demo
- Just copy files from the 'Data/__test-files' folder to the 'in' folder:
- JobScheduler will automatically start processing within a few seconds
- Once processing has been completed the file(s) added to the 'in' folder will be moved to the 'done' or 'failed' folders, depending on whether processing was successful or not.
- DO NOT attempt to start an order for the job chain. This will only cause an error in the aquire_lock job.
How does the Demo Work?
The demo job chain ('load_files') is shown in the diagram to the right.
TODO - ADD SCREENSHOT OF JOB CHAIN IN JOE

The 'Folderin_a' and 'Folderin_b' symbols represent the 'File Order Source' directory monitored by the 'load_files' job chain. The remaining jobs in the chain are shown as the individual chain nodes in the job chain 'Steps/Nodes' form. Note that the secondary job chain shows the behaviour in the event of an error.
- JobScheduler starts as soon as a file matching the regular expression is found in the File Order Source directory.
- JobScheduler's aquire_lock job matches the file with a regular expression and decides the file's category, i.e. Berlin or Munich.
- Once aquire_lock finds the matching category, it tries to set a semaphore (flag) using JobScheduler's inbuilt LOCK mechanism.
- Only one instance of each LOCK is allowed, so once a LOCK has been assigned to the first file of the Berlin category, the next file has to wait with a setback until the LOCK is free.
- The same mechanism is repeated for files from the Munich category, but as long as the LOCK (or semaphore/flag) is not set for Munich, a file from the Munich category will be allowed to be processed.
- Once processing is finished, depending upon success or error, JobScheduler will move the file from the 'in' folder to either the 'done' (on success) or 'failed' (on error) folder.
- After moving the input file to the correct target directory, JobScheduler will call the release_lock job, which will remove the lock/semaphore and allow the next file from the same category to be processed.
...