JobManagement (jobManagement)
JobManagement provides a set of classes to execute different types of jobs on a workstation or a cluster. It needs a bash environment on the workstation and PBS (Portable Batch System) on the cluster. It consists of a central job list class which serves as the controller for the job management. A job list instance owns a graph, which represents the dependencies between the jobs in the list, and a resources instance, which manages the number of processors and nodes to use. The Job class is built as a factory which creates an instance of the job type that fits the parameters you pass. You can put a job list into another job list, but for the resources to be handled correctly you need to pass the resources instances of the root job list. If no path is given, an output file is created automatically for every job.
To work on a cluster you need an ASCII file /etc/CLUSTERNAME containing the name of the cluster, or an environment variable CLUSTERNAME set to the name of the cluster.
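The lookup described above can be pictured with the following minimal sketch. It is not the package's own code: the function name detect_cluster_name is hypothetical, and the order of precedence (environment variable first, then the file) is an assumption, since the text does not say which source wins.

```python
import os
from pathlib import Path
from typing import Optional


def detect_cluster_name(name_file: str = "/etc/CLUSTERNAME",
                        env_var: str = "CLUSTERNAME") -> Optional[str]:
    """Return the cluster name, or None if neither source provides one."""
    # Assumption: the environment variable is consulted before the file.
    name = os.environ.get(env_var, "").strip()
    if name:
        return name
    path = Path(name_file)
    if path.is_file():
        # The file is expected to hold nothing but the cluster name.
        content = path.read_text(encoding="ascii", errors="replace").strip()
        return content or None
    return None
```

A return value of None would then correspond to running on a plain workstation.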
Usage
import:
from mojo.jobManagement.jobs.jobList import JobList
from mojo.jobManagement.jobs.job import Job
Simple example with four jobs and simple dependencies:
myJobList = JobList(nNodesAvailable = 0, nProcsAvailable = 6) # We want to compute locally and use six processors
job1 = Job("<path to a cleanup script>")
job2 = Job("<path to a TRACE executable>", nProcs = 4, args = ["<an argument>", "<another argument>"])
job3 = Job("<path to a POST executable>", args = ["<an argument>", "<another argument>"])
job4 = Job("<path to an analysis script>")
myJobList.addJob(job1)
myJobList.addJob(job2, [job1.id]) # pass a list with the parents' ids to get the dependencies right
myJobList.addJob(job3, [job2.id])
myJobList.addJob(job4, [job3.id])
myJobList.viewJobs() # if you want to
myJobList.runJoblist()
myJobList.cleanup() # if you want to; removes the output files of all successful jobs
If we were on a cluster we could do the same, but for job1 and job4 we would pass executeOnMaster = True so that the small scripts do not allocate a node of their own. We would also pass the number of nodes we want to use at the same time.
Another example with a job list inside a job list:
myJobList = JobList(nNodesAvailable = 20, nProcsAvailable = 2, name = "myJobList") # a name for the job list
job1 = Job("<path to a cleanup script>")
myJobList.addJob(job1)
# pass the resource management of the root job list
ajoblist = JobList(nodeManagement = myJobList.nodeManagement, procManagement = myJobList.procManagement)
job2 = Job(..., executeOnMaster = True) # if you are on a cluster the job will be executed on the master
job3 = Job(print, args = ["Hello World!"]) # a MethodJob
ajoblist.addJob(job2)
ajoblist.addJob(job3, [job2.id])
myJobList.addJob(ajoblist, [job1.id])
An example executing a single job:
myJob = Job("<path to executable>", runInBackground = False)
myJob.runJob()
myJob.cleanup(False) # removes the output file even if the job failed; the optional False is available for
# the joblist too
Running a MethodJob and receiving the result via an instance variable:
def myMethod(a, b):
    return a + b
myJob = Job(myMethod, args = [42, 3141], runInBackground = False)
myJob.runJob()
print(myJob.returnValue)
Attention
You will only get picklable return values!
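Whether a value can be pickled can be checked up front with the standard pickle module. The helper below is a small sketch; the name is_picklable is hypothetical and not part of the job management package.

```python
import pickle


def is_picklable(value) -> bool:
    """Return True if pickle.dumps accepts the value, False otherwise."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


print(is_picklable(42 + 3141))        # plain numbers can be pickled
print(is_picklable(lambda x: x + 1))  # lambdas cannot be pickled
```

Numbers, strings, lists and dictionaries of such values are safe choices for a return value; open files, locks and lambdas are not.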
For all available job list and job options, see the module documentation below.
Attention
Set runInBackground to False only if you want to execute the job without a job list.
Modules
JobList
Class to set up a job management system for interdependent jobs. In addition, you can put a job list itself as a job into another job list. After a successful execution of the job list you can get the jobs which were successful, which failed, and which were not started because of failed parent jobs by accessing joblist.successfulJobs, joblist.failedJobs and joblist.jobs.values().
Attention
Declaring dependencies between jobs in different job lists will generate an error!
class JobList(nNodesAvailable=None, nProcsAvailable=None, nTecplotLicenses=None, abortByError=False, name='anonymousJoblist', verbosity=<Verbosity.DEFAULT: 2>, resourcesDict=None, usedResources=None, additionalProcessStatus=False, retries=0, deactivateJobGrouping=True)
Class to set up a system of interdependent jobs. JobList uses an instance of Graph to control the dependencies between the jobs and an instance of Resources to manage the nodes available.
- Parameters
nNodesAvailable (integer) – number of nodes available for the execution of the job list
nProcsAvailable (integer) – number of processors to use on the master
nTecplotLicenses (integer) – number of tecplot licenses needed by the job
abortByError (boolean) – flag if the whole list should be stopped when one job fails
name (string) – a name for the job list
resourcesDict (dictionary) – a dictionary containing resources available in the job list (key: resource, value: quantity of the resource)
verbosity (integer) – verbosity level of the job list
usedResources (dictionary) – resources the job list consumes itself when processing
retries (integer) – number of times the job is retried when failing
deactivateJobGrouping (boolean) – disable running several small jobs simultaneously on cluster nodes
Inherited from AbstractJob:
start(self)
_writeStatus(self, status, message="")
addJob(job, parents=None, weight=None)
Adds a new job to the job list and builds the dependencies.
- Parameters
job (instance of a class derived from AbstractJob) – the job to add to the job list
parents (list of int) – list of the ids of the job's parents
weight (int) – weight of the job in processes * seconds
property allJobs
Returns all jobs in this JobList.
Warning: Some jobs can be included multiple times in the returned iterator.
nJobsOfStatus(statusList=None)
Returns the number of jobs whose status is in the given list of statuses.
Overrides the method in AbstractJob.
- Parameters
statusList (Optional[iterable]) – list of valid statuses or None
- Returns
number of jobs of this status
- Return type
runJoblist(printOverview=False)
Executes a job list.
- While the list contains jobs that have not terminated and are not blocked by a dead parent:
when jobs are ready to start and enough nodes are free, start new jobs
print the status of all running jobs to the log file and to sys.stdout
when a job has the status done or error, free its nodes and resolve the dependencies
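The loop above can be illustrated with a toy scheduler. This is only a sketch under simplifying assumptions, not the JobList implementation: jobs are plain callables that run one after another in the calling process, the only resource is a processor count, and the names run_job_list, procs_needed and the status strings are hypothetical.

```python
def run_job_list(jobs, parents, procs_needed, procs_available):
    """Toy version of the scheduling loop.

    jobs:         job id -> callable returning True on success
    parents:      job id -> list of parent job ids
    procs_needed: job id -> processors the job occupies while running
    """
    status = {job_id: "WAITING" for job_id in jobs}
    free = procs_available

    def mark_dead_children():
        # Jobs below a failed job can never start: flag them until stable.
        changed = True
        while changed:
            changed = False
            for job_id in jobs:
                blocked = any(status[p] in ("ERROR", "DEAD_PARENT")
                              for p in parents.get(job_id, ()))
                if status[job_id] == "WAITING" and blocked:
                    status[job_id] = "DEAD_PARENT"
                    changed = True

    while True:
        # A job is ready when it is waiting and all of its parents are done.
        ready = [j for j in jobs
                 if status[j] == "WAITING"
                 and all(status[p] == "DONE" for p in parents.get(j, ()))]
        started = []
        for job_id in ready:
            if procs_needed[job_id] <= free:
                free -= procs_needed[job_id]
                status[job_id] = "RUNNING"
                started.append(job_id)
        if not started:
            break  # nothing left that can run
        for job_id in started:
            succeeded = jobs[job_id]()
            status[job_id] = "DONE" if succeeded else "ERROR"
            free += procs_needed[job_id]  # free the processors again
        mark_dead_children()
    return status
```

A job that needs more processors than the list ever has available simply stays in the waiting state in this sketch.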
stop()
Stops all currently running jobs. Called by a parent job list or through a keyboard interrupt or system exit.
Job
Job supplies a class to compute a TRACE, PREP or POST job on a workstation or a cluster. When an instance is created, a suitable class is chosen and instantiated depending on the parameters passed.
- Assumptions:
Job scheduling on cluster is managed by PBS (Portable Batch System) or SLURM.
Requires:
- for use on a cluster:
Information about the cluster has to be added in ‘mojo.jobManagement.management.clusterInfo._supportedClusters’
class Job(executable, nProcs=1, args=None, kwargs=None, nTecplotLicenses=0, queue=None, workingDirectory=None, outputFile=None, jobName=None, procsPerNode=None, threadsPerProc=None, envVars=None, runInBackground=False, nodeType='ib', executeOnMaster=False, jobType=None, outputDir=None, successStatements=None, failStatements=None, ignoreExitCode=False, resourcesDict=None, useMpirun=True, wallTime=None, weight=0, dependJobID=None, account=None, retries=0, group=None, mailAddress=None, slurmSwitches=None)
A class to create the job you need by analyzing the given parameters.
- Parameters
executable (str or method) – path to the executable or a method
nProcs (int) – number of processors to use
args (list) – arguments which will be appended when executing
kwargs (dict) – a variable to pass keyword arguments to a methodJob
nTecplotLicenses (int) – number of tecplot licenses needed by the job
queue (str) – queue to put the job in
workingDirectory (str) – directory to change to before executing
outputFile (str) – path to a file for the output
jobName (str) – name for the job
procsPerNode (int) – processes per node
threadsPerProc (int) – threads to start per processor
envVars (dict) – environment variables to set
lock (Lock instance) – DEPRECATED; exists for compatibility reasons
runInBackground (bool) – if False the job, when started as single job, will stay in the foreground
nodeType (str) – type of the node to calculate on
executeOnMaster (bool) – if True the job will be executed on the master if on a cluster
jobType (str) – possibility to declare the jobType to control the result; available: TRACE, PREP, POST, gmcPlay
outputDir (str) – directory the output file will be saved in
successStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for success
failStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for failure
ignoreExitCode (bool) – if True the job will ignore the exit code of the executable
resourcesDict (dict) – a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
useMpirun (bool) – if False the job will not be executed with MPI but the number of processes is used
wallTime (str) – maximum wall time for a job on the cluster
account (str) – account on which computation costs are charged
retries (int) – number of times the job is retried when failing
mailAddress (str) – mail address for status information (only used for SLURM jobs!)
slurmSwitches (str) – slurm option requesting minimal number of network switches
- Returns
A job: a MethodJob, ShellJob, QSubJob or SlurmJob instance, depending on the given parameters
- Return type
MethodJob/ShellJob/QsubJob/SlurmJob instance
AbstractJob
AbstractJob offers a class to inherit from for every type of job the job management is intended to work with. It declares the abstract methods runJob() and stop(), which are to be overridden in every derived class. AbstractJob inherits from Process and can therefore be started in its own process. Furthermore, it offers five int constants for the statuses and a dictionary to print a string instead of the value of the constant.
Required classes and methods:
from abc import ABCMeta, abstractmethod
import collections
import glob
import os
import re
import sys
import time
from jobManagement.management.idGenerator import IdGenerator
class AbstractJob(name=None, nTecplotLicenses=0, outFile=None, outputDir=None, successStatements=None, failStatements=None, workingDir=None, runInBackground=None, ignoreExitCode=None, resourcesDict=None, weight=0, retries=0, group=None)
Class every job should inherit from. Offers several methods and properties to communicate between the processes. It gets an id from IdGenerator, a queue to send the statuses to the main thread, a name and a path to a log file (a default log file if none is given).
- Parameters
name (string) – name for the new job
nTecplotLicenses – number of tecplot licenses needed by the job
outFile (string) – path to the file the output will be written in
outputDir (string) – directory the output will be written in
successStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for success
failStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for failure
workingDir (string) – directory the job will be processed in
runInBackground (boolean) – If true the job will run in the background, when executed as single job
ignoreExitCode (boolean) – if True the job will ignore the exit code of the executable
resourcesDict (dictionary) – a dictionary containing resources needed by the job (key: resource(valid dictionary key), value: quantity of the resource)
weight (int) – the weight (seconds to run times number of processes) of the job
- Type nTecplotLicenses
integer
cleanup(force=False, **kwargs)
Removes the output file of the job.
If the output file is not specified explicitly, nothing is written to it, except by a qsub job. Empty output files will therefore be deleted.
- Parameters
force (boolean) – forces the removal of the output file
getJobDictForJSON(jobsListName)
Generates a dictionary with basic parameters to launch the job.
lookForSuccess(returnCode=0)
Checks the success of the job that has just finished. If a success statement or a fail statement was defined, the output file is searched for it. Sets the status of the job.
- Parameters
returnCode (integer) – return code of the program called
- Returns
True if the job was successful
- Return type
boolean
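A success check of this kind could look roughly like the sketch below. It is not the lookForSuccess implementation: the function name look_for_success is hypothetical, and two rules are assumptions made for illustration, namely that a matching fail statement always wins and that one matching success statement is enough.

```python
import re
from typing import Iterable, Sequence


def look_for_success(lines: Iterable[str],
                     return_code: int = 0,
                     success_statements: Sequence[str] = (),
                     fail_statements: Sequence[str] = (),
                     ignore_exit_code: bool = False) -> bool:
    """Decide whether a job succeeded from its output lines and exit code."""
    success_patterns = [re.compile(p) for p in success_statements]
    fail_patterns = [re.compile(p) for p in fail_statements]
    found_success = False
    # The output is searched line by line, as described for the parameters.
    for line in lines:
        if any(p.search(line) for p in fail_patterns):
            return False  # assumption: a fail statement always wins
        if any(p.search(line) for p in success_patterns):
            found_success = True
    if success_patterns and not found_success:
        return False  # a success statement was required but never appeared
    return ignore_exit_code or return_code == 0
```

For example, look_for_success(open(path), returnCode, [r"finished OK"]) would scan the lines of an output file at a hypothetical location path.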
resourcesAvailable()
Tests if the resources for the job are available.
- Returns
True if the resources for the job are available
- Return type
boolean
abstract runJob()
Abstract method runJob, to be overridden in subclasses. Should initialize and start the current job.
abstract stop()
Abstract method stop, to be overridden in subclasses. Should stop the current job.
shellJob
ShellJob supplies a class to execute a bash script in the current shell. You have to pass an executable and can pass arguments for the executable, a name, a file the output will be saved in, a working directory and environment variables to be set before the execution.
class ShellJob(executable, args=None, nTecplotLicenses=0, nProcs=None, name=None, outFile=None, workingDir=None, envVars=None, successStatements=None, failStatements=None, outputDir=None, runInBackground=None, ignoreExitCode=None, resourcesDict=None, useMpirun=False, weight=129600, retries=0, threadsPerProc=None)
Class which executes a shell script in a new process. Name and output path will be generated when not given.
- Parameters
executable (string) – path to the executable
args (list of string) – arguments which will be appended when executing
nTecplotLicenses – number of tecplot licenses needed by the job
name (string) – name for the job
outFile (string) – path to a file for the output
workingDir (string) – directory to change to before executing
envVars (dictionary) – environment variables to set
successStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for success
failStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for failure
outputDir (string) – directory the output file will be written to
runInBackground (boolean) – If true the job will run in the background, when executed as single job
ignoreExitCode – if True the job will ignore the exit code of the executable
resourcesDict (dictionary) – a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
useMpirun (boolean) – True if mpirun should be used to start the executable, False otherwise
weight (int) – the weight (seconds to run times number of processes) of the job
retries (int) – number of times the job is retried when failing
- Type nTecplotLicenses
integer
Methods in use inherited from AbstractJob:
_writeStatus(status, message)
MethodJob
MethodJob supplies a class to create a job which executes a given method in the currently running Python environment. To get the results of the method you have to pass a queue or something similar to send data between the processes.
class MethodJob(method, args=None, kwargs=None, nTecplotLicenses=0, name=None, workingDir=None, outFile=None, envVars=None, successStatements=None, outputDir=None, failStatements=None, runInBackground=None, ignoreExitCode=None, resourcesDict=None, weight=60, retries=0)
Class to execute a given method with the given arguments.
- Parameters
method (method) – method to execute
args (list) – arguments which will be appended when executing
kwargs (dictionary) – a variable to pass keyword arguments to a methodJob
nTecplotLicenses – number of tecplot licenses needed by the job
workingDirectory (string) – directory to change to before executing
outFile (string) – path to a file for the output
jobName (string) – name for the job
envVars (dictionary) – environment variables to set
nodeType (string) – type of the node to calculate on
successStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for success
failStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for failure
outputDir (string) – directory the output file will be written to
runInBackground (boolean) – If true the job will run in the background, when executed as single job
ignoreExitCode (boolean) – if True the job will ignore the exit code of the executable
resourcesDict (dictionary) – a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
weight (int) – the weight (seconds to run times number of processes) of the job
retries (int) – number of times the job is retried when failing
- Type nTecplotLicenses
integer
QSubJob
QSubJob supplies a class which inherits from ShellJob and ClusterInfo. The functionality is nearly the same as in ShellJob, but the job will be executed on cluster nodes and not on the workstation or the master.
class QSubJob(executable, nProcs=1, args=None, nTecplotLicenses=0, name=None, outFile=None, workingDir=None, envVars=None, threadsPerProc=None, procsPerNode=None, queue=None, nodeType='w', successStatements=None, outputDir=None, failStatements=None, runInBackground=None, ignoreExitCode=None, resourcesDict=None, useMpirun=False, wallTime=None, weight=129600, dependJobID=None, retries=0)
Class to start a job with pThreads and MPI on nodes of the cluster.
- Parameters
executable (str) – path to the executable
nProcs (int) – number of processors to use
args (list) – arguments which will be appended when executing
nTecplotLicenses – number of tecplot licenses needed by the job
name (str) – name for the job
outFile (str) – path to a file for the output
workingDir (str) – directory to change to before executing
envVars (dict) – environment variables to set
threadsPerProc (int) – threads to start per processor
procsPerNode (int) – processes per node
queue (str) – queue to put the job in
nodeType (str) – type of the node to calculate on
successStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for success
failStatements (list) – list of regular expressions the output file will be searched for (line by line) to check for failure
outputDir (str) – directory the output file will be written to
runInBackground (bool) – If true the job will run in the background, when executed as single job
ignoreExitCode (bool) – if True the job will ignore the exit code of the executable
resourcesDict (dict) – a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
wallTime (str) – maximum wall time for a job on the cluster
weight (int) – the weight (seconds to run times number of processes) of the job
retries (int) – number of times the job is retried when failing
- Type nTecplotLicenses
int
Graph
The class Graph serves as the root of a graph which represents the dependencies between several jobs. In addition to the methods offered by node.py, the user gains the ability to add jobs to the graph, mark jobs as done or failed, and get the jobs with no more dependencies sorted by priority. The jobs are stored by id only. In addition, you can view the graph as simple console output.
Usage
create a graph: myGraph = Graph()
add jobs: myGraph.addJob(1, [])
myGraph.addJob(2, [1], 2) …
get jobs without dependencies: myGraph.getReadyJobs()
mark a job as done or failed: myGraph.jobDone(1, DONE)
myGraph.jobDone(2, ERROR)
print the graph: myGraph.printGraph()
Required classes and methods:
from job_management.graph.node import Node, DONE, ERROR, WAITING, reprStatus
from copy import deepcopy
class Graph(nodeId=-1)
Serves as the root of a graph describing the dependencies in a job list. It holds every node in the graph as a child to allow easy access to every node.
Inherited from Node:
addParent(self, parent)
markParentProcessed(self, parent)
addChild(self, newChild)
calculatePriority(self)
addGraph(graph, parentIds)
Adds a new graph, and therefore a new node, to the graph.
- Parameters
graph (Graph) – the graph to add
parentIds (list of ints) – a list of the parents' ids
buildGraph()
Adds the associated children to every node.
- Returns
the current Graph-instance
- Return type
Graph()
getReadyJobs()
Returns a list of the ids of the jobs with no further dependencies, ready to start. They are sorted by weight from big to small.
- Returns
list of the ids of the jobs ready to start
- Return type
list of int
jobDone(jobId, status)
Marks a job in the graph as DONE or ERROR. If the status is ERROR, all children and grandchildren will get the status DEAD_PARENT. This happens in the setStatus of the nodes.
- Parameters
jobId (int) – id of the job with the status to change
status (int-constant) – new status
Node
The class Node serves as a node in the dependency graph. A node owns an id, a list of its parents, a list of its children, its weight, its status and a flag indicating whether the weight has already been calculated. It offers a method to add a child to the list of children and a method to delete a parent from the list of parents. The weight of the node is calculated recursively. Furthermore, five int constants for the statuses and a dictionary to print a string instead of the value of the constant are offered.
Attention
Node is not intended to be used without graph!
class Node(nodeId, parents, priority=1)
Node is a class which serves as a node in a graph of dependencies between jobs. As instance variables it stores its own id, its parents which are not done, its children, its weight and calculated weight, and its status.
- Parameters
addChild(newChild)
Adds a child to the list of children.
- Parameters
newChild (Node) – child to add
addParent(newParent)
Appends a new parent to the list of parents.
- Parameters
newParent (Node) – parent to append
calculatePriority()
Calculates the weight of the node and all of its children.
- Returns
the own calculated weight
- Return type
property calculatedPriority
Weight of the critical path
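One common way to obtain a critical-path weight is to add a node's own weight to the largest critical-path weight among its children. The sketch below assumes this definition; the documentation does not spell out the exact formula, and the function name critical_path_weights and the plain-dictionary representation are hypothetical.

```python
def critical_path_weights(weights, children):
    """Return node id -> weight of the heaviest path starting at that node.

    weights:  node id -> own weight of the node
    children: node id -> list of child node ids (the graph must be acyclic)
    """
    cache = {}

    def visit(node_id):
        # Each node is computed once; later visits reuse the cached value.
        if node_id not in cache:
            below = [visit(child) for child in children.get(node_id, [])]
            cache[node_id] = weights[node_id] + max(below, default=0)
        return cache[node_id]

    for node_id in weights:
        visit(node_id)
    return cache


priorities = critical_path_weights({1: 1, 2: 5, 3: 2, 4: 1},
                                   {1: [2, 3], 2: [4], 3: [4]})
# Ready jobs can then be ordered from heavy to light:
print(sorted([3, 2], key=lambda job_id: priorities[job_id], reverse=True))
```

Starting the job with the heaviest remaining path first tends to shorten the total run time, which is a plausible reason for sorting ready jobs by weight.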
IdGenerator
IdGenerator is a class built according to the "Borg pattern": all instances share the same state. Therefore getId generates globally unique ids in every thread. The first id is one. Calling reset makes the ids start at one again.
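The Borg pattern can be sketched in a few lines. The class below is an illustration only, not the IdGenerator source: the name BorgIdGenerator, the method name get_id and the use of a lock for thread safety are assumptions.

```python
import threading


class BorgIdGenerator:
    """All instances share one state dictionary (Borg pattern)."""

    _shared_state = {"next_id": 1, "lock": threading.Lock()}

    def __init__(self):
        # Every instance uses the same __dict__, so the counter is shared.
        self.__dict__ = self._shared_state

    def get_id(self):
        """Return the next unused id; the first one is 1."""
        with self.lock:
            current = self.next_id
            self.next_id = current + 1
            return current

    def reset(self):
        """Make the ids start at 1 again."""
        with self.lock:
            self.next_id = 1
```

Two separately created instances then hand out consecutive ids, because both read and write the same counter.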
Resources
Resources manages free nodes/processors on the cluster or a workstation. When you initialize an instance of Resources you have to pass the number of nodes you want to use. You can ask for the number of free nodes, allocate nodes, or free nodes when your calculation is done.
The __call__, __enter__ and __exit__ methods allow an instance of Resources to be used with the with statement.
class Resources(resourcesDict=None)
Class to manage the resources available for e.g. a job list.
- Parameters
nRes (int) – number of available nodes
allocateResources(resourcesDict)
Allocates the resources asked for.
- Parameters
resourcesDict (dictionary) – dictionary with the resources to allocate
classmethod fromParameters(nProcs=None, nNodes=None, nTecplotLicenses=None)
Generates a Resources instance from parameters.
releaseResources(resourcesDict)
Frees resources allocated earlier.
- Parameters
resourcesDict (dictionary) – dictionary containing the resources to free
resourcesAvailable(resourcesDict)
Returns True if the resource management has the resources in resourcesDict available.
- Parameters
resourcesDict (dictionary) – a dictionary mapping resources to the needed number
- Returns
True if the resources are available in this environment
- Return type
boolean
property resourcesDict
Returns a copy of the resources dictionary held by this manager.
- Returns
copy of the resources dictionary held by this manager
- Return type
dictionary
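The allocate/release cycle can be sketched with a small stand-in class. This is not the Resources implementation: the class name SimpleResources and its method names are hypothetical, and the with-statement support is built with contextlib.contextmanager instead of the __call__, __enter__ and __exit__ methods mentioned above.

```python
import threading
from contextlib import contextmanager


class SimpleResources:
    """Keeps a count of free units per resource name."""

    def __init__(self, resources):
        self._free = dict(resources)
        self._lock = threading.Lock()

    def _covers(self, wanted):
        return all(self._free.get(k, 0) >= n for k, n in wanted.items())

    def resources_available(self, wanted):
        with self._lock:
            return self._covers(wanted)

    def allocate(self, wanted):
        with self._lock:
            if not self._covers(wanted):
                raise RuntimeError("not enough free resources")
            for key, amount in wanted.items():
                self._free[key] = self._free.get(key, 0) - amount

    def release(self, wanted):
        with self._lock:
            for key, amount in wanted.items():
                self._free[key] = self._free.get(key, 0) + amount

    @contextmanager
    def reserved(self, wanted):
        # Allocate on entry and always give the resources back on exit.
        self.allocate(wanted)
        try:
            yield self
        finally:
            self.release(wanted)

    @property
    def resources_dict(self):
        with self._lock:
            return dict(self._free)  # a copy, so callers cannot modify state
```

Used as `with pool.reserved({"procs": 4}): ...`, the four processors are returned to the pool even if the body raises an exception.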
CpuInfo
CpuInfo supplies a class to hold the information about the CPU(s) of a workstation. It offers methods to read the information from /proc/cpuinfo and to derive further information from it.
Required classes and methods:
import re
class CPUInfo(numLogicalCPUs: int, numCoresPerSocket: int, numSiblingsPerSocket: int)
Wraps properties of /proc/cpuinfo, such as the number of physical/logical CPU cores and the hyper-threading status. After initialization those properties are accessible as object attributes. This class works only for _symmetric_ multiprocessor systems running Linux.
- Instance variables:
numLogicalCPUs - the number of cores that linux takes for the number of cpus
numCoresPerSocket - how many physical cores are on each socket
numSiblingsPerSocket - how many (total) cores are on each socket
numSiblingsPerCore - how many (total) cores are on each core
hyperThreadingEnabled - determines whether hyperThreading is activated on the system
numPhysicalCores - how many real (physical) cores are available on the system
numCPUSockets - how many sockets (slots) are the cpus plugged into
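The derived values can plausibly be computed from the three constructor arguments as in the sketch below. The relations are assumptions based on the usual meaning of the "cpu cores" and "siblings" fields of /proc/cpuinfo on a symmetric system, not taken from the CPUInfo source, and the class name CpuTopology is hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CpuTopology:
    num_logical_cpus: int         # processors as counted by Linux
    num_cores_per_socket: int     # physical cores on each socket
    num_siblings_per_socket: int  # logical cores on each socket

    @property
    def num_cpu_sockets(self) -> int:
        return self.num_logical_cpus // self.num_siblings_per_socket

    @property
    def num_siblings_per_core(self) -> int:
        return self.num_siblings_per_socket // self.num_cores_per_socket

    @property
    def hyper_threading_enabled(self) -> bool:
        # More than one logical core per physical core means hyper-threading.
        return self.num_siblings_per_core > 1

    @property
    def num_physical_cores(self) -> int:
        return self.num_cpu_sockets * self.num_cores_per_socket
```

For a machine with 32 logical CPUs, 8 cores per socket and 16 siblings per socket, this yields two sockets, 16 physical cores and hyper-threading switched on.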
ClusterInfo
ClusterInfo supplies a class to hold information about a cluster and the nodes one wants to use. The class method ‘ClusterInfo.from_environment’ returns an instance corresponding to the current environment by reading ssh fingerprints.
class ClusterInfo(name: str, ssh_fingerprints: Set[str], default_partition_name: str, partitions: Dict[str, mojo.jobManagement.management.clusterInfo.Partition], queuing_system: str = 'SLURM', account_env_var_name: str = 'AT_NUM_DEFAULT_ACCOUNT', extra_startup_minutes: int = 10, nodes_per_switch: int = 40, mail_relay: str = 'smtprelay.dlr.de')
Container for information about a known cluster environment.
For a new cluster add a new instance to ‘_supportedClusters’ down below.
property cpuInfo
Get information about the cluster's default partition.
If you need information about another partition you can use ‘instance.partitions[partition_name].cpu_info’.
classmethod from_environment(default_partition: Optional[str] = None) → Optional[mojo.jobManagement.management.clusterInfo.ClusterInfo]
Get a new instance corresponding to the current environment.
- Parameters
default_partition – Select a different default partition (node type)