"""
AbstractJob
===========
AbstractJob offers a class to inherit from for every type of job the job management is intended to
work with. It has the abstract methods runJob() and stop() implemented to be overwritten in every
derived class. AbstractJob inherits from Process and therefore can be started in a own process.
Furthermore five int-constants for the statuses and a dictionary to print a string instead of the
value of the constant are offered.
Required classes and methods::
from abc import ABCMeta, abstractmethod
import collections
import glob
import os
import re
import sys
import time
from jobManagement.management.idGenerator import IdGenerator
"""
from abc import ABCMeta, abstractmethod
import collections
import glob
import os
import re
import sys
import time
from ...bricabrac.fileIO import Printer
from ...jobManagement import jobManagementData as jobConsts
from ..management.idGenerator import IdGenerator
from ..management.resources import PROCESSORS, NODES, TECPLOT_LICENSES, Resources
from ..jobObserver import JobNotifier, JobEvent
# Format template for printed status lines: a dot-padded status label column followed
# by the message. A wider label column (21 vs. 15) is used on interactive terminals.
_STATUS_STRING = "{{0:.<{length}}}{{1}}".format(length=21 if sys.stdout.isatty() else 15)
class Status(collections.namedtuple("Status", ("code", "message"))):
    """Named tuple describing a job status, with defaults applied at creation.

    :param code: status code (one of the jobManagementData int constants)
    :param message: human readable message accompanying the status
    :type code: int constant
    :type message: string
    """
    def __new__(cls, code=jobConsts.WAITING, message=""):
        """Create a new Status; defaults to WAITING with an empty message."""
        return super().__new__(cls, code, message)
class AbstractJob(object, metaclass=ABCMeta):
    """Class every job should inherit from. Offers several methods and properties to
    communicate between the processes.
    It gets a id from IdGenerator, a queue to send the statuses to the main-thread, a name
    and a path to a log file(to a default log file if no log file is given).
    :param name: name for the new job
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param outFile: path to the file the output will be written in
    :param outputDir: directory the output will be written in
    :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
    :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
    :param workingDir: directory the job will be processed in
    :param runInBackground: If true the job will run in the background, when executed as single job
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job (key: resource(valid dictionary key), value: quantity of the resource)
    :param weight: the weight (seconds to run times number of processes) of the job
    :type name: string
    :type nTecplotLicenses: integer
    :type outFile: string
    :type outputDir: string
    :type successStatements: list
    :type failStatements: list
    :type workingDir: string
    :type runInBackground: boolean
    :type ignoreExitCode: boolean
    :type resourcesDict: dictionary
    :type weight: int
    """
    # Extension appended to the job name when no explicit output file is given.
    __outfileExtension = '.out'
    # Class-wide registry name -> highest sequence number handed out, shared by
    # all jobs so that equally named jobs get unique names/output files.
    __knownNames = dict()
    # Attributes serialized by getJobDictForJSON().
    _jsonAttributes = ("name", "workingDir", "nProcs", "outputFile")
    def __init__(self, name=None, nTecplotLicenses=0, outFile=None, outputDir=None, successStatements=None,
                 failStatements=None, workingDir=None, runInBackground=None, ignoreExitCode=None, resourcesDict=None, weight=0, retries=0, group=None):
        """Initializes the instance variables.
        :param name: name for the new job
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param outFile: path to the file the output will be written in
        :param outputDir: directory the output will be written in
        :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
        :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
        :param workingDir: directory the job will be processed in
        :param runInBackground: If true the job will run in the background, when executed as single job
        :param ignoreExitCode: if True the job will ignore the exit code of the executable
        :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
        :param weight: the weight (seconds to run times number of processes) of the job
        :param retries: number of times the job is retried when failing
        :param group: name of the group that contains this job, if any
        :type name: string
        :type nTecplotLicenses: integer
        :type outFile: string
        :type outputDir: string
        :type successStatements: list
        :type failStatements: list
        :type workingDir: string
        :type runInBackground: boolean
        :type ignoreExitCode: boolean
        :type resourcesDict: dictionary
        :type weight: int
        :type retries: int
        :type group: string
        """
        # Unique job id drawn from the global id generator.
        self.__id = IdGenerator().getId()
        outputDir = outputDir or os.curdir
        # Ensure a unique name so jobs sharing the same executable get distinct output files.
        self._name = self.__generateUniqueJobName(name or "anonymous")
        # Base resource demand; caller-supplied resourcesDict may add to or override it.
        self._resources = {PROCESSORS: 0, NODES: 0, TECPLOT_LICENSES: nTecplotLicenses}
        self._resources.update(resourcesDict or dict())
        self._nProcs = 0
        self._inJoblist = False
        self._parentJoblist = None
        self.__successStatements = successStatements or list()
        self.__failStatements = failStatements or list()
        self._workingDir = os.path.abspath(workingDir) if workingDir else os.path.abspath(".")
        self._runInBackground = runInBackground
        self.__ignoreExitCode = ignoreExitCode
        self.weight = weight
        self._notifier = JobNotifier(self)
        self._group = group
        # Since there is always an output file use this variable to determine whether output should be written to file.
        self._writeToFile = (outFile is not None)
        self._outFile = outFile or self._name + self.__outfileExtension
        # An explicitly given output directory wins; otherwise the file lives in the working directory.
        if outputDir != os.curdir:
            self._outFile = os.path.abspath(os.path.join(outputDir, self._outFile))
        else:
            self._outFile = os.path.abspath(os.path.join(self._workingDir, self._outFile))
        self.__status = Status()
        self.__statusQueue = list()
        self._startTime = None
        self._endTime = None
        self.__overwritePreviousLine = False
        self._retries = retries
        self._tryCount = 0
        # indicates that, in general, job may not run as part of a group on a cluster node
        self.allowJobGrouping = False
    # --- accessors -----------------------------------------------------------
    @property
    def name(self):
        # Unique job name generated in __init__.
        return self._name

    @property
    def group(self):
        return self._group

    @property
    def notifier(self):
        # JobNotifier used to inform observers about job events.
        return self._notifier

    @property
    def jobListName(self):
        # Name of the owning job list, or "" when the job is standalone.
        if self._parentJoblist:
            return self._parentJoblist.name
        else:
            return ""

    @property
    def _overwritePreviousLine(self):
        return self.__overwritePreviousLine

    @_overwritePreviousLine.setter
    def _overwritePreviousLine(self, overWritePreviousLine):
        # Console lines may only be overwritten on an interactive terminal.
        self.__overwritePreviousLine = overWritePreviousLine and sys.stdout.isatty()

    @property
    def nProcs(self):
        return self._nProcs

    def getOutFile(self):
        # Absolute path to the job's output file.
        return os.path.abspath(self._outFile)

    def setOutFile(self, newPath):
        self._outFile = newPath

    def setGroup(self, group):
        self._group = group

    outFile = property(getOutFile, setOutFile)
    outputFile = outFile
    outfile = outFile  # DEPRECATED

    @property
    def id(self):
        return self.__id

    def getStatus(self):
        return self.__status

    def setStatus(self, newStatus):
        self.__status = newStatus

    status = property(getStatus, setStatus)

    @property
    def workingDir(self):
        return self._workingDir

    @property
    def statusQueue(self):
        # Statuses collected while the job runs inside a job list (see _writeStatus).
        return self.__statusQueue

    @property
    def startTime(self):
        return self._startTime

    @property
    def runtime(self):
        # Wall-clock runtime in seconds; 0 while the job has not started or finished.
        if self._startTime is None or self._endTime is None:
            return 0
        else:
            return self._endTime - self._startTime
    @abstractmethod
    def runJob(self):
        """Abstract method runJob, to be overwritten in subclasses.
        Should initialize and start the current job.
        """

    @abstractmethod
    def updateStatus(self):
        """Abstract method updateStatus, to be overwritten in subclasses.
        Updates the job status and returns true if the status has changed.
        :return: returns true if the status has changed
        :rtype: boolean
        """

    @abstractmethod
    def stop(self):
        """Abstract method stop, to be overwritten in subclasses.
        Should stop the current job.
        """
def getWeightRemaining(self, velocity=1):
weightRemaining = self.weight
if self.startTime is not None:
weightRemaining = max(0, self.weight - ((time.time() - self.startTime) * self.nProcs * velocity))
return weightRemaining
[docs] def nJobsOfStatus(self, statusList=None):
"""Returns if the jobs status is in the list of status.
:param statusList: list of valid status or None
:type statusList: list or None
:return: nuber of jobs of this status (0 or 1)
:rtype: int
"""
nJobs = 0
if statusList is None or self.status.code in statusList:
nJobs = 1
return nJobs
    # NOTE(review): Resources() appears to act as a shared registry of cluster
    # resources — confirm its singleton semantics in jobManagement.management.resources.
    def resourcesAvailable(self):
        """Tests if the resources for the job are available.
        :return: True if the resources for the job are available
        :rtype: boolean
        """
        return Resources().resourcesAvailable(self._resources)

    def allocateResources(self):
        """Allocates the resources needed for the job.
        """
        Resources().allocateResources(self._resources)

    def freeResources(self):
        """Frees the resources used by this job.
        """
        Resources().releaseResources(self._resources)
[docs] def startJob(self):
"""Set the own status to (RUNNING, ) and calls runJob.
"""
if self.status.code == jobConsts.DONE: # allows to run the same job multiple times
self._tryCount = 0
self._tryCount += 1
self._startTime = time.time()
self._endTime = None
self.notifier.notifyObservers(JobEvent.JOB_STARTED_EVENT)
self._writeStatus(jobConsts.RUNNING, f"{self.name} started!")
try:
self.runJob()
except Exception as outerException: # pylint: disable=broad-except
try:
lines = list()
lines.append("The job failed with the following parameters:\n" + "=" * 45 + "\n")
lines.append(f"Name: {self.name}\n")
if hasattr(self, "_executable"):
lines.append(f"Executable: {self._executable}\n")
else:
lines.append(f"Method: {self._method}\n")
lines.append(f"Working directory: {self._workingDir}\n")
lines.append(f"Error message: {outerException!s}\n")
with open(self._outFile, "a") as fileHandle:
fileHandle.writelines(lines)
except (IOError, AttributeError) as e:
self._writeStatus(jobConsts.ERROR, f"Failed to write output file '{self._outFile}': '{e!s}")
finally:
self._writeStatus(jobConsts.ERROR, f"Job {self.name} failed with message: '{outerException!s}'. Output file: {self.outFile}")
    def _writeStatus(self, status, message=str(), overwritePreviousLine=False):
        """Set and print status. Either prints the status to the console or puts it to the status queue.
        :param status: new status code
        :param message: message accompanying the status
        :param overwritePreviousLine: if True the previously printed console line is replaced
        :type status: int constant
        :type message: string
        :type overwritePreviousLine: boolean
        :return: True if something was printed to the console
        :rtype: boolean
        """
        printedSomething = False
        if status in [jobConsts.DONE, jobConsts.ERROR, jobConsts.RETRY] and self._inJoblist:
            self._endTime = time.time()
        # Notify Observers
        if status == jobConsts.ERROR:
            self.notifier.notifyObservers(JobEvent.JOB_FAILED_EVENT)
        elif status == jobConsts.DONE:
            self.notifier.notifyObservers(JobEvent.JOB_SUCCEEDED_EVENT)
        # Turn ERROR into RETRY while tries are left; otherwise annotate the final failure.
        if status == jobConsts.ERROR:
            if self._tryCount <= self._retries:
                status = jobConsts.RETRY
                message = f"{message}; Retrying (try {self._tryCount + 1}/{self._retries + 1})"
            elif self._retries > 0:
                message = f"{message}; Maximum number of tries reached ({self._retries + 1 })."
        self.status = Status(status, message)
        # NOTE(review): _endTime is set again here regardless of _inJoblist — looks
        # redundant with the guarded assignment above; confirm which one is intended.
        if status in [jobConsts.DONE, jobConsts.ERROR, jobConsts.RETRY]:
            self._endTime = time.time()
        if not self._inJoblist:
            # Standalone job: print the status line directly to the console.
            prefix = jobConsts.DELETE_PREVIOUS_LINE if overwritePreviousLine else ""
            printedSomething = Printer.verbosePrint(prefix + _STATUS_STRING.format(jobConsts.REPR_STATUS[status], message),
                                                   printLevel=Printer.Verbosity.JENKINS)
        else:
            # Job-list run: hand the status to the managing process via the queue.
            self.__statusQueue.append(self.status)
        # Only PROGRESS lines that were actually printed may be overwritten next time.
        self._overwritePreviousLine = (status == jobConsts.PROGRESS) and printedSomething
        return printedSomething
def __generateUniqueJobName(self, name):
"""Method to generate a unique job name. Needed, when jobs without name but the same executable
are in one job list to generate unique output files.
:param name: name of the job
:type name: string
:return: a unique jobname; jobname + number
:rtype: string
"""
if name not in self.__knownNames:
self.__knownNames[name] = 1
else:
self.__knownNames[name] += 1
while True:
fileName = f"{name}.{self.__knownNames[name]:03d}"
if not glob.glob(f"{fileName}*"):
break
self.__knownNames[name] += 1
return fileName
[docs] def getJobDictForJSON(self, jobsListName):
"""Generates a dictionary with basic parameters to launch the job.
:param jobsListName: name of job list
:type jobsListName: str
:return: launch parameters
:rtype: dict
"""
params = collections.OrderedDict()
params["jobList"] = jobsListName
for attr in self._jsonAttributes:
if attr == "args":
params[attr] = " ".join(getattr(self, attr))
else:
params[attr] = getattr(self, attr)
return params
[docs] def prepareOutFile(self):
"""Generates the path to the output file and the output file itself.
"""
if os.path.exists(self.outFile):
os.remove(self.outFile)
elif not os.path.exists(os.path.dirname(self.outFile)) and os.path.dirname(self.outFile):
os.makedirs(os.path.dirname(self.outFile))
# To create the output file and do not overwrite it with the next job
with open(self._outFile, "w"):
pass
    def cleanup(self, force=False, **kwargs):  # pylint: disable=unused-argument
        """Removes the output file of the job.
        If the output file is not specified explicitly nothing is written to the file with exception of the qsub job.
        Therefore empty output files will be deleted.
        :param force: forces the removal of the output file
        :type force: boolean
        """
        # If the output file is explicitly specified it should not be removed during clean up.
        try:
            if force or (not self._writeToFile and not self._inJoblist and self.status.code == jobConsts.DONE) or os.stat(self.outFile).st_size == 0:
                os.remove(self.outFile)
        # OSError: file not found; TypeError: outFile is not a valid path. Best-effort cleanup, so swallowed.
        except (OSError, TypeError):
            pass
    def waitUntilFinished(self, pollingInterval=jobConsts.POLLING_INTERVAL, maxPollingInterval=jobConsts.MAX_POLLING_INTERVAL,
                          scalingPollingFactor=jobConsts.SCALING_POLLING_INTERVAL):
        """Waits until the job is finished.
        Polls updateStatus() with an exponentially growing interval, capped at maxPollingInterval.
        Polling interval to use in the beginning and maximal polling interval are set in jobMangement/jobManagementData.py
        :param pollingInterval: initial waiting time between two status polls (seconds)
        :param maxPollingInterval: upper bound for the waiting time between polls
        :param scalingPollingFactor: growth factor applied to the interval after each poll
        """
        # Background jobs are not waited for.
        if self._runInBackground:
            return
        try:
            while not self.updateStatus():
                time.sleep(pollingInterval)
                if pollingInterval < maxPollingInterval:
                    pollingInterval = min(scalingPollingFactor * pollingInterval, maxPollingInterval)
        except (SystemExit, KeyboardInterrupt):
            # An interrupted job is stopped before cleanup.
            self.stop()
        # NOTE(review): placement of cleanup() on the normal path (not only after an
        # interrupt) reconstructed from context — confirm against the original layout.
        self.cleanup()
        if self.status.code == jobConsts.RETRY:
            # A failed try with retries left: run the job again.
            self.startJob()
        elif self.status.code != jobConsts.ERROR:
            with open(self.outFile, "a") as fileHandle:
                fileHandle.write(self.status.message + "\n")
def __str__(self):
pattern = re.compile(r"<class '.*\.(.*)'>")
jobType = re.match(pattern, str(self.__class__)).group(1)
return f"job name: {self.name:>60}; job type: {jobType:>12}; job id:{self.id:>3}"
def __repr__(self):
return self.__str__()
    def lookForSuccess(self, returnCode=0):
        """Controls the success of the currently finished job. If a success statement or a fail statement was defined
        the output file is searched for them. Sets the status of the job.
        :param returnCode: return code of the program called
        :type returnCode: integer
        :return: True if the job was successful
        :rtype: boolean
        """
        jobWritesOutput = self._inJoblist or self._writeToFile
        # Copy the success statements: matched ones are removed while scanning the file.
        successStatements, failStatements = self.__successStatements[:], self.__failStatements
        if returnCode and not self.__ignoreExitCode:
            self._writeStatus(jobConsts.ERROR, f"the executed program exited with nonzero return code ({returnCode})!")
        elif jobWritesOutput and (successStatements or failStatements):
            with open(self._outFile) as filehandler:
                for line in filehandler:
                    # Any fail statement on a line ends the scan with an ERROR status.
                    if any((re.search(regExp, line) for regExp in failStatements)):
                        self._writeStatus(jobConsts.ERROR, "fail status found in output file")
                        break
                    # Drop every success statement that matched this line.
                    successStatements = [regExp for regExp in successStatements if not re.search(regExp, line)]
                else:
                    # for/else: reached only when no fail statement broke the loop.
                    if successStatements:
                        # Some success statements never matched any line.
                        self._writeStatus(jobConsts.ERROR, "success status ({0}) status not found in output file".format(", ".join(successStatements)))
                    else:
                        self._writeStatus(jobConsts.DONE, "terminated successfully!")
        else:
            self._writeStatus(jobConsts.DONE, "terminated successfully!")
        return self.status.code == jobConsts.DONE