"""
Job
===
Job supplies a class to run a trace, prep, or post job on a workstation or a cluster.
When an instance is created, a suitable class is chosen and instantiated depending on the parameters passed.
Assumptions:
Job scheduling on cluster is managed by PBS (Portable Batch System) or SLURM.
Requires:
---------
for use on a cluster:
Information about the cluster has to be added in 'mojo.jobManagement.management.clusterInfo._supportedClusters'
"""
import itertools
import os
import re
import sys
from ...pavayo.computeData import SUCCESS_STATUS, PYTHON_FAIL_STATEMENT, GMC_PLAY_EXEC, PYMESH_EXEC, TRACE_SUITE_TRACE_EXEC, PYTHON_EXECUTABLE
from ..jobs import shellJob, methodJob
from ..jobs.TracePrepPost import qSubJob, slurmJob
from ..management.clusterInfo import ClusterInfo
from ..management.cpuInfo import CPUInfo
def _isTRACEsTppArgumentSet(args):
tppSet = False
joinedArgsList = list(itertools.chain.from_iterable(item.split() for item in args))
if len(joinedArgsList) >= 2 and not joinedArgsList[0].startswith('-') and re.match(r'^\d+$', joinedArgsList[1]) is not None:
tppSet = True
elif "-tpp" in joinedArgsList:
tppSet = True
return tppSet
class Job:
    """A class to create the job you need by analyzing the given parameters.

    :param executable: path to the executable or a method
    :param nProcs: number of processors to use
    :param args: arguments which will be appended when executing
    :param kwargs: a variable to pass keyword arguments to a methodJob
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param queue: queue to put the job in
    :param workingDirectory: directory to change to before executing
    :param outputFile: path to a file for the output
    :param jobName: name for the job
    :param procsPerNode: process per node
    :param threadsPerProc: threads to start per processor
    :param envVars: environment variables to set
    :param lock: DEPRECATED; exists for compatibility reasons
    :param runInBackground: if False the job, when started as single job, will stay in the foreground
    :param nodeType: type of the node to calculate on
    :param executeOnMaster: if True the job will be executed on the master if on a cluster
    :param jobType: possibility to declare the jobType to control the result; available: TRACE, PREP, POST, gmcPlay
    :param outputDir: directory the output file will be saved in
    :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
    :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
    :param useMpirun: if False the job will not be executed with MPI but the number of processes is used
    :param wallTime: maximum wall time for a job on the cluster
    :param account: account on which computation costs are charged
    :param retries: number of times the job is retried when failing
    :param mailAddress: mail address for status information (only used for SLURM jobs!)
    :param slurmSwitches: slurm option requesting minimal number of network switches
    :type executable: str or method
    :type nProcs: int
    :type args: list
    :type kwargs: dict
    :type nTecplotLicenses: int
    :type queue: str
    :type workingDirectory: str
    :type outputFile: str
    :type jobName: str
    :type procsPerNode: int
    :type threadsPerProc: int
    :type envVars: dict
    :type lock: Lock instance
    :type runInBackground: bool
    :type nodeType: str
    :type executeOnMaster: bool
    :type jobType: str
    :type outputDir: str
    :type successStatements: list
    :type failStatements: list
    :type ignoreExitCode: bool
    :type resourcesDict: dict
    :type useMpirun: bool
    :type wallTime: str
    :type account: str
    :type retries: int
    :type mailAddress: str
    :type slurmSwitches: str
    :return: A job; MethodJob, ShellJob, QSubJob or SlurmJob instance in dependency on the given parameters
    :rtype: MethodJob/ShellJob/QsubJob/SlurmJob instance
    """
@classmethod
def __getNameOfExecutable(cls, path):
"""Return the name from the path to the executable
:param path: path to the executable or a method
:type path: string or method
:return: name of the executable
:rtype: string
"""
return os.path.split(str(path))[1]
@classmethod
def __determineProcsAndThreads(cls, jobType, nameOfExecutable, executeOnMaster, cpuInfo, nProcs, procsPerNode, threadsPerProc):
"""Apply best practice guidelines
"""
if jobType == GMC_PLAY_EXEC or nameOfExecutable == GMC_PLAY_EXEC or nameOfExecutable == PYMESH_EXEC:
threadsPerProc = 1
nProcs = 1
elif jobType == TRACE_SUITE_TRACE_EXEC:
if threadsPerProc is None or threadsPerProc < 1:
if ClusterInfo.onValidCluster() and not executeOnMaster:
threadsPerProc = cpuInfo.numSiblingsPerCore if procsPerNode is None else cpuInfo.numLogicalCPUs // procsPerNode
else:
threadsPerProc = 1
else:
if threadsPerProc is not None and threadsPerProc > 1:
print("WARNING: threadsPerProc greater than 1 and the job type is not Trace! threadsPerProc will be set to 1.")
threadsPerProc = 1
procsPerNode = procsPerNode or min(abs(nProcs), cpuInfo.numLogicalCPUs // threadsPerProc)
return nProcs, procsPerNode, threadsPerProc
    def __new__(cls, executable, nProcs=1, args=None, kwargs=None, nTecplotLicenses=0, queue=None, workingDirectory=None, outputFile=None,
                jobName=None, procsPerNode=None, threadsPerProc=None, envVars=None, runInBackground=False, nodeType="ib",
                executeOnMaster=False, jobType=None, outputDir=None, successStatements=None, failStatements=None, ignoreExitCode=False,
                resourcesDict=None, useMpirun=True, wallTime=None, weight=0, dependJobID=None, account=None, retries=0, group=None,
                mailAddress=None, slurmSwitches=None):
        """
        Creates a new instance of a job in dependency of the given parameters. If runInBackground is False the job gets a new
        write status method to write the status to stdout. jobType chooses some standard options for different job types.

        :param executable: path to the executable or a method
        :param nProcs: number of processors to use
        :param args: arguments which will be appended when executing
        :param kwargs: a variable to pass keyword arguments to a methodJob
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param queue: queue to put the job in
        :param workingDirectory: directory to change to before executing
        :param outputFile: path to a file for the output
        :param jobName: name for the job
        :param procsPerNode: process per node
        :param threadsPerProc: threads to start per processor
        :param envVars: environment variables to set
        :param runInBackground: if False the job, when started as single job, will stay in the foreground
        :param nodeType: type of the node to calculate on
        :param executeOnMaster: if True the job will be executed on the master if on a cluster
        :param jobType: possibility to declare the jobType to control the result; available: TRACE, PREP, POST, gmcPlay
        :param outputDir: directory the output file will be saved in
        :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
        :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
        :param ignoreExitCode: if True the job will ignore the exit code of the executable
        :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
        :param useMpirun: if False the job will not be executed with MPI but the number of processes is used
        :param wallTime: maximum wall time for a job on the cluster
        :param weight: the weight (seconds to run times number of processes) of the job
        :param account: account on which computation costs are charged
        :param retries: number of times the job is retried when failing
        :param group: name of the group that contains this job, if any
        :param mailAddress: mail address for status information
        :param slurmSwitches: slurm option requesting minimal number of network switches
        :type executable: str or method
        :type nProcs: int
        :type args: list
        :type kwargs: dict
        :type nTecplotLicenses: int
        :type queue: str
        :type workingDirectory: str
        :type outputFile: str
        :type jobName: str
        :type procsPerNode: int
        :type threadsPerProc: int
        :type envVars: dict
        :type runInBackground: bool
        :type nodeType: str
        :type executeOnMaster: bool
        :type jobType: str
        :type outputDir: str
        :type successStatements: list
        :type failStatements: list
        :type ignoreExitCode: bool
        :type resourcesDict: dict
        :type useMpirun: bool
        :type wallTime: str
        :type weight: int
        :type account: str
        :type retries: int
        :type group: str
        :type mailAddress: str
        :type slurmSwitches: str
        :return: A job; MethodJob, ShellJob, SlurmJob, or QSubJob instance in dependency on the given parameters
        :rtype: MethodJob/ShellJob/QsubJob/SlurmJob instance
        """
        # ignore lock, no longer needed
        # NOTE: when the caller passed lists they are mutated below (append of
        # default success/fail statements and the -tpp option).
        successStatements = successStatements or list()
        failStatements = failStatements or list()
        nameOfExecutable = cls.__getNameOfExecutable(executable)
        # no explicit success statements given: fall back to the well-known
        # success message of the job type (or of the executable's name)
        if not successStatements:
            if jobType in SUCCESS_STATUS.keys():
                successStatements.append(re.escape(SUCCESS_STATUS[jobType]))
            elif nameOfExecutable in SUCCESS_STATUS.keys():
                successStatements.append(re.escape(SUCCESS_STATUS[nameOfExecutable]))
        clusterInfo = None
        if ClusterInfo.onValidCluster():
            clusterInfo = ClusterInfo.from_environment(queue)
            cpuInfo = clusterInfo.cpuInfo
        else:
            cpuInfo = CPUInfo.from_proc_cpuinfo()
        # jobs without an explicit node layout may be grouped by the scheduler
        groupable = procsPerNode is None
        nProcs, procsPerNode, threadsPerProc = \
            cls.__determineProcsAndThreads(jobType, nameOfExecutable, executeOnMaster, cpuInfo, nProcs, procsPerNode, threadsPerProc)
        # multiple threads are currently used for TRACE only, the command line option is set here
        # NOTE(review): args may still be None here; a TRACE job with
        # threadsPerProc > 1 would then fail on args.append — confirm that
        # callers always pass a list for TRACE jobs.
        if jobType == TRACE_SUITE_TRACE_EXEC and threadsPerProc > 1 and not _isTRACEsTppArgumentSet(args):
            args.append(f"-tpp {threadsPerProc}")
        # mpirun only makes sense for more than one process
        useMpirun = useMpirun and nProcs > 1
        try:
            # python scripts (run directly, via the interpreter or via env)
            # additionally get the standard python failure statement;
            # AttributeError occurs when executable is a method (no endswith)
            # and is deliberately ignored.
            if executable.endswith(".py") or executable == PYTHON_EXECUTABLE or\
               executable == "/usr/bin/env" and args[0].endswith(".py") or\
               executable == "/usr/bin/env" and args[0] == PYTHON_EXECUTABLE:
                failStatements.append(re.escape(PYTHON_FAIL_STATEMENT))
        except AttributeError:
            pass
        if not isinstance(executable, str):  # means the executable is a python method
            failStatements = failStatements or [re.escape(PYTHON_FAIL_STATEMENT)]
            job = methodJob.MethodJob(method=executable, args=args, kwargs=kwargs, nTecplotLicenses=nTecplotLicenses, name=jobName,
                                      workingDir=workingDirectory, outFile=outputFile, envVars=envVars, successStatements=successStatements,
                                      failStatements=failStatements, outputDir=outputDir, runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                      resourcesDict=resourcesDict, weight=weight, retries=retries)
        elif ClusterInfo.onValidCluster() and not executeOnMaster:
            # on a cluster the account may come from the environment
            account = account or os.environ.get(clusterInfo.account_env_var_name)
            if clusterInfo.queuing_system == "PBS":
                job = qSubJob.QSubJob(executable=executable, nProcs=nProcs, args=args, nTecplotLicenses=nTecplotLicenses, name=jobName,
                                      outFile=outputFile, workingDir=workingDirectory, envVars=envVars, threadsPerProc=threadsPerProc,
                                      outputDir=outputDir, procsPerNode=procsPerNode, queue=queue, nodeType=nodeType, successStatements=successStatements,
                                      failStatements=failStatements, runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                      resourcesDict=resourcesDict, useMpirun=useMpirun, wallTime=wallTime, weight=weight, dependJobID=dependJobID, retries=retries)
            elif clusterInfo.queuing_system == "SLURM":
                job = slurmJob.SlurmJob(executable=executable, nProcs=nProcs, args=args, nTecplotLicenses=nTecplotLicenses, name=jobName,
                                        outFile=outputFile, workingDir=workingDirectory, envVars=envVars, threadsPerProc=threadsPerProc,
                                        outputDir=outputDir, procsPerNode=procsPerNode, clusterInfo=clusterInfo, successStatements=successStatements,
                                        failStatements=failStatements, runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                        resourcesDict=resourcesDict, useMpirun=useMpirun, wallTime=wallTime, weight=weight, dependJobID=dependJobID,
                                        account=account, retries=retries, mailAddress=mailAddress, groupable=groupable, switches=slurmSwitches)
            else:
                raise RuntimeError(f"queuing system not supported: {clusterInfo.queuing_system}")
        else:
            # workstation (or executeOnMaster): plain shell job
            job = shellJob.ShellJob(executable=executable, args=args, nTecplotLicenses=nTecplotLicenses, nProcs=nProcs, name=jobName,
                                    outFile=outputFile, workingDir=workingDirectory, envVars=envVars, successStatements=successStatements,
                                    outputDir=outputDir, failStatements=failStatements, runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                    resourcesDict=resourcesDict, useMpirun=useMpirun, weight=weight, retries=retries, threadsPerProc=threadsPerProc)
        job.setGroup(group)
        if mailAddress is not None and not isinstance(job, slurmJob.SlurmJob):
            print("WARNING: ignored given mailing address, only supported for slurm jobs.")
        return job
def printJobInformation(theJob):
"""Prints the information for an instantiated, but not yet running job
:param theJob: the job you want to print the information for
:type theJob: instance of a job
"""
(execPath, execName) = os.path.split(theJob.executable)
output = list()
output.append('+--------------------------------------------------------------')
if isinstance(theJob, qSubJob.QSubJob):
output.append('| Sending to queueing system:')
output.append('| Script : %s' % theJob.qsubScript)
output.append('| Exec name : %s' % execName)
elif isinstance(theJob, slurmJob.SlurmJob):
output.append('| Sending to queueing system:')
output.append('| Script : %s' % theJob.sbatchScript)
output.append('| Exec name : %s' % execName)
if theJob.account is not None:
output.append(f'| Account : {theJob.account}')
else:
output.append('| Starting : %s' % execName)
output.append('| Exec path : %s' % execPath)
output.append('| Arguments : %s' % " ".join(theJob.args))
if theJob.workingDir:
output.append('| Workdir : %s' % theJob.workingDir)
if isinstance(theJob, qSubJob.QSubJob) or isinstance(theJob, slurmJob.SlurmJob):
output.append('| Job : %s' % theJob.name)
output.append('| Queue/Partition : %s' % theJob.queue)
output.append('| Nodes : %s' % theJob.nNodes)
output.append('| ProcessesPerNode : %d' % theJob.nProcsPerNode)
if getattr(theJob, 'switchesOption', None) is not None:
output.append('| Switches option : %s' % theJob.switchesOption)
if theJob.maxWallTime:
output.append('| Wall time limit : %s' % theJob.maxWallTime)
if theJob.dependJobID:
output.append('| Depends on jobID : %s' % theJob.dependJobID)
open('waitsForJobID_%s' % theJob.dependJobID, 'w').close()
if isinstance(theJob, shellJob.ShellJob):
output.append('| ThreadsPerProc : %d' % theJob.nThreadsPerProc)
output.append('| Total threads : %d' % theJob.nThreads)
if getattr(theJob, 'mailAddress', None) is not None:
output.append('| Mail Address : %s' % theJob.mailAddress)
output.append('| Output file : %s' % theJob.outfile)
output.append('+--------------------------------------------------------------')
print("\n".join(output))
if theJob.outfile and theJob.outfile != sys.stdout:
with open(theJob.outfile, "a") as outfile:
outfile.write("\n".join(output) + "\n")
def printJobId(theJob):
"""Prints the JobId for a already started qSubJob.
"""
if isinstance(theJob, qSubJob.QSubJob) or isinstance(theJob, slurmJob.SlurmJob):
if theJob.jobId or True:
print('| Job Id : %s' % theJob.jobId)
print('+--------------------------------------------------------------')