# Source code for mojo.jobManagement.jobs.abstractJob

"""
AbstractJob
===========
AbstractJob offers a class to inherit from for every type of job the job management is intended to
work with. It has the abstract methods runJob() and stop(), which must be overwritten in every
derived class. Instances of derived job classes are intended to be run in their own process.
Furthermore, five int constants for the statuses and a dictionary to print a string instead of the
value of each constant are offered.

Required classes and methods::

    from abc import ABCMeta, abstractmethod
    import collections
    import glob
    import os
    import re
    import sys
    import time

    from jobManagement.management.idGenerator import IdGenerator
"""

from abc import ABCMeta, abstractmethod
import collections
import glob
import os
import re
import sys
import time

from ...bricabrac.fileIO import Printer
from ...jobManagement import jobManagementData as jobConsts
from ..management.idGenerator import IdGenerator
from ..management.resources import PROCESSORS, NODES, TECPLOT_LICENSES, Resources

from ..jobObserver import JobNotifier, JobEvent

# Format template for printed status lines: pads the status label with dots to a
# fixed width (wider when stdout is a TTY), followed by the status message.
_STATUS_STRING = "{{0:.<{length}}}{{1}}".format(length=21 if sys.stdout.isatty() else 15)


class Status(collections.namedtuple("Status", ("code", "message"))):
    """Immutable job status record (a named tuple) with default values applied
    at creation time.
    """

    def __new__(cls, code=jobConsts.WAITING, message=str()):
        """Create a new Status. By default ``code`` is WAITING and ``message``
        is an empty string.

        :param code:       code of the current status
        :param message:    message of the current status
        :type code:        int constant
        :type message:     string
        """
        return super().__new__(cls, code, message)


class AbstractJob(object, metaclass=ABCMeta):
    """Class every job should inherit from. Offers several methods and properties
    to communicate between the processes. It gets an id from IdGenerator, a queue
    to send the statuses to the main-thread, a name and a path to a log file
    (to a default log file if no log file is given).

    :param name:              name for the new job
    :param nTecplotLicenses:  number of tecplot licenses needed by the job
    :param outFile:           path to the file the output will be written in
    :param outputDir:         directory the output will be written in
    :param successStatements: list of regular expressions the output file will be
                              searched for (line by line) to check for success
    :param failStatements:    list of regular expressions the output file will be
                              searched for (line by line) to check for failure
    :param workingDir:        directory the job will be processed in
    :param runInBackground:   If true the job will run in the background, when
                              executed as single job
    :param ignoreExitCode:    if True the job will ignore the exit code of the
                              executable
    :param resourcesDict:     a dictionary containing resources needed by the job
                              (key: resource (valid dictionary key),
                              value: quantity of the resource)
    :param weight:            the weight (seconds to run times number of
                              processes) of the job
    :type name:              string
    :type nTecplotLicenses:  integer
    :type outFile:           string
    :type outputDir:         string
    :type successStatements: list
    :type failStatements:    list
    :type workingDir:        string
    :type runInBackground:   boolean
    :type ignoreExitCode:    boolean
    :type resourcesDict:     dictionary
    :type weight:            int
    """

    # Extension used for auto-generated output file names.
    __outfileExtension = '.out'
    # Class-level registry of names already handed out, used to make job names unique.
    __knownNames = dict()
    # Attributes serialized by getJobDictForJSON (subclasses may extend this).
    _jsonAttributes = ("name", "workingDir", "nProcs", "outputFile")

    def __init__(self, name=None, nTecplotLicenses=0, outFile=None, outputDir=None,
                 successStatements=None, failStatements=None, workingDir=None,
                 runInBackground=None, ignoreExitCode=None, resourcesDict=None,
                 weight=0, retries=0, group=None):
        """Initializes the instance variables.

        :param name:              name for the new job
        :param nTecplotLicenses:  number of tecplot licenses needed by the job
        :param outFile:           path to the file the output will be written in
        :param outputDir:         directory the output will be written in
        :param successStatements: list of regular expressions the output file will
                                  be searched for (line by line) to check for success
        :param failStatements:    list of regular expressions the output file will
                                  be searched for (line by line) to check for failure
        :param workingDir:        directory the job will be processed in
        :param runInBackground:   If true the job will run in the background, when
                                  executed as single job
        :param ignoreExitCode:    if True the job will ignore the exit code of the
                                  executable
        :param resourcesDict:     a dictionary containing resources needed by the job
                                  (key: resource, value: quantity of the resource)
        :param weight:            the weight (seconds to run times number of
                                  processes) of the job
        :param retries:           number of times the job is retried when failing
        :param group:             name of the group that contains this job, if any
        :type name:              string
        :type nTecplotLicenses:  integer
        :type outFile:           string
        :type outputDir:         string
        :type successStatements: list
        :type failStatements:    list
        :type workingDir:        string
        :type runInBackground:   boolean
        :type ignoreExitCode:    boolean
        :type resourcesDict:     dictionary
        :type weight:            int
        :type retries:           int
        :type group:             string
        """
        self.__id = IdGenerator().getId()
        outputDir = outputDir or os.curdir
        self._name = self.__generateUniqueJobName(name or "anonymous")
        self._resources = {PROCESSORS: 0, NODES: 0, TECPLOT_LICENSES: nTecplotLicenses}
        self._resources.update(resourcesDict or dict())
        self._nProcs = 0
        self._inJoblist = False
        self._parentJoblist = None
        self.__successStatements = successStatements or list()
        self.__failStatements = failStatements or list()
        self._workingDir = os.path.abspath(workingDir) if workingDir else os.path.abspath(".")
        self._runInBackground = runInBackground
        self.__ignoreExitCode = ignoreExitCode
        self.weight = weight
        self._notifier = JobNotifier(self)
        self._group = group
        # Since there is always an output file use this variable to determine
        # whether output should be written to file.
        self._writeToFile = (outFile is not None)
        self._outFile = outFile or self._name + self.__outfileExtension
        if outputDir != os.curdir:
            self._outFile = os.path.abspath(os.path.join(outputDir, self._outFile))
        else:
            self._outFile = os.path.abspath(os.path.join(self._workingDir, self._outFile))
        self.__status = Status()
        self.__statusQueue = list()
        self._startTime = None
        self._endTime = None
        self.__overwritePreviousLine = False
        self._retries = retries
        self._tryCount = 0
        # indicates that, in general, job may not run as part of a group on a cluster node
        self.allowJobGrouping = False

    @property
    def name(self):
        return self._name

    @property
    def group(self):
        return self._group

    @property
    def notifier(self):
        return self._notifier

    @property
    def jobListName(self):
        # Empty string when the job is not part of a job list.
        if self._parentJoblist:
            return self._parentJoblist.name
        else:
            return ""

    @property
    def _overwritePreviousLine(self):
        return self.__overwritePreviousLine

    @_overwritePreviousLine.setter
    def _overwritePreviousLine(self, overWritePreviousLine):
        # Overwriting the previous console line only makes sense on a TTY.
        self.__overwritePreviousLine = overWritePreviousLine and sys.stdout.isatty()

    @property
    def nProcs(self):
        return self._nProcs

    def getOutFile(self):
        return os.path.abspath(self._outFile)

    def setOutFile(self, newPath):
        self._outFile = newPath

    def setGroup(self, group):
        self._group = group

    outFile = property(getOutFile, setOutFile)
    outputFile = outFile
    outfile = outFile  # DEPRECATED

    @property
    def id(self):
        return self.__id

    def getStatus(self):
        return self.__status

    def setStatus(self, newStatus):
        self.__status = newStatus

    status = property(getStatus, setStatus)

    @property
    def workingDir(self):
        return self._workingDir

    @property
    def statusQueue(self):
        return self.__statusQueue

    @property
    def startTime(self):
        return self._startTime

    @property
    def runtime(self):
        # 0 until the job has both started and finished.
        if self._startTime is None or self._endTime is None:
            return 0
        else:
            return self._endTime - self._startTime

    @abstractmethod
    def runJob(self):
        """Abstract method runJob, to be overwritten in subclasses. Should
        initialize and start the current job.
        """

    @abstractmethod
    def updateStatus(self):
        """Abstract method updateStatus, to be overwritten in subclasses.
        Updates the job status and returns true if the status has changed.

        :return: returns true if the status has changed
        :rtype: boolean
        """

    @abstractmethod
    def stop(self):
        """Abstract method stop, to be overwritten in subclasses. Should stop
        the current job.
        """

    def getWeightRemaining(self, velocity=1):
        """Estimates the weight still left to process: the initial weight minus
        the work done since start (elapsed time * nProcs * velocity), never
        negative. Returns the full weight if the job has not started yet.
        """
        weightRemaining = self.weight
        if self.startTime is not None:
            weightRemaining = max(0, self.weight - ((time.time() - self.startTime) * self.nProcs * velocity))
        return weightRemaining

    def nJobsOfStatus(self, statusList=None):
        """Returns if the job's status is in the list of status.

        :param statusList: list of valid status or None
        :type statusList: list or None
        :return: number of jobs of this status (0 or 1)
        :rtype: int
        """
        nJobs = 0
        if statusList is None or self.status.code in statusList:
            nJobs = 1
        return nJobs

    def resourcesAvailable(self):
        """Tests if the resources for the job are available.

        :return: True if the resources for the job are available
        :rtype: boolean
        """
        return Resources().resourcesAvailable(self._resources)

    def allocateResources(self):
        """Allocates the resources needed for the job.
        """
        Resources().allocateResources(self._resources)

    def freeResources(self):
        """Frees the resources used by this job.
        """
        Resources().releaseResources(self._resources)

    def startJob(self):
        """Set the own status to (RUNNING, ) and calls runJob.
        """
        if self.status.code == jobConsts.DONE:
            # allows to run the same job multiple times
            self._tryCount = 0
        self._tryCount += 1
        self._startTime = time.time()
        self._endTime = None
        self.notifier.notifyObservers(JobEvent.JOB_STARTED_EVENT)
        self._writeStatus(jobConsts.RUNNING, f"{self.name} started!")
        try:
            self.runJob()
        except Exception as outerException:  # pylint: disable=broad-except
            try:
                # Append a failure report to the job's output file.
                lines = list()
                lines.append("The job failed with the following parameters:\n" + "=" * 45 + "\n")
                lines.append(f"Name: {self.name}\n")
                if hasattr(self, "_executable"):
                    lines.append(f"Executable: {self._executable}\n")
                else:
                    # NOTE(review): assumes subclasses without _executable define
                    # _method — confirm against derived classes.
                    lines.append(f"Method: {self._method}\n")
                lines.append(f"Working directory: {self._workingDir}\n")
                lines.append(f"Error message: {outerException!s}\n")
                with open(self._outFile, "a") as fileHandle:
                    fileHandle.writelines(lines)
            except (IOError, AttributeError) as e:
                # Fixed: the message previously lacked the closing quote after the error text.
                self._writeStatus(jobConsts.ERROR, f"Failed to write output file '{self._outFile}': '{e!s}'")
            finally:
                self._writeStatus(jobConsts.ERROR, f"Job {self.name} failed with message: '{outerException!s}'. Output file: {self.outFile}")

    def _writeStatus(self, status, message=str(), overwritePreviousLine=False):
        """Set and print status. Either prints the status to the console or puts
        it to the status queue.
        """
        printedSomething = False
        if status in [jobConsts.DONE, jobConsts.ERROR, jobConsts.RETRY] and self._inJoblist:
            self._endTime = time.time()
            # Notify Observers
            # NOTE(review): reconstructed nesting — observers appear to be
            # notified only for jobs inside a job list; confirm intended scope.
            if status == jobConsts.ERROR:
                self.notifier.notifyObservers(JobEvent.JOB_FAILED_EVENT)
            elif status == jobConsts.DONE:
                self.notifier.notifyObservers(JobEvent.JOB_SUCCEEDED_EVENT)
        if status == jobConsts.ERROR:
            # Convert a failure into a RETRY status while tries remain.
            if self._tryCount <= self._retries:
                status = jobConsts.RETRY
                message = f"{message}; Retrying (try {self._tryCount + 1}/{self._retries + 1})"
            elif self._retries > 0:
                message = f"{message}; Maximum number of tries reached ({self._retries + 1})."
        self.status = Status(status, message)
        if status in [jobConsts.DONE, jobConsts.ERROR, jobConsts.RETRY]:
            self._endTime = time.time()
        if not self._inJoblist:
            prefix = jobConsts.DELETE_PREVIOUS_LINE if overwritePreviousLine else ""
            printedSomething = Printer.verbosePrint(prefix + _STATUS_STRING.format(jobConsts.REPR_STATUS[status], message),
                                                   printLevel=Printer.Verbosity.JENKINS)
        else:
            self.__statusQueue.append(self.status)
        self._overwritePreviousLine = (status == jobConsts.PROGRESS) and printedSomething
        return printedSomething

    def __generateUniqueJobName(self, name):
        """Method to generate a unique job name. Needed, when jobs without name
        but the same executable are in one job list to generate unique output files.

        :param name: name of the job
        :type name: string
        :return: a unique jobname; jobname + number
        :rtype: string
        """
        if name not in self.__knownNames:
            self.__knownNames[name] = 1
        else:
            self.__knownNames[name] += 1
        # Keep incrementing the counter until no file with that prefix exists.
        while True:
            fileName = f"{name}.{self.__knownNames[name]:03d}"
            if not glob.glob(f"{fileName}*"):
                break
            self.__knownNames[name] += 1
        return fileName

    def getJobDictForJSON(self, jobsListName):
        """Generates a dictionary with basic parameters to launch the job.

        :param jobsListName: name of job list
        :type jobsListName: str
        :return: launch parameters
        :rtype: dict
        """
        params = collections.OrderedDict()
        params["jobList"] = jobsListName
        for attr in self._jsonAttributes:
            # "args" is not in the base _jsonAttributes; subclasses may add it.
            if attr == "args":
                params[attr] = " ".join(getattr(self, attr))
            else:
                params[attr] = getattr(self, attr)
        return params

    def prepareOutFile(self):
        """Generates the path to the output file and the output file itself.
        """
        if os.path.exists(self.outFile):
            os.remove(self.outFile)
        elif not os.path.exists(os.path.dirname(self.outFile)) and os.path.dirname(self.outFile):
            os.makedirs(os.path.dirname(self.outFile))
        # To create the output file and do not overwrite it with the next job
        with open(self._outFile, "w"):
            pass

    def cleanup(self, force=False, **kwargs):  # pylint: disable=unused-argument
        """Removes the output file of the job. If the output file is not specified
        explicitly nothing is written to the file with exception of the qsub job.
        Therefore empty output files will be deleted.

        :param force: forces the removal of the output file
        :type force: boolean
        """
        # If output file is explicitly specified it should not be removed during clean up.
        try:
            if force or (not self._writeToFile and not self._inJoblist and self.status.code == jobConsts.DONE) or os.stat(self.outFile).st_size == 0:
                os.remove(self.outFile)
        # file not found
        except (OSError, TypeError):
            pass

    def waitUntilFinished(self, pollingInterval=jobConsts.POLLING_INTERVAL,
                          maxPollingInterval=jobConsts.MAX_POLLING_INTERVAL,
                          scalingPollingFactor=jobConsts.SCALING_POLLING_INTERVAL):
        """Waits until the job is finished. Polling interval to use in the
        beginning and maximal polling interval are set in
        jobManagement/jobManagementData.py
        """
        if self._runInBackground:
            return
        try:
            # Poll with an exponentially growing interval, capped at the maximum.
            while not self.updateStatus():
                time.sleep(pollingInterval)
                if pollingInterval < maxPollingInterval:
                    pollingInterval = min(scalingPollingFactor * pollingInterval, maxPollingInterval)
        except (SystemExit, KeyboardInterrupt):
            self.stop()
            self.cleanup()
        if self.status.code == jobConsts.RETRY:
            self.startJob()
        elif self.status.code != jobConsts.ERROR:
            with open(self.outFile, "a") as fileHandle:
                fileHandle.write(self.status.message + "\n")

    def __str__(self):
        # Extract the bare class name from "<class 'package.module.ClassName'>".
        pattern = re.compile(r"<class '.*\.(.*)'>")
        jobType = re.match(pattern, str(self.__class__)).group(1)
        return f"job name: {self.name:>60}; job type: {jobType:>12}; job id:{self.id:>3}"

    def __repr__(self):
        return self.__str__()

    def lookForSuccess(self, returnCode=0):
        """Controls the success of the currently finished job. If a success
        statement or a fail statement was defined the output file is searched
        for them. Sets the status of the job.

        :param returnCode: return code of the program called
        :type returnCode: integer
        :return: True if the job was successful
        :rtype: boolean
        """
        jobWritesOutput = self._inJoblist or self._writeToFile
        successStatements, failStatements = self.__successStatements[:], self.__failStatements
        if returnCode and not self.__ignoreExitCode:
            self._writeStatus(jobConsts.ERROR, f"the executed program exited with nonzero return code ({returnCode})!")
        elif jobWritesOutput and (successStatements or failStatements):
            with open(self._outFile) as filehandler:
                for line in filehandler:
                    if any((re.search(regExp, line) for regExp in failStatements)):
                        self._writeStatus(jobConsts.ERROR, "fail status found in output file")
                        break
                    # Drop every success statement already matched by this line.
                    successStatements = [regExp for regExp in successStatements if not re.search(regExp, line)]
                else:
                    # for-else: only reached when no fail statement broke the loop.
                    if successStatements:
                        self._writeStatus(jobConsts.ERROR, "success status ({0}) not found in output file".format(", ".join(successStatements)))
                    else:
                        self._writeStatus(jobConsts.DONE, "terminated successfully!")
        else:
            self._writeStatus(jobConsts.DONE, "terminated successfully!")
        return self.status.code == jobConsts.DONE