Source code for mojo.jobManagement.jobs.TracePrepPost.qSubJob

"""
QSubJob
=======
QSubJob supplies a class which inherits from ShellJob and makes use of ClusterInfo. The
functionality is nearly the same as in ShellJob, but the job is executed on the cluster nodes
rather than on the workstation or the master.
"""


import os
import re
import subprocess
import time

from ... import jobManagementData as jobConsts
from ..shellJob import ShellJob
from ...management.clusterInfo import ClusterInfo
from ...management import qstat
from ...management.resources import NODES, PROCESSORS
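
# A minimal usage sketch (hypothetical paths and values; QSubJob is defined
# below). The job is submitted to the cluster via qsub instead of being run on
# the workstation or the master:
#
#     job = QSubJob("/path/to/solver", nProcs=48, queue="batch",
#                   wallTime="02:00:00", workingDir="/scratch/case01")
#     job.runJob()  # writes <name>.run.sh, submits it via qsub and, unless run
#                   # in the background, polls qstat until the job has finished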


def cleanQsubStandardFilesForTestcase(workingDir, tcName, removeScript):
    """Remove all files <job name>.(e|o)+?[0-9]

    :param workingDir:      working directory
    :type workingDir:       string
    :param tcName:          test case name
    :type tcName:           string
    :param removeScript:    flag whether file should be removed
    :type removeScript:     boolean
    :return:                number of deleted files
    :rtype:                 integer
    """
    nDelFiles = 0

    # qsub standard output/error files are named "<tcName>.o<jobid>" / "<tcName>.e<jobid>"
    jobFiles = re.escape(tcName) + r"\.[eo][0-9]+$"
    for elt in [f for f in os.listdir(workingDir) if re.search(jobFiles, f)]:
        try:
            deletedFilePath = os.path.join(workingDir, elt)
            # remove the file only when removal is requested via removeScript
            if removeScript:
                os.remove(deletedFilePath)
                nDelFiles += 1
        except OSError:
            pass

    return nDelFiles
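
# Example (hypothetical directory and test case name): the call below would
# delete leftover files such as "case01.e1234" and "case01.o1234" from
# /scratch/run and return the number of removed files:
#
#     nDeleted = cleanQsubStandardFilesForTestcase("/scratch/run", "case01", True)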


class QSubJob(ShellJob):
    """Class to start a job with pThreads and MPI on nodes of the cluster.

    :param executable: path to the executable
    :param nProcs: number of processors to use
    :param args: arguments which will be appended when executing
    :param nTecplotLicenses: number of Tecplot licenses needed by the job
    :param name: name for the job
    :param outFile: path to a file for the output
    :param workingDir: directory to change to before executing
    :param envVars: environment variables to set
    :param threadsPerProc: threads to start per processor
    :param procsPerNode: processes per node
    :param queue: queue to put the job in
    :param nodeType: type of the node to calculate on
    :param successStatements: list of regular expressions the output file will be searched for
                              (line by line) to check for success
    :param failStatements: list of regular expressions the output file will be searched for
                           (line by line) to check for failure
    :param outputDir: directory the output file will be written to
    :param runInBackground: if True the job will run in the background when executed as a single job
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job
                          (key: resource, value: quantity of the resource)
    :param wallTime: maximum wall time for a job on the cluster
    :param weight: the weight (seconds to run times number of processes) of the job
    :param retries: number of times the job is retried when failing
    :type executable: str
    :type nProcs: int
    :type args: list
    :type nTecplotLicenses: int
    :type name: str
    :type outFile: str
    :type workingDir: str
    :type envVars: dict
    :type threadsPerProc: int
    :type procsPerNode: int
    :type queue: str
    :type nodeType: str
    :type successStatements: list
    :type failStatements: list
    :type outputDir: str
    :type runInBackground: bool
    :type ignoreExitCode: bool
    :type wallTime: str
    :type resourcesDict: dict
    :type weight: int
    :type retries: int
    """

    _defaultQueue = "batch"
    _runScriptExtension = '.run.sh'
    _jsonAttributes = ("name", "workingDir", "qsubScript", "executable", "args",
                       "overwritingEnvVars", "nNodes", "nProcsPerNode", "nThreadsPerProc",
                       "outputFile", "queue", "maxWallTime")

    def __init__(self, executable, nProcs=1, args=None, nTecplotLicenses=0, name=None,
                 outFile=None, workingDir=None, envVars=None, threadsPerProc=None,
                 procsPerNode=None, queue=None, nodeType="w", successStatements=None,
                 outputDir=None, failStatements=None, runInBackground=None,
                 ignoreExitCode=None, resourcesDict=None, useMpirun=False, wallTime=None,
                 weight=jobConsts.DEFAULT_WEIGHT_SHELL, dependJobID=None, retries=0):
        """Initializes a new instance calling the super constructors, calculates the needed
        nodes if not given and creates a name for the qsub script.

        :param dependJobID: ID of the job which has to finish before this one starts
        :type dependJobID: int

        For the remaining parameters see the class docstring.
        """
        self.clusterInfo = ClusterInfo.from_environment(nodeType)
        super(QSubJob, self).__init__(executable=executable, nProcs=nProcs, args=args,
                                      nTecplotLicenses=nTecplotLicenses, name=name,
                                      outFile=outFile, workingDir=workingDir, envVars=envVars,
                                      threadsPerProc=threadsPerProc,
                                      successStatements=successStatements, outputDir=outputDir,
                                      failStatements=failStatements,
                                      runInBackground=runInBackground,
                                      ignoreExitCode=ignoreExitCode, resourcesDict=resourcesDict,
                                      useMpirun=useMpirun, weight=weight, retries=retries)
        # Output is written to a file in any case. Therefore the output file
        # must not be deleted during cleanup.
        self._writeToFile = True
        self.dependJobID = dependJobID
        self.__jobId = None
        self.__queue = queue or self._defaultQueue
        self.__qsubScript = f'{self.name}{self._runScriptExtension}'
        self.__qsubScript = os.path.abspath(os.path.join(self._workingDir, self.__qsubScript))
        self.__qstat = None
        self.__qstatErrorCounter = 0
        # derive processes per node and threads per process from the cluster's
        # CPU topology when they are not given explicitly
        if procsPerNode and threadsPerProc:
            self.__procsPerNode = procsPerNode
            self._threadsPerProc = threadsPerProc
        elif procsPerNode and not threadsPerProc:
            assert procsPerNode <= self.clusterInfo.cpuInfo.numLogicalCPUs, \
                "Unable to calculate number of threads, there are more processes than cpus on the nodes"
            self.__procsPerNode = procsPerNode
            self._threadsPerProc = self.clusterInfo.cpuInfo.numLogicalCPUs // procsPerNode
        elif not procsPerNode and not threadsPerProc:
            self.__procsPerNode = self.clusterInfo.cpuInfo.numPhysicalCores
            self._threadsPerProc = self.clusterInfo.cpuInfo.numSiblingsPerCore
        elif threadsPerProc == 1 and not procsPerNode:
            self.__procsPerNode = self.clusterInfo.cpuInfo.numPhysicalCores
            self._threadsPerProc = 1
        else:
            self.__procsPerNode = self.clusterInfo.cpuInfo.numLogicalCPUs // threadsPerProc
            self._threadsPerProc = threadsPerProc
        # round the number of nodes up when nProcs is not a multiple of procsPerNode
        nNodes, carry = divmod(nProcs, self.__procsPerNode)
        self.__nNodes = nNodes + 1 if carry else nNodes
        # set PROCESSORS to zero, because the job does not need any processors on the master
        self._resources.update({NODES: self.__nNodes, PROCESSORS: 0})
        self._modulesConfigured = (jobConsts.MODULES_HOME in os.environ) or (jobConsts.MODULE_PATH in os.environ)
        self._loadedModules = list()
        if self._modulesConfigured:
            self._loadedModules = os.environ[jobConsts.LOADED_MODULES].split(":")
        self.__MaxWallTime = None
        self.maxWallTime = wallTime

    @property
    def jobId(self):
        return self.__jobId

    @property
    def queue(self):
        return self.__queue

    @property
    def nProcsPerNode(self):
        return self.__procsPerNode

    @property
    def nNodes(self):
        return self.__nNodes

    @property
    def qsubScript(self):
        return self.__qsubScript

    @property
    def maxWallTime(self):
        return self.__MaxWallTime

    @maxWallTime.setter
    def maxWallTime(self, value):
        """Stores the wall time string and converts it to the job weight in seconds."""
        self.__MaxWallTime = value
        if value:
            timeList = value.split(':')
            if len(timeList) == 1:  # "SS"
                self.weight = int(timeList[0])
            elif len(timeList) == 2:  # "MM:SS"
                self.weight = int(timeList[0]) * 60 + int(timeList[1])
            elif len(timeList) == 3:  # "HH:MM:SS"
                self.weight = int(timeList[0]) * 3600 + int(timeList[1]) * 60 + int(timeList[2])
            elif len(timeList) == 4:  # "DD:HH:MM:SS"
                self.weight = (int(timeList[0]) * 86400 + int(timeList[1]) * 3600
                               + int(timeList[2]) * 60 + int(timeList[3]))
            else:
                raise ValueError("Unknown time format")

    def _getCommandLine(self):
        """Builds up the command line which enqueues the job."""
        commandLine = ["qsub", "-V",
                       "-l", "nodes={}:ppn={}".format(self.__nNodes, self.__procsPerNode),
                       "-q", self.__queue,
                       "-N", self._name,
                       ]
        if self.maxWallTime:
            commandLine.append("-l")
            commandLine.append("walltime={}".format(self.maxWallTime))
        if self.dependJobID is not None:
            commandLine.append("-W")
            commandLine.append("depend=afterany:{}".format(self.dependJobID))
        commandLine.append(self.__qsubScript)
        return commandLine
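
    # Worked example (hypothetical values): with nNodes=2, procsPerNode=24,
    # queue="batch", name="case01", maxWallTime="01:30:00" and no dependency,
    # _getCommandLine() yields
    #
    #     ["qsub", "-V", "-l", "nodes=2:ppn=24", "-q", "batch", "-N", "case01",
    #      "-l", "walltime=01:30:00", "/path/to/case01.run.sh"]
    #
    # and the maxWallTime setter derives weight = 1 * 3600 + 30 * 60 + 0 = 5400.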

    def runJob(self):
        """Starts the job using qsub and pThreads when needed."""
        self._startTime = None  # Reset start time as we only want to measure the actually elapsed time
        self.prepareOutFile()
        if self._threadsPerProc > 1:
            self._args.insert(0, f"-tpp {self._threadsPerProc}")
        envVars = os.environ.copy()
        envVars.update(self._envVars)
        self.__createBatchFile()
        commandline = self._getCommandLine()
        try:
            qsubStdout = subprocess.Popen(commandline, cwd=self._workingDir, env=envVars,
                                          stdout=subprocess.PIPE, encoding="utf8").communicate()[0]
            try:
                self.__jobId = qsubStdout.split()[0]
                self.__qstat = qstat.Qstat(self.__jobId)
            except IndexError:
                self._writeStatus(jobConsts.ERROR, "cannot read jobId. Trying to submit to the wrong queue?")
                return
            if not self._inJoblist:
                if self._runInBackground:
                    print(f"The id of the just started job is '{self.__jobId}'.")
                else:
                    self.waitUntilFinished(pollingInterval=jobConsts.POLLING_INTERVAL_CLUSTER,
                                           maxPollingInterval=jobConsts.MAX_POLLING_INTERVAL_CLUSTER,
                                           scalingPollingFactor=jobConsts.SCALING_POLLING_INTERVAL_CLUSTER)
        except OSError as e:
            self._writeStatus(jobConsts.ERROR, f"cannot call qsub: {e}")

    def stop(self):
        """Kills the current job via qdel."""
        commandline = f"qdel {self.__jobId}"
        try:
            subprocess.call(commandline.split())
        except OSError:
            self._writeStatus(jobConsts.ERROR, f"Unable to kill job {self.__jobId}")

    def __createBatchFile(self):
        """Generates the <JOBNAME>.run.sh file, the PBS script which will be passed to qsub."""
        lines = []
        # shebang bash
        lines.append("#!/bin/bash\n")
        # change to the same directory on each execution host
        lines.append("cd $PBS_O_WORKDIR\n")
        # make the "module" command available
        lines.append(". /etc/profile\n")
        # write additional environment variables
        for key, value in self._envVars.items():
            lines.append(f"export {key}={value}\n")
        # load the same modules which were loaded in the starting shell
        if self._modulesConfigured:
            lines.append("module purge\n")
            for module in self._loadedModules:
                lines.append(f"module load {module}\n")
        # switch to an alternative working directory
        if self._workingDir:
            lines.append(f"cd {self._workingDir}\n")
        lines.append(" ".join(self._generateExecutionCommand()) + f" >> {self.outFile} 2>&1\n")
        # write the script to a file
        try:
            with open(self.__qsubScript, "w") as handle:
                handle.writelines(lines)
        except IOError:
            raise RuntimeError(f"ERROR: Cannot create batch script {self.__qsubScript}.")
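
    # For illustration, a generated batch script could look like this
    # (hypothetical environment variables, modules and working directory; the
    # final line comes from self._generateExecutionCommand()):
    #
    #     #!/bin/bash
    #     cd $PBS_O_WORKDIR
    #     . /etc/profile
    #     export OMP_NUM_THREADS=4
    #     module purge
    #     module load openmpi
    #     cd /scratch/case01
    #     <execution command> >> /scratch/case01/case01.out 2>&1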

    def updateStatus(self):
        """Polls qstat and updates the job status.

        :return: True if the job has finished, False otherwise
        :rtype: bool
        """
        errorMax = 20
        jobStatus = self.__qstat.status
        if jobStatus in (qstat.Status.WAITING, qstat.Status.RUNNING):
            # Reset errorCount after one successful poll
            self.__qstatErrorCounter = 0
            if self._startTime is None and jobStatus == qstat.Status.RUNNING:
                self._startTime = time.time()
            return False
        elif jobStatus == qstat.Status.DONE:
            self.lookForSuccess()
            return True
        else:
            self.__qstatErrorCounter += 1
            if self.__qstatErrorCounter == errorMax:
                self._writeStatus(jobConsts.ERROR, "Error getting qstat status")
            return False

    def cleanup(self, force=False, **kwargs):
        """Remove created scripts, output and error files.

        :param force: flag whether cleanup is enforced
        :type force: bool
        :param removeScript: flag whether files should be removed
        :type removeScript: bool
        """
        # remove output file and thread script
        super(QSubJob, self).cleanup(force, **kwargs)
        # remove qsub output files
        self.__cleanupScript(**kwargs)

    def __cleanupScript(self, **kwargs):
        """Removes all created scripts and qsub output and error files.

        :param removeScript: flag whether files should be removed
        :type removeScript: bool
        """
        removeScript = kwargs.get("removeScript", False)
        # remove qsub script
        if removeScript and self.status.code == jobConsts.DONE:
            try:
                os.remove(self.__qsubScript)
            except OSError:
                pass
        # remove all files <job name>.[eo]<number>
        cleanQsubStandardFilesForTestcase(self._workingDir, self._name, removeScript)