submissionHelpers

With ever-increasing amounts of data to process, even very efficient serial processing quickly results in long turn-around times. Fortunately, HEP tasks usually consist of many independent events. Hence, results can be obtained much faster by distributing the workload over a cluster comprising many computers. Using such a cluster usually requires interaction with some form of scheduler. A scheduler is a tool that receives instructions from users about what workload to run and assigns these workloads to the individual machines ('nodes') of the cluster. In doing so it also needs to respect the resource requirements of each workload (usually called a 'job'), such as the amount of memory (RAM), the (maximal) runtime, the number of CPUs/cores, or local disk space. In general it is the user's responsibility to tell the scheduler which resources are needed when submitting a job.

Several different schedulers are in production use at various clusters. Examples of such schedulers and the corresponding submission commands are HTCondor ('condor_submit'), torque ('qsub', sometimes also 'msub'), SGE (also 'qsub', but with about 50% different arguments/syntax), slurm ('sbatch'), LSF ('bsub'), and several more. (We call any tool the user interacts with for the purpose of job submission a scheduler, even though there might be more adequate names for some of them.) The CAF comes with a set of helpers that allow writing greatly simplified (python) scripts that work with multiple, different schedulers while requiring only minimal awareness on the user's side of the interface of the scheduler available on a particular cluster.

Defining Tasks

To run a fairly arbitrary workload on a cluster using CAF's submissionHelpers, the main effort on the user's side is to define the workload, the corresponding setup instructions, and the resource requirements. In the following this is summarized as a 'task'. To differentiate this definition from the actual execution on the cluster, the latter is referred to as a 'job'.

The creation of a 'task' object is shown in the following snippet together with some explanation on various parameters that can be defined for each individual task:

#!/usr/bin/env python
from CommonAnalysisHelpers import submit,task
#obtain and evaluate an argParser with the minimal configuration of commonly used submission options.
#to see the available options, run this script with the '--help' option
parser = submit.MinimalArgumentParser()
args = parser.parse_args()

#constructor of a task:
#positional arguments are: 
# - 'identifier' (string), 
# - 'payload' (string or list of strings; in case of a list the strings are concatenated with newline characters (\n))
#optional (named) arguments:
# - args : argParse namespace, if some options are not set, the settings on this object are used as defaults/fallbacks
# - queue: choose the 'queue' or 'partition' to submit to. In general not all machines of a cluster
#               are available in every queue and queues might have restrictions on what resources
#               can be requested (e.g. max. runtime). Please check the documentation of your 
#               particular cluster for what queues are available
# - time: maximum runtime to request for this task in minutes. If the job doesn't finish within
#             this time the scheduler may forcefully terminate it.
# - memory: amount of RAM to reserve in MB. The exact definition of this quantity may vary (RSS, VSS,...)
# - setup: prepended to payload, must start with a 'shebang' (#!myInterpreter) and should provide setup 
#              instructions to prepare the environment for the actual payload
# - outputs: (string or list of strings) paths and names of files the job is expected to produce. A task will only be considered as being
#                  successfully completed ('done') if all of these files are present
# - inputs: like outputs but listing (config) files the payload reads. If specific options are set this allows 
#               outputs to be automatically considered invalid (as if not existing w.r.t. the task being 'done') if they are 
#               older than the newest input file (based on the files' timestamps).
# - ncores: number of CPU cores to request
# - logFile: file that the job's stdout will be written to
# - errFile: file that the job's stderr will be written to
# - dependencies: list of other task objects that need to be 'done' before this job may start to run
t = task.task("testJob1","touch testFile1.txt\necho 'Hello Batch'\nsleep 20s",args=args,queue="express",time="1",memory=500.,setup="#!/usr/bin/env bash", outputs="testFile1.txt")

This example does not have any effect on the computing cluster yet (except for a small amount of computing time on the interactive/login node it is run on). However, it already allows a large amount of work to be defined in a convenient format. An (almost) arbitrarily complex graph of tasks depending on each other can be created either by specifying the dependencies of each task during its creation (see above) or by announcing the dependencies later:

#announce 'otherTask' as a dependency of 'someTask'
someTask.addDependency(otherTask)
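
As described for the 'payload' and 'setup' parameters above, a list payload is joined with newline characters and the setup block is prepended to it. The following stand-alone sketch illustrates how such a job script could be assembled. It does not use the CAF classes; the function name 'assemble_job_script' is hypothetical, and the assumption that setup and payload are separated by a single newline is ours.

```python
def assemble_job_script(setup, payload):
    """Return the text of a job script: setup first, then the payload.

    Hypothetical helper for illustration only, not part of CAF.
    """
    # A list payload is joined with newline characters.
    if isinstance(payload, (list, tuple)):
        payload = "\n".join(payload)
    # The setup block has to start with a shebang such as '#!/usr/bin/env bash'.
    if not setup.startswith("#!"):
        raise ValueError("setup must start with a shebang (#!interpreter)")
    # Assumption: setup and payload are separated by exactly one newline.
    return setup.rstrip("\n") + "\n" + payload.rstrip("\n") + "\n"


script = assemble_job_script(
    "#!/usr/bin/env bash",
    ["touch testFile1.txt", "echo 'Hello Batch'", "sleep 20s"],
)
print(script)
```

The resulting text corresponds to what one would otherwise write by hand into a small shell script before passing it to the scheduler.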

If at any point one wants to know what a task object looks like, one can simply print it:

[Figure: Printing a task object in an interactive python session]

SubmissionControllers

Once all tasks and their dependencies on each other are defined, one simply passes a list of terminal-node tasks to a SubmissionController. A terminal-node task is a task that no other task depends on. In simpler terms: these are the tasks producing what we are ultimately interested in as the final product(s). The SubmissionController then takes care of all the declared dependencies automatically. A minimal example could look as follows (please note that some arguments are hard-coded in this example; on your own cluster you would likely need to change the queue name):

#!/usr/bin/env python2

from CommonAnalysisHelpers import submit,task
#get a dummy arg parser, it is required by several components
parser = submit.MinimalArgumentParser()
args = parser.parse_args()

terminalTask = task.task("testJob1","touch testFile1.txt\necho 'Hello Batch'\nsleep 20s",args=args,queue="express",time="1",memory=500.,setup="#!/usr/bin/env bash", outputs="testFile1.txt")
dependencyTask = task.task("testJob2","touch testFile2.txt\necho 'Hello Batch from dependency'\nsleep 20s",args=args,queue="express",time="1",memory=500.,setup="#!/usr/bin/env bash", outputs="testFile2.txt")
#announce the dependency:
terminalTask.addDependency(dependencyTask)

#automatically determine which controller should be used for the current system:
ctrl = submit.guessSubmissionController()
#pass a list of terminal-node tasks to the controller
ctrl.submitTasks(args,[terminalTask])

Once the jobs corresponding to these two tasks have run, there should be two files in the directory where this example was run: testFile1.txt and testFile2.txt, both empty. The presence of these files, however, does have an effect if the example is executed again. Whenever tasks are passed to the SubmissionController, it checks whether they should actually be submitted. This is not the case if

  • there is already a job with the same name/identifier running or scheduled, or
  • all outputs the job is expected to produce are already present

If the script is called with the '--checktimestamp' argument (that is, if the 'args' namespace object has this option set), the check for the presence of output files additionally compares time stamps: an output file is only considered valid ("present") if it is newer than every listed input file. If no input files are listed, this option makes no difference.
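
The output check described above can be sketched in a few lines of plain python. This is only an illustration of the idea, not the CAF implementation: the function name 'outputs_are_valid' is hypothetical, and ignoring listed input files that do not exist is an assumption.

```python
import os
import tempfile


def outputs_are_valid(outputs, inputs=(), check_timestamp=False):
    """Hypothetical sketch: decide if a task's outputs count as 'present'."""
    # Every expected output file has to exist.
    if not all(os.path.isfile(path) for path in outputs):
        return False
    # Assumption: listed input files that do not exist are ignored.
    existing_inputs = [path for path in inputs if os.path.exists(path)]
    if not check_timestamp or not existing_inputs:
        return True
    # With the time stamp check, each output must be newer than the newest input.
    newest_input = max(os.path.getmtime(path) for path in existing_inputs)
    return all(os.path.getmtime(path) > newest_input for path in outputs)


# Demonstration with temporary files and explicitly set modification times.
workdir = tempfile.mkdtemp()
config = os.path.join(workdir, "input.cfg")
result = os.path.join(workdir, "output.txt")
for path in (config, result):
    open(path, "w").close()
os.utime(result, (1000, 1000))  # the output is older ...
os.utime(config, (2000, 2000))  # ... than the input

print(outputs_are_valid([result], [config]))                        # True
print(outputs_are_valid([result], [config], check_timestamp=True))  # False
```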

If a task is neither already known to the scheduler nor done, then all tasks depending on this one will also be considered as being 'tosubmit'.
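
To illustrate how a controller can reach every task starting from the terminal-node tasks only, the following stand-alone sketch walks the dependency graph and returns the tasks in an order in which each task appears after all of its dependencies. The class 'SketchTask' and the function 'submission_order' are hypothetical stand-ins, not the CAF classes, and the graph is assumed to be free of cycles.

```python
class SketchTask(object):
    """Hypothetical stand-in for a task; only identifier and dependencies."""

    def __init__(self, identifier, dependencies=None):
        self.identifier = identifier
        self.dependencies = list(dependencies or [])

    def addDependency(self, other):
        self.dependencies.append(other)


def submission_order(terminal_tasks):
    """Return all reachable tasks, dependencies first (depth-first walk)."""
    ordered, seen = [], set()

    def visit(current):
        if current.identifier in seen:
            return  # already handled via another task depending on it
        seen.add(current.identifier)
        for dep in current.dependencies:
            visit(dep)
        ordered.append(current)

    for terminal in terminal_tasks:
        visit(terminal)
    return ordered


# A small diamond-shaped graph: 'final' needs 'a' and 'b', both need 'base'.
base = SketchTask("base")
a = SketchTask("a", [base])
b = SketchTask("b", [base])
final = SketchTask("final")
final.addDependency(a)
final.addDependency(b)

print([t.identifier for t in submission_order([final])])
# ['base', 'a', 'b', 'final']
```

Handling dependencies before the tasks that need them is convenient for schedulers that require the jobID of a dependency at submission time.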

Checking on job/task status

A frequent desire is to check on the status of the submitted jobs/tasks:

  • Are they still pending execution/running?
  • Are they done?
  • Did they fail/crash/get terminated?

While the provided tools often cannot tell for sure if a job has failed, they can provide some helpful hints. If all tasks/jobs have been submitted at some point and, at a later time, some of them are neither found in the scheduler's queue nor are their expected outputs present, this is usually a strong indicator of a failed job. In this case one should consult the respective log file. One also usually does not want to re-submit these jobs right away but rather first resolve the issues that led to the failure. Plainly re-running the submission script, however, would (in most cases) simply submit them again.

Hence, the CAF SubmissionControllers support the '--checkmissing' argument. It inhibits any actual submission and instead only gathers information about the status of the jobs/tasks, listing them in two groups: tasks that are done and tasks that are not (yet) done. If a task is already 'done', its dependencies are implicitly done as well and hence omitted from the printout. If multiple tasks depend on a task that is not yet done, the dependency task is only printed for the first task depending on it. For all subsequent tasks depending on it, '(...)' is printed once in place of all not-yet-done dependencies that have already been printed; dependencies that have not been printed before are still listed.

[Figure: Example printout with '--checkmissing' option]
[Figure: Another example printout with '--checkmissing'; one task is still to be submitted while its dependency is already done.]

Adding a new submissionController

If there is no suitable submissionController available for your cluster yet, a new one can be implemented by simply writing a (python) class inheriting from submissionControllerBase. The base class implements most of the complex logic (which generally does not need to be modified). The derived class is only expected to provide its own implementation of all methods that, in the base class, raise a 'NotImplementedError'. In the following some hints regarding these methods are given.

  • '_apply_*' methods These methods are expected to manipulate the 'cmd_args' and 'payload' lists to include the respective option if configured on the provided task object.
    • The 'base_command' is usually the name of the submission binary (examples: bsub (LSF), sbatch (slurm), qsub (torque/SGE) ) and possibly some additional arguments, e.g., to ensure the output from executing the submission binary has an easily parsable format.
    • Every part of the final command that, when typing it out manually on a console, would be separated by a space should be one entry in the 'cmd_args' list.
    • For log and error files one needs to pay attention to how the scheduler in question expects to find options when, e.g., log and error file are the same (stderr and stdout should go to the same file).
    • When adding dependencies for a job, iterate over the task's 'dependencies' member (a list of task objects). For some systems dependencies must be declared using a jobID assigned by the scheduler upon submission of a task/job. If a task has been submitted or has been identified in the current queue, its jobID for this purpose can be obtained via the method 'task.getJobID(args,ctrl)', where 'args' is the argparse namespace and 'ctrl' the submissionController instance. Typically this looks like the following:
      for dep in thisTask.dependencies:
        jid = dep.getJobID(args,self)
        #modify cmd_args and/or payload to incorporate the dependency

      A more complete and less sterile example can be found, for example, in the slurmController.

    • '_apply_dependencies' only needs to be implemented if the scheduler supports "simple" dependencies (see below).

  • '_handle_submit_response' This method is expected to do the post-processing when a task has been submitted. It determines if the task has been successfully submitted or not (e.g., an error occurred) by checking the 'stdout', 'stderr' strings, and the status_code (the exit code of the submission binary, typically 0 if everything is fine and non-0 otherwise). It is also responsible for storing additional information on the task object like a jobID returned by the submission binary (usually part of the 'stdout' string).

  • '_is_available' This static method (the line before 'def _is_available():' should read '@staticmethod') determines if this controller can be used on the current system. It is expected to return True if its corresponding scheduler is available and False if not. Unless there are ambiguities (like with torque vs. SGE) this can usually be tested by checking if the submission binary is available, e.g.:

    @staticmethod
    def _is_available():
      #prepare to execute 'which sbatch'
      p = subprocess.Popen("which sbatch".split(), stdin=subprocess.PIPE,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
      #execute the prepared command
      p.communicate("") 
      #evaluate its return code    
      return not p.returncode #returncode is 0 if everything went fine!
  • '_supports_simple_dependencies_' This method is expected to return True if the scheduler in question supports "simple" dependencies. Simple in this context means that we can provide a list of already running jobs (by jobID, name, or similar) that a task to be submitted depends on. If this is not the case, like for HTCondor, where a full dependency graph needs to be supplied at once, this method should return False. In this case the implementation of the base class will emulate the dependency support and '_handle_dependencies' does not need to be implemented.

  • '_updateQueueStatus' This method should update the member queueStatusCache by polling the scheduler for currently running/scheduled jobs (of the current user). The queueStatusCache is expected to be a dictionary ('dict') with the tasks' identifiers (not jobIDs!) as keys. The values should be dicts themselves with the following keys (values):
    • 'jobid' (scheduler-internal ID, corresponding to what one expects from task.getJobID)
    • 'status' (arbitrary info string that might tell the user more about the stage of the job)
    • 'statusCode' (one of the enumeration members of taskStatus, here typically one of 'taskStatus.running', 'taskStatus.submitted', 'taskStatus.failed' or, if something unexpected happens, 'taskStatus.undefined')

    If the queue polling did not yield any running jobs, the queueStatusCache should be an empty dict. At the end of the '_updateQueueStatus' method the internal time stamp needs to be updated:
    self.queueStatusTimestamp = time.time()
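
The structure expected for the queueStatusCache can be illustrated with a stand-alone sketch that parses the text output of a queue query. It assumes slurm and an output as produced by something like 'squeue --noheader --format="%i %j %T"' (job ID, job name, state), with the job name equal to the task identifier and free of whitespace. The function name and the plain strings standing in for the taskStatus members are hypothetical; a real controller would use the taskStatus enumeration and run the query itself.

```python
import time

# Plain strings standing in for the members of the taskStatus enumeration.
RUNNING, SUBMITTED, FAILED, UNDEFINED = "running", "submitted", "failed", "undefined"

# Mapping of a few slurm job states to the stand-in status codes.
STATE_MAP = {
    "RUNNING": RUNNING,
    "COMPLETING": RUNNING,
    "PENDING": SUBMITTED,
    "FAILED": FAILED,
    "TIMEOUT": FAILED,
}


def build_queue_status_cache(squeue_output):
    """Turn 'jobid name state' lines into a dict keyed by task identifier."""
    cache = {}
    for line in squeue_output.splitlines():
        fields = line.split()
        if len(fields) != 3:
            continue  # skip empty or unexpected lines
        jobid, name, state = fields
        cache[name] = {
            "jobid": jobid,
            "status": state,
            "statusCode": STATE_MAP.get(state, UNDEFINED),
        }
    return cache


sample_output = "1001 testJob1 PENDING\n1000 testJob2 RUNNING\n"
queueStatusCache = build_queue_status_cache(sample_output)
queueStatusTimestamp = time.time()  # remember when the queue was polled
print(queueStatusCache["testJob1"]["statusCode"])  # submitted
```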

When adding the new submissionController directly to CAFCore, which already contains the other ones, it should be imported into the CommonAnalysisHelpers.submissionControllers scope by adding a line for it to the '__init__.py' file in 'CAFCore/CommonAnalysisHelpers/python/submissionControllers'.

Please remember to re-run cafcompile to make sure your new implementation can be found by python in your submission script.
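
The hints on the '_apply_*' methods and on '_handle_submit_response' given above can be illustrated with a stand-alone sketch for slurm. It is not the code of the slurmController: the two function names and their signatures are hypothetical, and only the command-line handling is shown. The sbatch options used ('--parsable', '--job-name', '--partition', '--time', '--mem', '--cpus-per-task', '--output', '--error', '--dependency=afterok:...') are regular sbatch options.

```python
def build_sbatch_command(identifier, queue=None, time_minutes=None,
                         memory_mb=None, ncores=None, log_file=None,
                         err_file=None, dependency_job_ids=()):
    """Assemble the argument list; one list entry per space-separated part."""
    # '--parsable' makes sbatch print just 'jobid' or 'jobid;cluster'.
    cmd_args = ["sbatch", "--parsable", "--job-name=" + identifier]
    if queue:
        cmd_args.append("--partition=" + queue)
    if time_minutes is not None:
        cmd_args.append("--time=%d" % time_minutes)
    if memory_mb is not None:
        cmd_args.append("--mem=%d" % memory_mb)
    if ncores is not None:
        cmd_args.append("--cpus-per-task=%d" % ncores)
    if log_file:
        cmd_args.append("--output=" + log_file)
    # If only '--output' is given, sbatch sends stderr to the same file,
    # so '--error' is only added when a different file is requested.
    if err_file and err_file != log_file:
        cmd_args.append("--error=" + err_file)
    if dependency_job_ids:
        ids = ":".join(str(jid) for jid in dependency_job_ids)
        cmd_args.append("--dependency=afterok:" + ids)
    return cmd_args


def parse_submit_response(stdout, stderr, status_code):
    """Return the jobID as a string, or None if the submission failed."""
    if status_code != 0:
        return None
    job_id = stdout.strip().split(";")[0]
    return job_id if job_id.isdigit() else None


cmd = build_sbatch_command("testJob1", queue="express", time_minutes=1,
                           memory_mb=500, log_file="job.log",
                           err_file="job.log", dependency_job_ids=["1000"])
print(" ".join(cmd))
print(parse_submit_response("1001;mycluster\n", "", 0))  # 1001
```

In an actual controller the returned jobID would be stored on the task object so that tasks depending on it can refer to it later.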

CAFExample's submit.py

CAFExample comes with a basic submission script that makes use of the features mentioned above: 'submit.py' (inside 'share'). Like the other scripts in the same directory, it can be called with the '--help' option to print all command-line options the script accepts. A typical call looks like the following:

submit.py flatNTuple/config/master/analyze-flatNTuple-Example.cfg --jobs \
flatNTuple/config/jobLists/jobs_flatNTuple_analyze.txt --identifier CAFExTest --allowArgChanges

(Please note: a trailing \ at the end of a line is interpreted, e.g., by bash as a 'line-continuation character', that is the next line is treated as if it followed without a line-break).

What this script does internally is to read the master config file intended for analyze.py and extract the location of the 'initialized' SampleFolder. It then reads that SampleFolder as well as a job-definition file ('--jobs') and determines how to split the workload into tasks which are then submitted.

The job-definition file is simply a list of paths inside the SampleFolder that should be processed, possibly including wildcards. In the simplest case each line defines one job/task. Additionally, 'modifierLines' can be used, that is, lines of the form

!memory=2000

which will override the setting of the same name given on the command line for all jobs listed below it (or until the same option is changed again). The command-line option corresponding to this example is '--memory'. For modifierLines to take effect the '--allowArgChanges' option must be specified! Lines starting with a # sign are considered comments and are ignored.
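
How such a job-definition file can be interpreted is sketched below. This is an illustration under assumptions, not the parser used by submit.py: the function name is hypothetical, option values are kept as plain strings, and the paths in the demonstration are made up.

```python
def parse_job_definitions(lines, defaults, allow_arg_changes=False):
    """Return a list of (path, options) pairs, one per job-definition line."""
    current = dict(defaults)  # options as given on the command line
    jobs = []
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # empty lines and comments are ignored
        if line.startswith("!"):
            # A modifier line such as '!memory=2000' changes the option for
            # all following jobs, but only if changes are allowed.
            if allow_arg_changes:
                key, _, value = line[1:].partition("=")
                current[key.strip()] = value.strip()
            continue
        jobs.append((line, dict(current)))
    return jobs


# Hypothetical file content; the paths are examples only.
example_lines = [
    "# background samples",
    "/bkg/?/top",
    "!memory=2000",
    "/sig/*",
    "/data/*",
]
jobs = parse_job_definitions(example_lines, {"memory": "1000"},
                             allow_arg_changes=True)
for path, options in jobs:
    print(path, options["memory"])
```

In this sketch the first job keeps the command-line value, while the two jobs listed after the modifier line use the changed one.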

The command-line options '--maxSampleSize' and '--maxSampleCount' (or the equivalent modifierLines) instruct the script to split each job definition further into sub-jobs such that they only process up to a given number of input files (maxSampleCount) and/or the combined size of the input files per sub-job is below the specified value (maxSampleSize, in MB).
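
The effect of these two limits can be illustrated with a simple greedy grouping. How submit.py actually forms the sub-jobs is not described here, so the strategy below (filling one sub-job after the other in the given order) is an assumption, as are the function name and the convention that a single file larger than the size limit gets a sub-job of its own.

```python
def split_samples(samples, max_count=None, max_size_mb=None):
    """Group (name, size_mb) pairs into sub-jobs respecting both limits."""
    chunks, current, current_size = [], [], 0.0
    for name, size in samples:
        too_many = (max_count is not None and bool(current)
                    and len(current) >= max_count)
        too_big = (max_size_mb is not None and bool(current)
                   and current_size + size > max_size_mb)
        if too_many or too_big:
            # Close the current sub-job and start a new one.
            chunks.append(current)
            current, current_size = [], 0.0
        current.append(name)
        current_size += size
    if current:
        chunks.append(current)
    return chunks


# Hypothetical input files with their sizes in MB.
samples = [("a", 400), ("b", 300), ("c", 500), ("d", 100), ("e", 100)]
print(split_samples(samples, max_size_mb=800))
# [['a', 'b'], ['c', 'd', 'e']]
print(split_samples(samples, max_count=2, max_size_mb=800))
# [['a', 'b'], ['c', 'd'], ['e']]
```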

The submit.py script (like any submissionHelpers-based script that creates a list of tasks in a deterministic, reproducible way) can be called again and automatically detects which tasks are already submitted or even done. To make the task list reproducible the '--identifier' option needs to be set, as otherwise the current time is used as a fallback (which differs between two calls). It is particularly helpful to pass the '--checkmissing' option, which prevents actual submission and only checks the status of all tasks, listing whether they are still being processed ('running'/'submitted'), 'done', or still to be submitted ('tosubmit'). Please note that 'tosubmit' might mean that the jobs have crashed or were forcefully terminated by the scheduler if they have already been submitted before. In this case one should make sure to check the corresponding log files and fix the underlying issue before calling submit.py again without '--checkmissing'.

Merging SampleFolders

When submitting, e.g., the main analysis step (analyze.py) to a cluster, one obtains (at least) one output file per task/job. Since the idea of SampleFolders includes having a single object representing the analysis, an additional step is required to turn multiple (partial) SampleFolders into a single one. To this end CAFCore comes with a ready-made tool: tqmerge. Like almost all CAF tools it makes use of the python 'argparse' module and therefore supports being called with the '--help' option. The typical usage of tqmerge reads

tqmerge -o myMergedSampleFolder.root -t analyze --patch someTQFolderPatchFileToApplyToEachInputBeforeMerging.txt path/to/unmergedOutputsA/*.root path/to/unmergedOutputsB/*.root

This example merges all SampleFolders in the files listed at the end (please note that bash already expands the wildcarded paths; hence, tqmerge sees each individual file path). Upon loading the first (partial) SampleFolder, and before merging each subsequent one with it, one can apply patches to the respective SampleFolder with the '--patch' option, e.g., to remove some unneeded elements of the SampleFolder in order to produce a smaller output in the end. For the merging mechanism to know which parts of each partial SampleFolder are actually relevant (that is, which parts were actually worked on in the respective job) a 'trace ID' needs to be provided. Typically this is the name of the payload script that was executed in the job. In the above example this is 'analyze'. Finally, we need to provide the name of the file that the merged SampleFolder should be written to via the '-o' option.
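
When tqmerge is called from a python script rather than from bash, the wildcards are not expanded automatically. The following sketch shows one way to assemble the argument list under this assumption. The function name is hypothetical, only the options shown in the example above ('-o', '-t', '--patch') are used, and the command is only built here, not executed (it could be passed to, e.g., 'subprocess.call').

```python
import glob
import os
import tempfile


def build_tqmerge_command(output, trace_id, input_patterns, patch_file=None):
    """Build the tqmerge argument list, expanding wildcards like bash would."""
    cmd = ["tqmerge", "-o", output, "-t", trace_id]
    if patch_file:
        cmd += ["--patch", patch_file]
    for pattern in input_patterns:
        # Sorting keeps the order of the input files reproducible.
        cmd.extend(sorted(glob.glob(pattern)))
    return cmd


# Demonstration with two empty stand-in files in a temporary directory.
workdir = tempfile.mkdtemp()
for name in ("part_1.root", "part_0.root"):
    open(os.path.join(workdir, name), "w").close()

command = build_tqmerge_command("myMergedSampleFolder.root", "analyze",
                                [os.path.join(workdir, "*.root")])
print(command)
```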