Source code for

Job supplies a class to run a TRACE, PREP or POST job on a workstation or a cluster.
When an instance is created, a suitable job class is chosen and instantiated depending on the parameters passed.

    Job scheduling on a cluster is managed by PBS (Portable Batch System) or SLURM.

for use on a cluster:
    Information about the cluster has to be added in ''

import itertools
import os
import re
import sys

from import shellJob, methodJob
from import qSubJob, slurmJob
from import ClusterInfo
from import CPUInfo

def _isTRACEsTppArgumentSet(args):
    tppSet = False
    joinedArgsList = list(itertools.chain.from_iterable(item.split() for item in args))
    if len(joinedArgsList) >= 2 and not joinedArgsList[0].startswith('-') and re.match(r'^\d+$', joinedArgsList[1]) is not None:
        tppSet = True
    elif "-tpp" in joinedArgsList:
        tppSet = True
    return tppSet

class Job:
    """A factory that creates the job you need by analyzing the given parameters.

    Calling ``Job(...)`` never yields a ``Job`` instance: ``__new__`` returns a
    MethodJob, ShellJob, QSubJob or SlurmJob depending on the parameters and on
    whether the process runs on a valid cluster.

    :param executable: path to the executable or a method
    :param nProcs: number of processors to use
    :param args: arguments which will be appended when executing
    :param kwargs: a variable to pass keyword arguments to a methodJob
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param queue: queue to put the job in (also used to look up the cluster information)
    :param workingDirectory: directory to change to before executing
    :param outputFile: path to a file for the output
    :param jobName: name for the job
    :param procsPerNode: processes per node
    :param threadsPerProc: threads to start per process
    :param envVars: environment variables to set
    :param runInBackground: if False the job, when started as single job, will stay in the foreground
    :param nodeType: type of the node to calculate on (forwarded to PBS jobs only)
    :param executeOnMaster: if True the job will be executed on the master if on a cluster
    :param jobType: possibility to declare the jobType to control the result; available: TRACE, PREP, POST, gmcPlay
    :param outputDir: directory the output file will be saved in
    :param successStatements: list of regular expressions the output file will be searched for
        (line by line) to check for success
    :param failStatements: list of regular expressions the output file will be searched for
        (line by line) to check for failure
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job
        (key: resource, value: quantity of the resource)
    :param useMpirun: if False the job will not be executed with MPI but the number of processes is used
    :param wallTime: maximum wall time for a job on the cluster
    :param weight: the weight (seconds to run times number of processes) of the job
    :param dependJobID: ID of a job this job depends on (forwarded to PBS/SLURM jobs only)
    :param account: account on which computation costs are charged
    :param retries: number of times the job is retried when failing
    :param group: name of the group that contains this job, if any
    :param mailAddress: mail address for status information (only used for SLURM jobs!)
    :param slurmSwitches: slurm option requesting minimal number of network switches
    :param lock: DEPRECATED; accepted for compatibility reasons and ignored
    :type executable: str or method
    :type nProcs: int
    :type args: list
    :type kwargs: dict
    :type nTecplotLicenses: int
    :type queue: str
    :type workingDirectory: str
    :type outputFile: str
    :type jobName: str
    :type procsPerNode: int
    :type threadsPerProc: int
    :type envVars: dict
    :type runInBackground: bool
    :type nodeType: str
    :type executeOnMaster: bool
    :type jobType: str
    :type outputDir: str
    :type successStatements: list
    :type failStatements: list
    :type ignoreExitCode: bool
    :type resourcesDict: dict
    :type useMpirun: bool
    :type wallTime: str
    :type weight: int
    :type account: str
    :type retries: int
    :type group: str
    :type mailAddress: str
    :type slurmSwitches: str
    :type lock: Lock instance
    :return: A job; MethodJob, ShellJob, QSubJob or SlurmJob instance depending on the given parameters
    :rtype: MethodJob/ShellJob/QSubJob/SlurmJob instance
    """

    @classmethod
    def __getNameOfExecutable(cls, path):
        """Return the last path component of ``str(path)``.

        :param path: path to the executable or a method
        :type path: str or method
        :return: name of the executable
        :rtype: str
        """
        return os.path.split(str(path))[1]

    @classmethod
    def __determineProcsAndThreads(cls, jobType, nameOfExecutable, executeOnMaster, cpuInfo,
                                   nProcs, procsPerNode, threadsPerProc):
        """Apply best practice guidelines to the process/thread layout.

        * gmcPlay and pymesh run as a single process with a single thread.
        * TRACE jobs keep a user-given ``threadsPerProc >= 1``. Otherwise, on a
          cluster (and not on the master), it is derived from the CPU info; in all
          other cases it is 1.
        * All other jobs use one thread per process; a larger request is reported
          and overridden.
        * ``procsPerNode`` defaults to ``min(abs(nProcs), numLogicalCPUs // threadsPerProc)``.

        :return: ``(nProcs, procsPerNode, threadsPerProc)``
        :rtype: tuple
        """
        if jobType == GMC_PLAY_EXEC or nameOfExecutable in (GMC_PLAY_EXEC, PYMESH_EXEC):
            threadsPerProc = 1
            nProcs = 1
        elif jobType == TRACE_SUITE_TRACE_EXEC:
            if threadsPerProc is None or threadsPerProc < 1:
                if ClusterInfo.onValidCluster() and not executeOnMaster:
                    if procsPerNode is None:
                        threadsPerProc = cpuInfo.numSiblingsPerCore
                    else:
                        # Share the node's logical CPUs among its processes. Clamp to 1 so that
                        # more processes than CPUs never yields an invalid 0 threads per process.
                        threadsPerProc = max(1, cpuInfo.numLogicalCPUs // procsPerNode)
                else:
                    threadsPerProc = 1
        else:
            if threadsPerProc is not None and threadsPerProc > 1:
                print("WARNING: threadsPerProc greater than 1 and the job type is not Trace! "
                      "threadsPerProc will be set to 1.")
            # Always 1 here, including the unset (None) case; None would break the
            # integer division below.
            threadsPerProc = 1
        procsPerNode = procsPerNode or min(abs(nProcs), cpuInfo.numLogicalCPUs // threadsPerProc)
        return nProcs, procsPerNode, threadsPerProc

    def __new__(cls, executable, nProcs=1, args=None, kwargs=None, nTecplotLicenses=0, queue=None,
                workingDirectory=None, outputFile=None, jobName=None, procsPerNode=None,
                threadsPerProc=None, envVars=None, runInBackground=False, nodeType="ib",
                executeOnMaster=False, jobType=None, outputDir=None, successStatements=None,
                failStatements=None, ignoreExitCode=False, resourcesDict=None, useMpirun=True,
                wallTime=None, weight=0, dependJobID=None, account=None, retries=0, group=None,
                mailAddress=None, slurmSwitches=None, lock=None):
        """Create a new job instance depending on the given parameters.

        The concrete class is chosen as follows:

        * ``executable`` is not a string (a Python method): MethodJob
        * on a valid cluster and ``executeOnMaster`` is False: QSubJob for PBS,
          SlurmJob for SLURM
        * otherwise: ShellJob

        Before that:

        * if no success statements are given, a default is taken from
          ``SUCCESS_STATUS`` by ``jobType`` or by the executable name;
        * the process/thread layout is completed by best practice guidelines;
        * ``-tpp <n>`` is appended to the arguments of multi-threaded TRACE jobs
          unless already present;
        * ``PYTHON_FAIL_STATEMENT`` is added to the fail statements of Python scripts;
        * ``useMpirun`` is kept only when more than one process is used.

        All parameters are documented in the class docstring. ``runInBackground`` is
        forwarded to the created job. ``lock`` is accepted for backward compatibility
        only and is ignored.

        :return: A job; MethodJob, ShellJob, QSubJob or SlurmJob instance
        :rtype: MethodJob/ShellJob/QSubJob/SlurmJob instance
        :raises RuntimeError: if the cluster's queuing system is neither PBS nor SLURM
        """
        # Copy the statement lists so that appending defaults below never mutates
        # lists owned by the caller.
        successStatements = list(successStatements or [])
        failStatements = list(failStatements or [])
        nameOfExecutable = cls.__getNameOfExecutable(executable)

        if not successStatements:
            if jobType in SUCCESS_STATUS:
                successStatements.append(re.escape(SUCCESS_STATUS[jobType]))
            elif nameOfExecutable in SUCCESS_STATUS:
                successStatements.append(re.escape(SUCCESS_STATUS[nameOfExecutable]))

        clusterInfo = None
        if ClusterInfo.onValidCluster():
            clusterInfo = ClusterInfo.from_environment(queue)
            cpuInfo = clusterInfo.cpuInfo
        else:
            cpuInfo = CPUInfo.from_proc_cpuinfo()

        # Evaluated before the defaults below fill procsPerNode in; forwarded to SlurmJob.
        groupable = procsPerNode is None
        nProcs, procsPerNode, threadsPerProc = cls.__determineProcsAndThreads(
            jobType, nameOfExecutable, executeOnMaster, cpuInfo, nProcs, procsPerNode, threadsPerProc)

        # Multiple threads are currently used for TRACE only; the command line option is set here.
        # Build a new list: works for args=None and leaves the caller's list untouched.
        if jobType == TRACE_SUITE_TRACE_EXEC and threadsPerProc > 1 \
                and not _isTRACEsTppArgumentSet(args or []):
            args = list(args or []) + [f"-tpp {threadsPerProc}"]

        useMpirun = useMpirun and nProcs > 1

        # Python scripts (directly, via the python executable, or via /usr/bin/env) get the
        # Python failure statement. Explicit type/emptiness checks replace the former
        # try/except AttributeError, which did not cover args being None or empty.
        if isinstance(executable, str):
            firstArg = args[0] if args else None
            runsPython = (executable.endswith(".py") or executable == PYTHON_EXECUTABLE
                          or (executable == "/usr/bin/env" and isinstance(firstArg, str)
                              and (firstArg.endswith(".py") or firstArg == PYTHON_EXECUTABLE)))
            if runsPython:
                failStatements.append(re.escape(PYTHON_FAIL_STATEMENT))

        if not isinstance(executable, str):
            # means the executable is a python method
            failStatements = failStatements or [re.escape(PYTHON_FAIL_STATEMENT)]
            job = methodJob.MethodJob(method=executable, args=args, kwargs=kwargs,
                                      nTecplotLicenses=nTecplotLicenses, name=jobName,
                                      workingDir=workingDirectory, outFile=outputFile,
                                      envVars=envVars, successStatements=successStatements,
                                      failStatements=failStatements, outputDir=outputDir,
                                      runInBackground=runInBackground,
                                      ignoreExitCode=ignoreExitCode, resourcesDict=resourcesDict,
                                      weight=weight, retries=retries)
        elif clusterInfo is not None and not executeOnMaster:
            account = account or os.environ.get(clusterInfo.account_env_var_name)
            if clusterInfo.queuing_system == "PBS":
                job = qSubJob.QSubJob(executable=executable, nProcs=nProcs, args=args,
                                      nTecplotLicenses=nTecplotLicenses, name=jobName,
                                      outFile=outputFile, workingDir=workingDirectory,
                                      envVars=envVars, threadsPerProc=threadsPerProc,
                                      outputDir=outputDir, procsPerNode=procsPerNode, queue=queue,
                                      nodeType=nodeType, successStatements=successStatements,
                                      failStatements=failStatements,
                                      runInBackground=runInBackground,
                                      ignoreExitCode=ignoreExitCode, resourcesDict=resourcesDict,
                                      useMpirun=useMpirun, wallTime=wallTime, weight=weight,
                                      dependJobID=dependJobID, retries=retries)
            elif clusterInfo.queuing_system == "SLURM":
                job = slurmJob.SlurmJob(executable=executable, nProcs=nProcs, args=args,
                                        nTecplotLicenses=nTecplotLicenses, name=jobName,
                                        outFile=outputFile, workingDir=workingDirectory,
                                        envVars=envVars, threadsPerProc=threadsPerProc,
                                        outputDir=outputDir, procsPerNode=procsPerNode,
                                        clusterInfo=clusterInfo,
                                        successStatements=successStatements,
                                        failStatements=failStatements,
                                        runInBackground=runInBackground,
                                        ignoreExitCode=ignoreExitCode,
                                        resourcesDict=resourcesDict, useMpirun=useMpirun,
                                        wallTime=wallTime, weight=weight, dependJobID=dependJobID,
                                        account=account, retries=retries, mailAddress=mailAddress,
                                        groupable=groupable, switches=slurmSwitches)
            else:
                raise RuntimeError(f"queuing system not supported: {clusterInfo.queuing_system}")
        else:
            job = shellJob.ShellJob(executable=executable, args=args,
                                    nTecplotLicenses=nTecplotLicenses, nProcs=nProcs, name=jobName,
                                    outFile=outputFile, workingDir=workingDirectory,
                                    envVars=envVars, successStatements=successStatements,
                                    outputDir=outputDir, failStatements=failStatements,
                                    runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                    resourcesDict=resourcesDict, useMpirun=useMpirun,
                                    weight=weight, retries=retries, threadsPerProc=threadsPerProc)

        job.setGroup(group)
        if mailAddress is not None and not isinstance(job, slurmJob.SlurmJob):
            print("WARNING: ignored given mailing address, only supported for slurm jobs.")
        return job
def printJobInformation(theJob):
    """Print the information for an instantiated, but not yet running job.

    The summary is printed to stdout. If the job has an output file other than
    ``sys.stdout``, the summary is appended to that file as well. For queued jobs
    that depend on another job, an empty file ``waitsForJobID_<id>`` is created in
    the current working directory.

    :param theJob: the job you want to print the information for
    :type theJob: instance of a job
    """
    # str() so that a non-path executable (e.g. a callable) cannot break os.path.split
    execPath, execName = os.path.split(str(theJob.executable))
    isQueued = isinstance(theJob, (qSubJob.QSubJob, slurmJob.SlurmJob))

    output = ['+--------------------------------------------------------------']
    if isinstance(theJob, qSubJob.QSubJob):
        output.append('| Sending to queueing system:')
        output.append('| Script : %s' % theJob.qsubScript)
        output.append('| Exec name : %s' % execName)
    elif isinstance(theJob, slurmJob.SlurmJob):
        output.append('| Sending to queueing system:')
        output.append('| Script : %s' % theJob.sbatchScript)
        output.append('| Exec name : %s' % execName)
        if theJob.account is not None:
            output.append(f'| Account : {theJob.account}')
    else:
        output.append('| Starting : %s' % execName)
    output.append('| Exec path : %s' % execPath)
    # args may be None or hold non-string values; neither may crash the summary
    output.append('| Arguments : %s' % " ".join(str(arg) for arg in (theJob.args or [])))
    if theJob.workingDir:
        output.append('| Workdir : %s' % theJob.workingDir)
    if isQueued:
        # NOTE(review): the value printed for "Job" was lost in this copy of the source;
        # the job name is assumed here — confirm the attribute name on the job classes.
        output.append('| Job : %s' % getattr(theJob, 'name', None))
        output.append('| Queue/Partition : %s' % theJob.queue)
        output.append('| Nodes : %s' % theJob.nNodes)
        output.append('| ProcessesPerNode : %d' % theJob.nProcsPerNode)
        if getattr(theJob, 'switchesOption', None) is not None:
            output.append('| Switches option : %s' % theJob.switchesOption)
        if theJob.maxWallTime:
            output.append('| Wall time limit : %s' % theJob.maxWallTime)
        if theJob.dependJobID:
            output.append('| Depends on jobID : %s' % theJob.dependJobID)
            # empty marker file recording the dependency
            open('waitsForJobID_%s' % theJob.dependJobID, 'w').close()
    if isinstance(theJob, shellJob.ShellJob):
        output.append('| ThreadsPerProc : %d' % theJob.nThreadsPerProc)
        output.append('| Total threads : %d' % theJob.nThreads)
    if getattr(theJob, 'mailAddress', None) is not None:
        output.append('| Mail Address : %s' % theJob.mailAddress)
    output.append('| Output file : %s' % theJob.outfile)
    output.append('+--------------------------------------------------------------')

    summary = "\n".join(output)
    print(summary)
    if theJob.outfile and theJob.outfile != sys.stdout:
        with open(theJob.outfile, "a") as outfile:
            outfile.write(summary + "\n")


def printJobId(theJob):
    """Print the job id of an already started PBS (QSubJob) or SLURM (SlurmJob) job.

    Nothing is printed for other job types. The id is printed even if it is not
    set (falsy) yet.

    :param theJob: the started job
    :type theJob: instance of a job
    """
    if isinstance(theJob, (qSubJob.QSubJob, slurmJob.SlurmJob)):
        print('| Job Id : %s' % theJob.jobId)
        print('+--------------------------------------------------------------')