Source code for mojo.jobManagement.jobs.jobList

"""
JobList
=======
Class to set up a job management system for jobs that depend on each other.
In addition you can put a Job list itself as job into another job list. After a successful execution of the job list
you can get a list of the jobs which were successful, failed and were not started because of failed parent jobs by
accessing joblist.successfulJobs, joblist.failedJobs and joblist.jobs.values().

.. attention::

    Declaring dependencies between jobs in different job lists will generate an error!
"""

import datetime
import functools
import itertools
import signal
import sys
import time

from ...bricabrac.fileIO import Printer
from ..graph.graph import Graph
from ...jobManagement import jobManagementData as jobConsts
from .abstractJob import AbstractJob
from .abstractJob import Status
from ..management.resources import Resources
from ..management.clusterInfo import ClusterInfo
from ..jobObserver import JobEvent, JobObserver
from .jobGrouping import JobGrouper

# Template for one printed status line: field 0 is left-aligned and padded with dots,
# followed by the message, the job name and the job id.
# The pad width is 21 on an interactive terminal and 15 otherwise
# (presumably the status text carries colour escape codes on a tty -- TODO confirm).
_STATUS_STRING = "{0:.<%d}{1} ({2}, id:{3})" % (21 if sys.stdout.isatty() else 15)


class JobList(AbstractJob, JobObserver):
    """Class to set up a system of jobs that depend on each other.

    JobList uses an instance of Graph to control the dependencies between the jobs and an instance of
    Resources to describe the resources available.

    :param nNodesAvailable: number of nodes available for the execution of the job list
    :param nProcsAvailable: number of processors to use on the master
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param abortByError: flag if the whole list should be stopped when one job fails
    :param name: a name for the job list
    :param resourcesDict: a dictionary containing resources available in the job list
        (key: resource, value: quantity of the resource)
    :param verbosity: verbosity level of the job list
    :param usedResources: resources the job list consumes itself when processing
    :param additionalProcessStatus: print every status message of the jobs (only evaluated when this
        job list is not itself part of another job list)
    :param retries: number of times the job is retried when failing
    :param deactivateJobGrouping: disable running several small jobs simultaneously on cluster nodes

    :type nNodesAvailable: integer
    :type nProcsAvailable: integer
    :type nTecplotLicenses: integer
    :type abortByError: boolean
    :type name: string
    :type resourcesDict: dictionary
    :type verbosity: integer
    :type usedResources: dictionary
    :type additionalProcessStatus: boolean
    :type retries: integer
    :type deactivateJobGrouping: boolean

    Inherited from AbstractJob::

        start(self)
        _writeStatus(self, status, message="")
    """

    @staticmethod
    def sigterm_handler(_signo, _stack_frame):
        """Handle a TERM signal as SystemExit exception.

        :param _signo: number of the received signal; used as exit code
        :param _stack_frame: current stack frame (unused)
        """
        # Raises SystemExit with signal number
        sys.exit(_signo)

    def __init__(self, nNodesAvailable=None, nProcsAvailable=None, nTecplotLicenses=None, abortByError=False,
                 name="anonymousJoblist", verbosity=Printer.Verbosity.DEFAULT, resourcesDict=None,
                 usedResources=None, additionalProcessStatus=False, retries=0, deactivateJobGrouping=True):
        """Initializes a new instance of JobList.

        :param nNodesAvailable: number of nodes available for the execution of the job list
        :param nProcsAvailable: number of processors to use on the master
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param abortByError: flag if the whole list should be stopped when one job fails
        :param name: a name for the job list
        :param resourcesDict: a dictionary containing resources available in the job list
            (key: resource, value: quantity of the resource)
        :param verbosity: verbosity level of the job list
        :param usedResources: resources the job list consumes itself when processing
        :param additionalProcessStatus: print every status message of the jobs (only evaluated when this
            job list is not itself part of another job list)
        :param retries: number of times the job is retried when failing
        :param deactivateJobGrouping: disable running several small jobs simultaneously on cluster nodes

        :type nNodesAvailable: integer
        :type nProcsAvailable: integer
        :type nTecplotLicenses: integer
        :type abortByError: boolean
        :type name: string
        :type resourcesDict: dictionary
        :type verbosity: Printer.Verbosity
        :type usedResources: dictionary
        :type additionalProcessStatus: boolean
        :type retries: integer
        :type deactivateJobGrouping: boolean
        """
        super().__init__(name=name, resourcesDict=usedResources, retries=retries)
        self.__verbosity = verbosity
        self.__jobs = {}  # job id -> job, for all jobs which have not finished yet
        self.successfulJobs = []
        self.failedJobs = []
        self.__dependencies = Graph(self.id)
        self.__abortByError = abortByError

        resources = Resources.fromParameters(nProcs=nProcsAvailable, nNodes=nNodesAvailable,
                                             nTecplotLicenses=nTecplotLicenses)
        resources.update(resourcesDict or {})
        self._nProcsAvailable = resources.maxProcs

        self.__priorityCalculated = False
        self.__additionalProcessStatus = additionalProcessStatus
        self._velocity = 1  # processes used * s per processes available * s

        if ClusterInfo.onValidCluster() and not deactivateJobGrouping:
            self._jobGrouper = JobGrouper()
        else:
            self._jobGrouper = None

    @property
    def jobs(self):
        """Dictionary (job id -> job) of the jobs which have not finished yet."""
        return self.__jobs

    @property
    def notStartedJobs(self):
        """Generator over the unfinished jobs whose status code is WAITING."""
        return (job for job in self.__jobs.values() if job.status.code == jobConsts.WAITING)

    @property
    def runningJobs(self):
        """Generator over the unfinished jobs whose status code is RUNNING."""
        return (job for job in self.__jobs.values() if job.status.code == jobConsts.RUNNING)

    @property
    def allJobs(self):
        """
        Returns all jobs in this JobList

        Warning: Some jobs can be included multiple times in the returned iterator
        """
        return itertools.chain(self.jobs.values(), self.failedJobs, self.successfulJobs)

    def nJobsOfStatus(self, statusList=None):
        """Returns the number of jobs in this job list whose status is in the list of status.

        Overwrites method in AbstractJob. The count is the sum of ``nJobsOfStatus`` of every job in
        ``allJobs``, so job lists contained in this job list contribute their own jobs.

        :param statusList: list of valid status or None
        :type statusList: Optional[iterable]
        :return: number of jobs of this status
        :rtype: int
        """
        # We need to iterate to get correct values for joblists in this joblist
        return sum(job.nJobsOfStatus(statusList) for job in self.allJobs)

    @property
    def dependencies(self):
        """The dependency graph of this job list."""
        return self.__dependencies

    @property
    def verbosity(self):
        """Verbosity level of the job list."""
        return self.__verbosity

    @verbosity.setter
    def verbosity(self, verbosity):
        self.__verbosity = verbosity

    def calculatePriorities(self):
        """Calculate the weights of the job list's dependency graph.
        """
        self.__dependencies.calculatePriority()
        self.__priorityCalculated = True

    def addJob(self, job, parents=None, weight=None):
        """Adds a new job to the job list and builds the dependencies

        :param job: the job to add to the job list
        :param parents: list of the parentsIds of the job
        :param weight: weight of the job in processes * seconds

        :type job: instance of a class derived from AbstractJob
        :type parents: list of int
        :type weight: int

        :raises RuntimeError: if ``job.resourcesAvailable()`` reports that the job cannot be run
        """
        self.__priorityCalculated = False  # the graph changes, priorities have to be recalculated
        parents = parents or []

        if not job.resourcesAvailable():
            # adjacent literals instead of a backslash continuation inside the string, which would put
            # the continuation characters into the message
            raise RuntimeError(f"The job ({job.name}) you are trying to add to the job list "
                               "needs more or other resources than are available on this system!")

        if weight is not None:
            job.weight = weight
        self.weight += job.weight

        job.notifier.registerObserver(self, JobEvent.ANY_JOB_EVENT)

        if isinstance(job, JobList):
            self.__dependencies.addGraph(job.dependencies, parents)
        else:
            self.__dependencies.addJob(job.id, parents, job.weight)

        self.__jobs[job.id] = job
        job._inJoblist = True
        job._parentJoblist = self

    def update(self, subject, event):
        """Forward a notification received as observer to the observers of this job list.

        :param subject: object the event originates from
        :param event: the event which occurred
        """
        self.notifier.forwardNotification(subject, event)

    def _writeProgress(self):
        """Write a PROGRESS status with job counts, percentage done, elapsed and estimated total time."""
        nJobsWaiting = self.nJobsOfStatus((jobConsts.WAITING,))
        nJobsRunning = self.nJobsOfStatus((jobConsts.RUNNING,))
        nJobsSuccessful = self.nJobsOfStatus((jobConsts.DONE, ))
        nJobsFailed = self.nJobsOfStatus((jobConsts.ERROR, ))
        nJobsSum = nJobsWaiting + nJobsRunning + nJobsSuccessful + nJobsFailed

        weightDone = sum(job.weight for job in itertools.chain(self.successfulJobs, self.failedJobs))
        now = time.time()
        weightRemaining = sum(job.getWeightRemaining(self._velocity)
                              for job in itertools.chain(self.notStartedJobs, self.runningJobs))
        timeRemaining = weightRemaining / self._nProcsAvailable * (1 / self._velocity)

        weightTotal = weightRemaining + weightDone
        percentDone = (1.0 - weightRemaining / weightTotal) * 100.0 if weightTotal else 0
        elapsed = now - self._startTime

        self._writeStatus(
            jobConsts.PROGRESS,
            "{:.2f}% - Waiting:{:5}; Running:{:3}; Successful:{:5}; Failed:{:3}; Sum:{:5} - {} / {}".format(
                percentDone, nJobsWaiting, nJobsRunning, nJobsSuccessful, nJobsFailed, nJobsSum,
                datetime.timedelta(seconds=int(elapsed)),
                datetime.timedelta(seconds=int(elapsed + timeRemaining))),
            self._overwritePreviousLine)

    def runJoblist(self, printOverview=False):
        """Executes a job list.

        * While not terminated jobs or jobs without dead parent in the list:
            * when jobs ready to start exist and enough nodes exist start new jobs
            * print the status of all running jobs into the log file and to sys.stdout
            * if a job has the status done or error free the nodes and resolve the dependencies

        :param printOverview: print ``str(self)`` at DEBUG verbosity after the run
        :type printOverview: boolean
        """
        # Write content of buffer to the terminal before running the job list.
        sys.stdout.flush()

        self._tryCount += 1

        # catch TERM signals in SystemExit of try-except
        signal.signal(signal.SIGTERM, self.sigterm_handler)

        curSleepTime = self.__adaptSleepTime(0, True)  # initial sleep time is the minimum sleep time

        if not self.__priorityCalculated:
            self.calculatePriorities()

        Printer.verbosePrint("\nexecution of the job list:\n==========================", Printer.Verbosity.JENKINS)
        # self.__dependencies.sanityTest() # does not do anything at the moment

        self._startTime = time.time()
        self._endTime = None

        try:
            # main loop
            while self.__dependencies.getReadyJobs() and not (self.__abortByError and
                                                              self.status.code == jobConsts.ERROR):
                self.__startNewJobs()  # start new jobs?
                time.sleep(curSleepTime)  # to decrease the processor load
                self.__handleStatus()  # update statuses
                somethingHappened = self.__testforFinishedJobs()  # job done?
                curSleepTime = self.__adaptSleepTime(curSleepTime, somethingHappened)
                # 'sys.stdout.isatty()' test if we are in an interactive terminal -> overwrite lines
                if somethingHappened or sys.stdout.isatty():
                    self._writeProgress()
        except (SystemExit, KeyboardInterrupt):
            self._writeStatus(jobConsts.ERROR, "job list aborted by KeyboardInterrupt or SystemExit!")
            self.stop()
            raise

        # set own status
        if self.failedJobs:
            self._writeStatus(jobConsts.ERROR, "at least one job terminated not successfully!")
            Printer.verbosePrint("\nAt least one job terminated not successfully!", Printer.Verbosity.JENKINS)
        elif any(True for _ in self.notStartedJobs):
            self._writeStatus(jobConsts.ERROR, "at least one job was not started!")
            Printer.verbosePrint("\nAt least one job was not started!", Printer.Verbosity.JENKINS)
        else:
            self._writeStatus(jobConsts.DONE, "all jobs terminated successfully!")
            Printer.verbosePrint("\nAll jobs terminated successfully!", Printer.Verbosity.JENKINS)

        # overview over results
        if printOverview:
            Printer.verbosePrint(str(self), Printer.Verbosity.DEBUG)
        # NOTE(review): assumed to be printed independently of printOverview -- confirm
        Printer.verbosePrint("", Printer.Verbosity.JENKINS)

    def runJob(self):
        """Method to fulfill the interface set by AbstractJob.
        """
        self.__runAsJob()

    def __adaptSleepTime(self, curSleepTime, somethingHappended, minSleepTime=jobConsts.MIN_SLEEP_TIME,
                         maxSleepTime=jobConsts.MAX_SLEEP_TIME, adaptionStep=jobConsts.ADAPT_SLEEP_TIME_STEP):
        """Adapts the sleep time between asking for finished jobs.

        :param curSleepTime: sleep time used in the previous iteration
        :param somethingHappended: flag whether a job changed its state in the previous iteration
        :param minSleepTime: sleep time returned when something happened
        :param maxSleepTime: upper limit of the returned sleep time
        :param adaptionStep: increment added to the sleep time when nothing happened

        :return: ``minSleepTime`` if something happened, else the incremented sleep time limited to
            ``maxSleepTime``
        """
        if somethingHappended:
            return minSleepTime
        return min(curSleepTime + adaptionStep, maxSleepTime)

    def __recursivelyResetStatus(self):
        """Intended for use when retry is triggered
        """
        self.dependencies.reset()
        # the dictionary values are copied because the dictionary is modified inside the loop
        for job in itertools.chain(self.failedJobs, self.successfulJobs, list(self.__jobs.values())):
            job.status = Status(jobConsts.WAITING, "")
            job._tryCount = 0
            self.__jobs[job.id] = job
            if isinstance(job, JobList):
                job.__recursivelyResetStatus()
        self.failedJobs.clear()
        self.successfulJobs.clear()

    def __runAsJob(self):
        """Executes the job list as job.
        """
        # start new jobs?
        self.__startNewJobs()
        # update statuses
        self.__handleStatus()
        # job done?
        self.__testforFinishedJobs()

        # retry?
        if self.failedJobs and self._tryCount <= self._retries:
            self._writeStatus(jobConsts.RETRY,
                              f"failing job, going for a retry (try {self._tryCount + 1}/{self._retries + 1})")
            self.stop()
            self.__recursivelyResetStatus()
        # end job list
        elif not self.__dependencies.getReadyJobs() or (self.__abortByError and self.failedJobs):
            if not self.failedJobs:
                self._writeStatus(jobConsts.DONE, f"all jobs in job list {self.id} terminated successfully")
            else:
                self._writeStatus(jobConsts.ERROR, f"at least one job in job list {self.id} failed")

    def __startNewJobs(self):
        """Looks for ready jobs in the dependencies, allocates the resources needed and starts the jobs.
        """
        # jobs ready w.r.t. dependency graph
        jobs = [self.__jobs[jobId] for jobId in self.__dependencies.getReadyJobs()]
        # jobs ready w.r.t. to status and resources
        readyJobs = [job for job in jobs
                     if job.status.code in [jobConsts.WAITING, jobConsts.RETRY] and job.resourcesAvailable()]

        # try to lump small jobs together
        if self._jobGrouper is not None:
            singleJobs, groupedJobs = self._jobGrouper.groupJobs(readyJobs)
            # jobs for grouped jobs are new and need to be added
            for job in groupedJobs:
                self.addJob(job)
            readyJobs = singleJobs + groupedJobs

        for job in readyJobs:
            # check again because of possibly consumed resources
            if job.resourcesAvailable():
                job.allocateResources()
                job.startJob()

    def __handleStatus(self):
        """Prints or passes the status of all current running jobs.
        """
        for job in self.__jobs.values():
            if job.status.code == jobConsts.RUNNING:
                job.updateStatus()
            # NOTE(review): assumed to be checked for every job, not only for those updated above -- confirm
            if job.status.code in (jobConsts.ERROR, jobConsts.RETRY, jobConsts.RUNNING):
                self.__processJobStatus(job)

    def __processJobStatus(self, job):
        """Prints the jobs status or puts it in the communicationQueue.

        All entries currently in ``job.statusQueue`` are consumed. If this job list is not part of another
        job list the messages are printed, otherwise they are passed on via ``_writeStatus``.

        :param job: the job those status will be processed
        :type job: Job instance
        """
        printedSomething = False
        while True:
            try:
                status = job.statusQueue.pop(0)
            except IndexError:
                # queue is empty
                break

            if not self._inJoblist:
                if self.__additionalProcessStatus:
                    printedSomething = Printer.verbosePrint(
                        _STATUS_STRING.format(jobConsts.REPR_STATUS[status.code], status.message, job.name, job.id),
                        Printer.Verbosity.JENKINS)
                elif status.code not in (jobConsts.RUNNING, jobConsts.DONE):
                    prefix = jobConsts.DELETE_PREVIOUS_LINE if self._overwritePreviousLine else ""
                    printedSomething = Printer.verbosePrint(
                        prefix + _STATUS_STRING.format(jobConsts.REPR_STATUS[status.code], status.message,
                                                       job.name, job.id),
                        Printer.Verbosity.JENKINS)
            elif status.code == jobConsts.DONE:
                # print finalization of _every_ single job in list only at DEBUG verbosity level (= 3)
                # NOTE(review): the comparison is strict, i.e. DEBUG itself does not pass -- confirm intent
                if self.__verbosity > Printer.Verbosity.DEBUG:
                    self._writeStatus(jobConsts.RUNNING, f"job in job list done! ({job.name}, id:{job.id})")
            else:  # status.code in (RUNNING, ERROR, RETRY)
                self._writeStatus(jobConsts.RUNNING, status.message + f" ({job.name}, id:{job.id})")

        # NOTE(review): assumed to be evaluated once after the queue is drained -- confirm
        self._overwritePreviousLine = self._overwritePreviousLine and not printedSomething

    def _calcVelocity(self):
        """Calculate the velocity from the weight of the successful jobs and the time passed.

        :return: weight done per (time passed * processors available); the current velocity if no weight
            is done yet or the divisor is not positive
        """
        weightDone = sum(job.weight for job in self.successfulJobs)
        if not weightDone > 0:
            return self._velocity
        processTime = (time.time() - self._startTime) * self._nProcsAvailable
        if not processTime > 0:
            # no time passed yet or no processors: avoid a ZeroDivisionError and keep the old estimate
            return self._velocity
        return weightDone / processTime

    def __testforFinishedJobs(self):
        """Tests if any job has finished and frees the resources which belong to finished jobs.

        :return: flag whether at least one job had the status DONE, ERROR or RETRY
        :rtype: boolean
        """
        somethingHappened = False
        # iterate over a copy because finished jobs are removed from the dictionary
        for job in list(self.__jobs.values()):
            if job.status.code not in (jobConsts.DONE, jobConsts.ERROR, jobConsts.RETRY):
                continue

            somethingHappened = True
            if job.status.code == jobConsts.RETRY:
                job.startJob()
                continue
            elif job.status.code == jobConsts.ERROR:
                self.failedJobs.append(job)
                self.__processJobStatus(job)  # to get all the job status
                if self.__abortByError:
                    self.__stopByJob(job)
            else:  # status.code == DONE
                self.successfulJobs.append(job)
                self.__processJobStatus(job)  # to get all the job status
                # NOTE(review): assumed to be updated for successful jobs only -- confirm
                self._velocity = self._calcVelocity()

            job.freeResources()
            self.__dependencies.jobDone(job.id, job.status.code)
            del self.__jobs[job.id]
        return somethingHappened

    def updateStatus(self):
        """Makes an iteration over all jobs, executes new jobs and reads their statuses.
        """
        self.__runAsJob()

    @staticmethod
    def viewDependencies(currentJobList):
        """Prints all dependencies of the current JobList by recursively walking through the job list.

        :param currentJobList: main job list
        :type currentJobList: JobList
        """
        currentJobList.dependencies.printGraph()
        for job in currentJobList.jobs.values():
            if isinstance(job, JobList):
                JobList.viewDependencies(job)

    def viewJobs(self):
        """Prints a list of the job list and the associated dependency graph.
        """
        if not self.__priorityCalculated:
            self.calculatePriorities()

        Printer.verbosePrint("jobs:\n=====", printLevel=Printer.Verbosity.ALWAYS)
        for job in self.__jobs.values():
            Printer.verbosePrint(job, printLevel=Printer.Verbosity.ALWAYS)

        Printer.verbosePrint("\ndependencies:\n=============", printLevel=Printer.Verbosity.ALWAYS)
        JobList.viewDependencies(self)

    def __stopByJob(self, job):
        """Stops all at this time running jobs in the job list by calling their stop-method.
        Called when a job fails and abort by error is set.

        :param job: job which has caused the stop
        :type job: instance of a job
        """
        self.stop()
        self._writeStatus(jobConsts.ERROR,
                          f"job list {self.id} aborted because of error in job {job.name}(id:{job.id})")

    def stop(self):
        """Stops all currently running jobs.
        Called by a parent job list or through a keyboard interrupt or system exit.
        """
        for job in self.__jobs.values():
            if job.status.code == jobConsts.RUNNING:
                job.stop()

    def cleanup(self, force=False, **kwargs):
        """Remove created scripts, output and error files of the successful and the failed jobs.

        :param force: flag whether cleanup is enforced
        :type force: bool
        :param kwargs: further keyword arguments passed unchanged to ``cleanup`` of every job
            (e.g. ``removeScript``: flag whether files should be removed)
        """
        for job in self.successfulJobs + self.failedJobs:
            job.cleanup(force, **kwargs)

    def setGroup(self, group):
        """Set the group of this job list and of all jobs in ``allJobs``.

        :param group: the group to set
        """
        self._group = group
        for job in self.allJobs:
            job.setGroup(group)

    def __str__(self):
        """Return the AbstractJob representation followed by the not started, successful and failed jobs."""
        sections = (
            # the heading is printed as soon as unfinished jobs exist, even if none of them is waiting
            (self.jobs, "not started:", self.notStartedJobs),
            (self.successfulJobs, "successful finished:", self.successfulJobs),
            (self.failedJobs, "failed:", self.failedJobs),
        )
        lines = [AbstractJob.__str__(self)]
        for showSection, heading, jobs in sections:
            if not showSection:
                continue
            lines.append(" " * 4 + heading)
            lines.extend(" " * 8 + str(job) for job in sorted(jobs, key=lambda x: x.id))
        return "\n".join(lines).rstrip("\n")

    def __repr__(self):
        return self.__str__()