# Source code for mojo.jobManagement.jobs.job

"""
Job
===
Job supplies a class to compute a TRACE, PREP or POST job on a workstation or a cluster.
When an instance is created, a suitable job class is chosen and instantiated depending on the parameters passed.

Assumptions:
    Job scheduling on cluster is managed by PBS (Portable Batch System) or SLURM.

Requires:
---------
for use on a cluster:
    Information about the cluster has to be added in 'mojo.jobManagement.management.clusterInfo._supportedClusters'
"""

import itertools
import os
import re
import sys

from ...pavayo.computeData import SUCCESS_STATUS, PYTHON_FAIL_STATEMENT, GMC_PLAY_EXEC, PYMESH_EXEC, TRACE_SUITE_TRACE_EXEC, PYTHON_EXECUTABLE
from ..jobs import shellJob, methodJob
from ..jobs.TracePrepPost import qSubJob, slurmJob
from ..management.clusterInfo import ClusterInfo
from ..management.cpuInfo import CPUInfo


def _isTRACEsTppArgumentSet(args):
    tppSet = False
    joinedArgsList = list(itertools.chain.from_iterable(item.split() for item in args))
    if len(joinedArgsList) >= 2 and not joinedArgsList[0].startswith('-') and re.match(r'^\d+$', joinedArgsList[1]) is not None:
        tppSet = True
    elif "-tpp" in joinedArgsList:
        tppSet = True
    return tppSet


class Job:
    """A class to create the job you need by analyzing the given parameters

    Instantiating ``Job`` never yields a ``Job`` object: ``__new__`` returns a MethodJob, ShellJob, QSubJob or
    SlurmJob instance. The former ``lock`` parameter is deprecated and no longer part of the signature.

    :param executable: path to the executable or a method
    :param nProcs: number of processors to use
    :param args: arguments which will be appended when executing
    :param kwargs: a variable to pass keyword arguments to a methodJob
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param queue: queue to put the job in
    :param workingDirectory: directory to change to before executing
    :param outputFile: path to a file for the output
    :param jobName: name for the job
    :param procsPerNode: process per node
    :param threadsPerProc: threads to start per processor
    :param envVars: environment variables to set
    :param runInBackground: if False the job, when started as single job, will stay in the foreground
    :param nodeType: type of the node to calculate on
    :param executeOnMaster: if True the job will be executed on the master if on a cluster
    :param jobType: possibility to declare the jobType to control the result; available: TRACE, PREP, POST, gmcPlay
    :param outputDir: directory the output file will be saved in
    :param successStatements: list of regular expressions the output file will be searched for (line by line) to
        check for success
    :param failStatements: list of regular expressions the output file will be searched for (line by line) to
        check for failure
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of
        the resource)
    :param useMpirun: if False the job will not be executed with MPI but the number of processes is used
    :param wallTime: maximum wall time for a job on the cluster
    :param account: account on which computation costs are charged
    :param retries: number of times the job is retried when failing
    :param mailAddress: mail address for status information (only used for SLURM jobs!)
    :param slurmSwitches: slurm option requesting minimal number of network switches

    :type executable: str or method
    :type nProcs: int
    :type args: list
    :type kwargs: dict
    :type nTecplotLicenses: int
    :type queue: str
    :type workingDirectory: str
    :type outputFile: str
    :type jobName: str
    :type procsPerNode: int
    :type threadsPerProc: int
    :type envVars: dict
    :type runInBackground: bool
    :type nodeType: str
    :type executeOnMaster: bool
    :type jobType: str
    :type outputDir: str
    :type successStatements: list
    :type failStatements: list
    :type ignoreExitCode: bool
    :type resourcesDict: dict
    :type useMpirun: bool
    :type wallTime: str
    :type account: str
    :type retries: int
    :type mailAddress: str
    :type slurmSwitches: str

    :return: A job; MethodJob, ShellJob, QSubJob or SlurmJob instance in dependency on the given parameters
    :rtype: MethodJob/ShellJob/QSubJob/SlurmJob instance
    """

    @classmethod
    def __getNameOfExecutable(cls, path):
        """Return the name from the path to the executable

        :param path: path to the executable or a method (converted with ``str`` before splitting)
        :type path: string or method
        :return: name of the executable, i.e. the last path component
        :rtype: string
        """
        return os.path.split(str(path))[1]

    @classmethod
    def __determineProcsAndThreads(cls, jobType, nameOfExecutable, executeOnMaster, cpuInfo, nProcs, procsPerNode,
                                   threadsPerProc):
        """Apply best practice guidelines to the process and thread layout

        * gmcPlay and PyMesh are always run with one process and one thread.
        * TRACE keeps a valid user supplied ``threadsPerProc``; otherwise the value is derived from the CPU
          information when running on cluster nodes, and is 1 elsewhere.
        * Every other job is forced to a single thread per process.

        :param jobType: declared job type
        :param nameOfExecutable: file name of the executable
        :param executeOnMaster: True if the job does not go through the queueing system
        :param cpuInfo: object providing ``numSiblingsPerCore`` and ``numLogicalCPUs``
        :param nProcs: requested number of processes
        :param procsPerNode: requested processes per node or None
        :param threadsPerProc: requested threads per process or None
        :return: the adjusted values (nProcs, procsPerNode, threadsPerProc)
        :rtype: tuple
        """
        if jobType == GMC_PLAY_EXEC or nameOfExecutable in (GMC_PLAY_EXEC, PYMESH_EXEC):
            threadsPerProc = 1
            nProcs = 1
        elif jobType == TRACE_SUITE_TRACE_EXEC:
            if threadsPerProc is None or threadsPerProc < 1:
                if ClusterInfo.onValidCluster() and not executeOnMaster:
                    if procsPerNode is None:
                        threadsPerProc = cpuInfo.numSiblingsPerCore
                    else:
                        threadsPerProc = cpuInfo.numLogicalCPUs // procsPerNode
                else:
                    threadsPerProc = 1
        else:
            if threadsPerProc is not None and threadsPerProc > 1:
                print("WARNING: threadsPerProc greater than 1 and the job type is not Trace! "
                      "threadsPerProc will be set to 1.")
            # must be a number from here on, it is used as divisor below
            threadsPerProc = 1

        procsPerNode = procsPerNode or min(abs(nProcs), cpuInfo.numLogicalCPUs // threadsPerProc)
        return nProcs, procsPerNode, threadsPerProc

    def __new__(cls, executable, nProcs=1, args=None, kwargs=None, nTecplotLicenses=0, queue=None,
                workingDirectory=None, outputFile=None, jobName=None, procsPerNode=None, threadsPerProc=None,
                envVars=None, runInBackground=False, nodeType="ib", executeOnMaster=False, jobType=None,
                outputDir=None, successStatements=None, failStatements=None, ignoreExitCode=False,
                resourcesDict=None, useMpirun=True, wallTime=None, weight=0, dependJobID=None, account=None,
                retries=0, group=None, mailAddress=None, slurmSwitches=None):
        """
        Creates a new instance of a job in dependency of the given parameters.
        jobType chooses some standard options for different job types.

        Selection of the returned class:

        * ``executable`` is not a string: MethodJob
        * on a valid cluster and not ``executeOnMaster``: QSubJob (PBS) or SlurmJob (SLURM)
        * otherwise: ShellJob

        The lists passed as ``args`` and ``failStatements`` are not modified; copies are extended where needed.

        :param executable: path to the executable or a method
        :param nProcs: number of processors to use
        :param args: arguments which will be appended when executing
        :param kwargs: a variable to pass keyword arguments to a methodJob
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param queue: queue to put the job in
        :param workingDirectory: directory to change to before executing
        :param outputFile: path to a file for the output
        :param jobName: name for the job
        :param procsPerNode: process per node
        :param threadsPerProc: threads to start per processor
        :param envVars: environment variables to set
        :param runInBackground: if False the job, when started as single job, will stay in the foreground
        :param nodeType: type of the node to calculate on (only passed on to PBS jobs)
        :param executeOnMaster: if True the job will be executed on the master if on a cluster
        :param jobType: possibility to declare the jobType to control the result; available: TRACE, PREP, POST,
            gmcPlay
        :param outputDir: directory the output file will be saved in
        :param successStatements: list of regular expressions the output file will be searched for (line by line)
            to check for success
        :param failStatements: list of regular expressions the output file will be searched for (line by line) to
            check for failure
        :param ignoreExitCode: if True the job will ignore the exit code of the executable
        :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity
            of the resource)
        :param useMpirun: if False the job will not be executed with MPI but the number of processes is used
        :param wallTime: maximum wall time for a job on the cluster
        :param weight: the weight (seconds to run times number of processes) of the job
        :param dependJobID: ID of the job this job depends on (only passed on to PBS and SLURM jobs)
        :param account: account on which computation costs are charged
        :param retries: number of times the job is retried when failing
        :param group: name of the group that contains this job, if any
        :param mailAddress: mail address for status information (only used for SLURM jobs)
        :param slurmSwitches: slurm option requesting minimal number of network switches

        :type executable: str or method
        :type nProcs: int
        :type args: list
        :type kwargs: dict
        :type nTecplotLicenses: int
        :type queue: str
        :type workingDirectory: str
        :type outputFile: str
        :type jobName: str
        :type procsPerNode: int
        :type threadsPerProc: int
        :type envVars: dict
        :type runInBackground: bool
        :type nodeType: str
        :type executeOnMaster: bool
        :type jobType: str
        :type outputDir: str
        :type successStatements: list
        :type failStatements: list
        :type ignoreExitCode: bool
        :type resourcesDict: dict
        :type useMpirun: bool
        :type wallTime: str
        :type weight: int
        :type account: str
        :type retries: int
        :type group: str
        :type mailAddress: str
        :type slurmSwitches: str

        :return: A job; MethodJob, ShellJob, SlurmJob or QSubJob instance in dependency on the given parameters
        :rtype: MethodJob/ShellJob/QSubJob/SlurmJob instance
        :raises RuntimeError: if the cluster uses a queuing system other than PBS or SLURM
        """
        # ignore lock, no longer needed
        successStatements = successStatements or []
        # entries may be appended below, so work on a copy instead of the caller's list
        failStatements = list(failStatements) if failStatements else []

        nameOfExecutable = cls.__getNameOfExecutable(executable)

        # fall back to the known success message of the job type or executable
        if not successStatements:
            if jobType in SUCCESS_STATUS:
                successStatements.append(re.escape(SUCCESS_STATUS[jobType]))
            elif nameOfExecutable in SUCCESS_STATUS:
                successStatements.append(re.escape(SUCCESS_STATUS[nameOfExecutable]))

        clusterInfo = None
        if ClusterInfo.onValidCluster():
            clusterInfo = ClusterInfo.from_environment(queue)
            cpuInfo = clusterInfo.cpuInfo
        else:
            cpuInfo = CPUInfo.from_proc_cpuinfo()

        # has to be evaluated before procsPerNode gets its computed default
        groupable = procsPerNode is None
        nProcs, procsPerNode, threadsPerProc = \
            cls.__determineProcsAndThreads(jobType, nameOfExecutable, executeOnMaster, cpuInfo, nProcs, procsPerNode,
                                           threadsPerProc)

        # multiple threads are currently used for TRACE only, the command line option is set here
        if jobType == TRACE_SUITE_TRACE_EXEC and threadsPerProc > 1 and not _isTRACEsTppArgumentSet(args or []):
            # build a new list: args may be None and the caller's list must stay untouched
            args = [*(args or []), f"-tpp {threadsPerProc}"]

        useMpirun = useMpirun and nProcs > 1

        # python scripts get the python specific fail statement
        if isinstance(executable, str):
            firstArg = args[0] if args else None
            startedViaEnv = (executable == "/usr/bin/env" and isinstance(firstArg, str)
                             and (firstArg.endswith(".py") or firstArg == PYTHON_EXECUTABLE))
            if executable.endswith(".py") or executable == PYTHON_EXECUTABLE or startedViaEnv:
                failStatements.append(re.escape(PYTHON_FAIL_STATEMENT))

        if not isinstance(executable, str):  # means the executable is a python method
            failStatements = failStatements or [re.escape(PYTHON_FAIL_STATEMENT)]
            job = methodJob.MethodJob(method=executable, args=args, kwargs=kwargs, nTecplotLicenses=nTecplotLicenses,
                                      name=jobName, workingDir=workingDirectory, outFile=outputFile, envVars=envVars,
                                      successStatements=successStatements, failStatements=failStatements,
                                      outputDir=outputDir, runInBackground=runInBackground,
                                      ignoreExitCode=ignoreExitCode, resourcesDict=resourcesDict, weight=weight,
                                      retries=retries)
        elif ClusterInfo.onValidCluster() and not executeOnMaster:
            account = account or os.environ.get(clusterInfo.account_env_var_name)
            if clusterInfo.queuing_system == "PBS":
                job = qSubJob.QSubJob(executable=executable, nProcs=nProcs, args=args,
                                      nTecplotLicenses=nTecplotLicenses, name=jobName, outFile=outputFile,
                                      workingDir=workingDirectory, envVars=envVars, threadsPerProc=threadsPerProc,
                                      outputDir=outputDir, procsPerNode=procsPerNode, queue=queue, nodeType=nodeType,
                                      successStatements=successStatements, failStatements=failStatements,
                                      runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                      resourcesDict=resourcesDict, useMpirun=useMpirun, wallTime=wallTime,
                                      weight=weight, dependJobID=dependJobID, retries=retries)
            elif clusterInfo.queuing_system == "SLURM":
                job = slurmJob.SlurmJob(executable=executable, nProcs=nProcs, args=args,
                                        nTecplotLicenses=nTecplotLicenses, name=jobName, outFile=outputFile,
                                        workingDir=workingDirectory, envVars=envVars, threadsPerProc=threadsPerProc,
                                        outputDir=outputDir, procsPerNode=procsPerNode, clusterInfo=clusterInfo,
                                        successStatements=successStatements, failStatements=failStatements,
                                        runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                        resourcesDict=resourcesDict, useMpirun=useMpirun, wallTime=wallTime,
                                        weight=weight, dependJobID=dependJobID, account=account, retries=retries,
                                        mailAddress=mailAddress, groupable=groupable, switches=slurmSwitches)
            else:
                raise RuntimeError(f"queuing system not supported: {clusterInfo.queuing_system}")
        else:
            job = shellJob.ShellJob(executable=executable, args=args, nTecplotLicenses=nTecplotLicenses,
                                    nProcs=nProcs, name=jobName, outFile=outputFile, workingDir=workingDirectory,
                                    envVars=envVars, successStatements=successStatements, outputDir=outputDir,
                                    failStatements=failStatements, runInBackground=runInBackground,
                                    ignoreExitCode=ignoreExitCode, resourcesDict=resourcesDict, useMpirun=useMpirun,
                                    weight=weight, retries=retries, threadsPerProc=threadsPerProc)

        job.setGroup(group)

        if mailAddress is not None and not isinstance(job, slurmJob.SlurmJob):
            print("WARNING: ignored given mailing address, only supported for slurm jobs.")

        return job
def printJobInformation(theJob):
    """Prints the information for an instantiated, but not yet running job

    The summary is written to stdout and, if the job has an output file other than ``sys.stdout``, appended to
    that file as well. For a cluster job with a ``dependJobID`` an empty marker file
    ``waitsForJobID_<dependJobID>`` is created in the current working directory.

    :param theJob: the job you want to print the information for
    :type theJob: instance of a job
    """
    separator = '+--------------------------------------------------------------'
    # str() keeps this working when the executable is not a plain string
    (execPath, execName) = os.path.split(str(theJob.executable))
    isQueueJob = isinstance(theJob, (qSubJob.QSubJob, slurmJob.SlurmJob))

    output = [separator]
    if isinstance(theJob, qSubJob.QSubJob):
        output.append('| Sending to queueing system:')
        output.append('| Script : %s' % theJob.qsubScript)
        output.append('| Exec name : %s' % execName)
    elif isinstance(theJob, slurmJob.SlurmJob):
        output.append('| Sending to queueing system:')
        output.append('| Script : %s' % theJob.sbatchScript)
        output.append('| Exec name : %s' % execName)
        if theJob.account is not None:
            output.append(f'| Account : {theJob.account}')
    else:
        output.append('| Starting : %s' % execName)
    output.append('| Exec path : %s' % execPath)
    # tolerate jobs without arguments and arguments that are not strings
    output.append('| Arguments : %s' % " ".join(str(arg) for arg in (theJob.args or [])))
    if theJob.workingDir:
        output.append('| Workdir : %s' % theJob.workingDir)

    if isQueueJob:
        output.append('| Job : %s' % theJob.name)
        output.append('| Queue/Partition : %s' % theJob.queue)
        output.append('| Nodes : %s' % theJob.nNodes)
        output.append('| ProcessesPerNode : %d' % theJob.nProcsPerNode)
        if getattr(theJob, 'switchesOption', None) is not None:
            output.append('| Switches option : %s' % theJob.switchesOption)
        if theJob.maxWallTime:
            output.append('| Wall time limit : %s' % theJob.maxWallTime)
        if theJob.dependJobID:
            output.append('| Depends on jobID : %s' % theJob.dependJobID)
            # leave an empty marker file naming the job that is waited for
            open('waitsForJobID_%s' % theJob.dependJobID, 'w').close()

    if isinstance(theJob, shellJob.ShellJob):
        output.append('| ThreadsPerProc : %d' % theJob.nThreadsPerProc)
        output.append('| Total threads : %d' % theJob.nThreads)
    if getattr(theJob, 'mailAddress', None) is not None:
        output.append('| Mail Address : %s' % theJob.mailAddress)
    output.append('| Output file : %s' % theJob.outfile)
    output.append(separator)

    summary = "\n".join(output)
    print(summary)
    if theJob.outfile and theJob.outfile != sys.stdout:
        with open(theJob.outfile, "a") as outfile:
            outfile.write(summary + "\n")


def printJobId(theJob):
    """Prints the job ID of an already started QSubJob or SlurmJob, followed by a separator line.

    Nothing is printed for any other kind of job.

    :param theJob: the job whose ID is printed
    :type theJob: instance of a job
    """
    if isinstance(theJob, (qSubJob.QSubJob, slurmJob.SlurmJob)):
        # printed unconditionally, also when the job ID is still empty
        print('| Job Id : %s' % theJob.jobId)
        print('+--------------------------------------------------------------')