"""
JobList
=======
Class to set up a job management system for jobs that depend on each other.
In addition you can put a Job list itself as job into another job list. After a successful execution of the job list
you can get a list of the jobs which were successful, failed and were not started because of failed parent jobs by
accessing joblist.successfulJobs, joblist.failedJobs and joblist.jobs.values().
.. attention::
Declaring dependencies between jobs in different job lists will generate an error!
"""
import datetime
import functools
import itertools
import signal
import sys
import time
from ...bricabrac.fileIO import Printer
from ..graph.graph import Graph
from ...jobManagement import jobManagementData as jobConsts
from .abstractJob import AbstractJob
from .abstractJob import Status
from ..management.resources import Resources
from ..management.clusterInfo import ClusterInfo
from ..jobObserver import JobEvent, JobObserver
from .jobGrouping import JobGrouper
# Template for one printed status line. Fields: 0 = status text (left-aligned and padded with dots),
# 1 = status message, 2 = job name, 3 = job id. The padded column is 21 characters wide on an
# interactive terminal and 15 characters wide otherwise.
# NOTE(review): the wider column presumably compensates for colour escape sequences that are only
# emitted on a tty -- confirm against jobManagementData.REPR_STATUS.
_STATUS_STRING = "{0:.<%d}{1} ({2}, id:{3})" % (21 if sys.stdout.isatty() else 15)
[docs]class JobList(AbstractJob, JobObserver):
"""Class to set up a system of jobs that depend on each other.
JobList uses an instance of Graph to control the dependencies between the jobs and
an instance of Resources to manage the nodes available.
:param nNodesAvailable: number of nodes available for the execution of the job list
:param nProcsAvailable: number of processors to use on the master
:param nTecplotLicenses: number of tecplot licenses needed by the job
:param abortByError: flag if the whole list should be stopped when one job fails
:param name: a name for the job list
:param resourcesDict: a dictionary containing resources available in the job list (key: resource, value: quantity of the resource)
:param verbosity: verbosity level of the job list
:param usedResources: resources the job list consumes itself when processing
:param retries: number of times the job is retried when failing
:param deactivateJobGrouping: disable running several small jobs simultaneously on cluster nodes
:type nNodesAvailable: integer
:type nProcsAvailable: integer
:type nTecplotLicenses: integer
:type abortByError: boolean
:type name: string
:type resourcesDict: dictionary
:type verbosity: integer
:type usedResources: dictionary
:type retries: integer
:type deactivateJobGrouping: boolean
Inherited from AbstractJob::
start(self)
_writeStatus(self, status, message="")
"""
@staticmethod
def sigterm_handler(_signo, _stack_frame):
    """Turn a received TERM signal into a SystemExit exception.

    :param _signo: number of the received signal; used as exit code of the SystemExit
    :param _stack_frame: stack frame passed in by the signal machinery (not used)
    :type _signo: integer
    :type _stack_frame: frame object or None
    :raises SystemExit: always, carrying the signal number
    """
    raise SystemExit(_signo)
def __init__(self, nNodesAvailable=None, nProcsAvailable=None, nTecplotLicenses=None, abortByError=False, name="anonymousJoblist",
             verbosity=Printer.Verbosity.DEFAULT, resourcesDict=None, usedResources=None, additionalProcessStatus=False, retries=0,
             deactivateJobGrouping=True):
    """Initializes a new instance of JobList.

    :param nNodesAvailable: number of nodes available for the execution of the job list
    :param nProcsAvailable: number of processors to use on the master
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param abortByError: flag if the whole list should be stopped when one job fails
    :param name: a name for the job list
    :param verbosity: verbosity level of the job list
    :param resourcesDict: a dictionary containing resources available in the job list (key: resource, value: quantity of the resource)
    :param usedResources: resources the job list consumes itself when processing
    :param additionalProcessStatus: if set and the job list is not nested in another job list, every queued status of its jobs is printed
    :param retries: number of times the job is retried when failing
    :param deactivateJobGrouping: disable running several small jobs simultaneously on cluster nodes
    :type nNodesAvailable: integer
    :type nProcsAvailable: integer
    :type nTecplotLicenses: integer
    :type abortByError: boolean
    :type name: string
    :type verbosity: Printer.Verbosity
    :type resourcesDict: dictionary
    :type usedResources: dictionary
    :type additionalProcessStatus: boolean
    :type retries: integer
    :type deactivateJobGrouping: boolean
    """
    super().__init__(name=name, resourcesDict=usedResources, retries=retries)

    # behaviour switches
    self.__verbosity = verbosity
    self.__abortByError = abortByError
    self.__additionalProcessStatus = additionalProcessStatus

    # bookkeeping: pending/running jobs by id, finished jobs in two lists
    self.__jobs = {}
    self.successfulJobs = []
    self.failedJobs = []
    self.__dependencies = Graph(self.id)
    self.__priorityCalculated = False

    # resources available to the jobs of this list
    availableResources = Resources.fromParameters(nProcs=nProcsAvailable, nNodes=nNodesAvailable,
                                                  nTecplotLicenses=nTecplotLicenses)
    availableResources.update(resourcesDict if resourcesDict else {})
    self._nProcsAvailable = availableResources.maxProcs
    self._velocity = 1  # processes used * s per processes available * s

    # small jobs are only lumped together on a valid cluster and when grouping is not deactivated
    groupingWanted = ClusterInfo.onValidCluster() and not deactivateJobGrouping
    self._jobGrouper = JobGrouper() if groupingWanted else None
@property
def jobs(self):
    """Dictionary of the jobs currently held by the job list (key: job id, value: job).

    Jobs that finished with the status DONE or ERROR are removed from this dictionary and are found in
    successfulJobs respectively failedJobs instead.
    """
    return self.__jobs
@property
def notStartedJobs(self):
    """Generator over all held jobs whose status code is WAITING."""
    waitingCode = jobConsts.WAITING
    return (candidate for candidate in self.__jobs.values() if candidate.status.code == waitingCode)
@property
def runningJobs(self):
    """Generator over all held jobs whose status code is RUNNING."""
    runningCode = jobConsts.RUNNING
    return (candidate for candidate in self.__jobs.values() if candidate.status.code == runningCode)
@property
def allJobs(self):
    """Iterator over all jobs of this JobList: the held jobs first, then the failed, then the successful ones.

    Warning: Some jobs can be included multiple times in the returned iterator
    """
    jobCollections = (self.jobs.values(), self.failedJobs, self.successfulJobs)
    return itertools.chain.from_iterable(jobCollections)
def nJobsOfStatus(self, statusList=None):
    """Counts the jobs of this job list whose status is in the list of status.

    Overwrites method in AbstractJob.

    :param statusList: list of valid status or None
    :type statusList: Optional[iterable]
    :return: number of jobs of this status
    :rtype: int
    """
    # every job is asked itself so that job lists nested in this job list report the count of their own jobs
    return sum(job.nJobsOfStatus(statusList) for job in self.allJobs)
@property
def dependencies(self):
    """Graph instance holding the dependencies between the jobs of this job list."""
    return self.__dependencies
@property
def verbosity(self):
    """Verbosity level of the job list."""
    return self.__verbosity

@verbosity.setter
def verbosity(self, verbosity):
    # plain assignment, the value is not validated
    self.__verbosity = verbosity
def calculatePriorities(self):
    """Calculate the weights of the job list's dependency graph.

    Afterwards the priorities are marked as up to date; adding another job invalidates them again.
    """
    graph = self.__dependencies
    graph.calculatePriority()
    self.__priorityCalculated = True
def addJob(self, job, parents=None, weight=None):
    """Adds a new job to the job list and builds the dependencies.

    :param job: the job to add to the job list
    :param parents: list of the ids of the parent jobs
    :param weight: weight of the job in processes * seconds; if given it replaces the weight of the job
    :type job: instance of a class derived from AbstractJob
    :type parents: list of int
    :type weight: int
    :raises RuntimeError: if the resources needed by the job are not available
    """
    # the graph changes, so the priorities have to be calculated again before the next run
    self.__priorityCalculated = False

    if not job.resourcesAvailable():
        raise RuntimeError(f"The job ({job.name}) you are trying to add to the job list "
                           "needs more or other resources than are available on this system!")

    if parents is None or not parents:
        parents = []
    if weight is not None:
        job.weight = weight
    self.weight += job.weight

    # the job list observes every event of its jobs (see update)
    job.notifier.registerObserver(self, JobEvent.ANY_JOB_EVENT)

    graph = self.__dependencies
    if isinstance(job, JobList):
        # a nested job list contributes its whole dependency graph
        graph.addGraph(job.dependencies, parents)
    else:
        graph.addJob(job.id, parents, job.weight)

    self.__jobs[job.id] = job
    job._inJoblist = True
    job._parentJoblist = self
def update(self, subject, event):
    """Observer callback: pass a notification of an observed job on through the notifier of this job list.

    :param subject: the object the notification is about
    :param event: the event that occurred
    """
    notifier = self.notifier
    notifier.forwardNotification(subject, event)
def _writeProgress(self):
    """Write a PROGRESS status with the job counters, the percentage done and a time estimate.

    The percentage is based on job weights; the message ends with "<time elapsed> / <estimated total time>".
    """
    waitingCount = self.nJobsOfStatus((jobConsts.WAITING,))
    runningCount = self.nJobsOfStatus((jobConsts.RUNNING,))
    successfulCount = self.nJobsOfStatus((jobConsts.DONE, ))
    failedCount = self.nJobsOfStatus((jobConsts.ERROR, ))
    totalCount = waitingCount + runningCount + successfulCount + failedCount

    # weight of everything that has terminated, no matter whether successfully or not
    finishedJobs = itertools.chain(self.successfulJobs, self.failedJobs)
    weightDone = functools.reduce(lambda total, job: total + job.weight, finishedJobs, 0)
    now = time.time()
    # weight still ahead, judged by each job itself on the basis of the current velocity
    pendingJobs = itertools.chain(self.notStartedJobs, self.runningJobs)
    weightRemaining = functools.reduce(lambda total, job: total + job.getWeightRemaining(self._velocity), pendingJobs, 0)
    timeRemaining = weightRemaining / self._nProcsAvailable * (1 / self._velocity)

    weightTotal = weightRemaining + weightDone
    if weightTotal:
        percentDone = (1.0 - weightRemaining / weightTotal) * 100.0
    else:
        percentDone = 0
    timeElapsed = now - self._startTime

    message = "{:.2f}% - Waiting:{:5}; Running:{:3}; Successful:{:5}; Failed:{:3}; Sum:{:5} - {} / {}".format(
        percentDone,
        waitingCount, runningCount, successfulCount, failedCount, totalCount,
        datetime.timedelta(seconds=int(timeElapsed)),
        datetime.timedelta(seconds=int(timeElapsed + timeRemaining)))
    self._writeStatus(jobConsts.PROGRESS, message, self._overwritePreviousLine)
def runJoblist(self, printOverview=False):
    """Executes a job list.

    As long as the dependency graph reports ready jobs (and the list did not fail while abortByError is set):

    * start the ready jobs for which the resources are available
    * sleep for an adaptive time to decrease the processor load
    * process the statuses of the jobs
    * collect finished jobs, free their resources and resolve the dependencies

    Afterwards the status of the job list itself is written (DONE or ERROR).

    :param printOverview: print the string representation of the job list after the run
    :type printOverview: boolean
    :raises SystemExit: re-raised after the running jobs have been stopped (a TERM signal arrives as SystemExit)
    :raises KeyboardInterrupt: re-raised after the running jobs have been stopped
    """
    # Write content of buffer to the terminal before running the job list.
    sys.stdout.flush()
    self._tryCount += 1

    # TERM signals are turned into SystemExit so that the except clause below gets the chance to stop the jobs.
    # NOTE(review): the previous SIGTERM handler is not restored after the run -- confirm that this is intended.
    signal.signal(signal.SIGTERM, self.sigterm_handler)

    sleepTime = self.__adaptSleepTime(0, True)  # "something happened" yields the minimum sleep time
    if not self.__priorityCalculated:
        self.calculatePriorities()

    Printer.verbosePrint("\nexecution of the job list:\n==========================", Printer.Verbosity.JENKINS)

    self._startTime = time.time()
    self._endTime = None

    try:
        while self.__dependencies.getReadyJobs():
            if self.__abortByError and self.status.code == jobConsts.ERROR:
                break
            self.__startNewJobs()
            time.sleep(sleepTime)
            self.__handleStatus()
            jobFinished = self.__testforFinishedJobs()
            sleepTime = self.__adaptSleepTime(sleepTime, jobFinished)
            # on an interactive terminal the progress line is overwritten, so it is refreshed in every cycle
            if jobFinished or sys.stdout.isatty():
                self._writeProgress()
    except (SystemExit, KeyboardInterrupt):
        self._writeStatus(jobConsts.ERROR, "job list aborted by KeyboardInterrupt or SystemExit!")
        self.stop()
        raise

    # set own status: failed jobs take precedence over jobs that were never started
    if self.failedJobs:
        self._writeStatus(jobConsts.ERROR, "at least one job terminated not successfully!")
        Printer.verbosePrint("\nAt least one job terminated not successfully!", Printer.Verbosity.JENKINS)
    elif list(self.notStartedJobs):
        self._writeStatus(jobConsts.ERROR, "at least one job was not started!")
        Printer.verbosePrint("\nAt least one job was not started!", Printer.Verbosity.JENKINS)
    else:
        self._writeStatus(jobConsts.DONE, "all jobs terminated successfully!")
        Printer.verbosePrint("\nAll jobs terminated successfully!", Printer.Verbosity.JENKINS)

    # overview over results
    if printOverview:
        Printer.verbosePrint(str(self), Printer.Verbosity.DEBUG)
    Printer.verbosePrint("", Printer.Verbosity.JENKINS)
def runJob(self):
    """Method to fulfill the interface set by AbstractJob.

    Performs one processing cycle of the job list: start ready jobs, process statuses, collect finished jobs.
    """
    return self.__runAsJob() and None
def __adaptSleepTime(self, curSleepTime, somethingHappended, minSleepTime=jobConsts.MIN_SLEEP_TIME,
                     maxSleepTime=jobConsts.MAX_SLEEP_TIME, adaptionStep=jobConsts.ADAPT_SLEEP_TIME_STEP):
    """Adapts the sleep time between asking for finished jobs.

    The sleep time falls back to the minimum as soon as something happened and grows by one step
    per uneventful cycle until the maximum is reached.

    :param curSleepTime: sleep time used in the last cycle
    :param somethingHappended: flag whether something happened in the last cycle
    :param minSleepTime: sleep time returned when something happened
    :param maxSleepTime: upper limit of the sleep time
    :param adaptionStep: increment applied when nothing happened
    :type curSleepTime: number
    :type somethingHappended: boolean
    :type minSleepTime: number
    :type maxSleepTime: number
    :type adaptionStep: number
    :return: sleep time to use in the next cycle
    :rtype: number
    """
    if not somethingHappended:
        return min(curSleepTime + adaptionStep, maxSleepTime)
    return minSleepTime
def __recursivelyResetStatus(self):
    """Put the job list and all of its jobs back into the state before the execution.

    Intended for use when retry is triggered: the dependency graph is reset, every job gets the status
    WAITING and a try count of zero and is held by the job list again; nested job lists are reset the same way.
    """
    self.dependencies.reset()
    # the held jobs are copied because the dictionary is written to inside the loop
    everyJob = itertools.chain(self.failedJobs, self.successfulJobs, list(self.__jobs.values()))
    for job in everyJob:
        job.status = Status(jobConsts.WAITING, "")
        job._tryCount = 0
        self.__jobs[job.id] = job
        if isinstance(job, JobList):
            job.__recursivelyResetStatus()
    # all jobs are pending again, nothing counts as finished any more
    self.failedJobs.clear()
    self.successfulJobs.clear()
def __runAsJob(self):
    """Executes one cycle of the job list when it is run as a job.

    Ready jobs are started, statuses are processed and finished jobs are collected. Afterwards the job list
    either goes for a retry, writes its final status (DONE or ERROR) or stays as it is until the next cycle.
    """
    self.__startNewJobs()  # start new jobs?
    self.__handleStatus()  # update statuses
    self.__testforFinishedJobs()  # job done?

    # retry?
    if self.failedJobs and self._tryCount <= self._retries:
        self._writeStatus(jobConsts.RETRY, f"failing job, going for a retry (try {self._tryCount + 1}/{self._retries + 1})")
        self.stop()
        self.__recursivelyResetStatus()
        return

    # end job list? Only when nothing is ready any more or when an error has to abort the list.
    listEnded = not self.__dependencies.getReadyJobs() or (self.__abortByError and self.failedJobs)
    if not listEnded:
        return

    if self.failedJobs:
        self._writeStatus(jobConsts.ERROR, f"at least one job in job list {self.id} failed")
    else:
        self._writeStatus(jobConsts.DONE, f"all jobs in job list {self.id} terminated successfully")
def __startNewJobs(self):
    """Looks for ready jobs in the dependencies, allocates the resources needed and starts the jobs.
    """
    startableCodes = (jobConsts.WAITING, jobConsts.RETRY)

    # jobs ready w.r.t. dependency graph
    graphReadyJobs = [self.__jobs[jobId] for jobId in self.__dependencies.getReadyJobs()]

    # jobs ready w.r.t. status and resources
    startableJobs = []
    for candidate in graphReadyJobs:
        if candidate.status.code in startableCodes and candidate.resourcesAvailable():
            startableJobs.append(candidate)

    # try to lump small jobs together
    if self._jobGrouper is not None:
        singleJobs, groupedJobs = self._jobGrouper.groupJobs(startableJobs)
        # jobs for grouped jobs are new and need to be added
        for groupedJob in groupedJobs:
            self.addJob(groupedJob)
        startableJobs = singleJobs + groupedJobs

    for job in startableJobs:
        # check again because a job started before may have consumed the resources
        if not job.resourcesAvailable():
            continue
        job.allocateResources()
        job.startJob()
def __handleStatus(self):
    """Updates the running jobs and prints or passes the statuses of the running, failed and retrying jobs.
    """
    reportedCodes = (jobConsts.ERROR, jobConsts.RETRY, jobConsts.RUNNING)
    for heldJob in self.__jobs.values():
        if heldJob.status.code == jobConsts.RUNNING:
            heldJob.updateStatus()
        # the status is read again because updateStatus may have changed it
        if heldJob.status.code in reportedCodes:
            self.__processJobStatus(heldJob)
def __processJobStatus(self, job):
    """Empties the status queue of a job and prints each status or passes it on as status of the job list.

    A job list which is not part of another job list prints the statuses, a nested job list writes
    them as its own RUNNING status instead.

    :param job: the job whose statuses will be processed
    :type job: Job instance
    """
    printedSomething = False
    while True:
        try:
            status = job.statusQueue.pop(0)
        except IndexError:
            # queue is empty
            break

        if self._inJoblist:
            if status.code != jobConsts.DONE:
                # status.code in (RUNNING, ERROR, RETRY)
                self._writeStatus(jobConsts.RUNNING, status.message + f" ({job.name}, id:{job.id})")
            elif self.__verbosity > Printer.Verbosity.DEBUG:
                # finalization of _every_ single job in the list is only reported at high verbosity
                # NOTE(review): the comparison is strict, so verbosity DEBUG itself does not report -- confirm
                self._writeStatus(jobConsts.RUNNING, f"job in job list done! ({job.name}, id:{job.id})")
            continue

        if self.__additionalProcessStatus:
            printedSomething = Printer.verbosePrint(
                _STATUS_STRING.format(jobConsts.REPR_STATUS[status.code], status.message, job.name, job.id),
                Printer.Verbosity.JENKINS)
        elif status.code not in (jobConsts.RUNNING, jobConsts.DONE):
            prefix = jobConsts.DELETE_PREVIOUS_LINE if self._overwritePreviousLine else ""
            printedSomething = Printer.verbosePrint(
                prefix + _STATUS_STRING.format(jobConsts.REPR_STATUS[status.code], status.message, job.name, job.id),
                Printer.Verbosity.JENKINS)

    # once a line has been printed the previous line must not be overwritten any more
    self._overwritePreviousLine = self._overwritePreviousLine and not printedSomething
def _calcVelocity(self):
    """Estimate the velocity of the job list from the jobs that finished successfully.

    The velocity is the weight of all successful jobs divided by the processor time that was available
    since the start (elapsed time * number of processors available).

    :return: the new estimate, or the current velocity if no estimate can be made (no weight done yet,
             no time elapsed or no processors available)
    :rtype: number
    """
    weightDone = sum(job.weight for job in self.successfulJobs)
    timePassed = time.time() - self._startTime
    availableProcTime = timePassed * self._nProcsAvailable
    # Guard the division: a coarse clock can report no elapsed time, and the number of processors may be zero.
    if weightDone > 0 and availableProcTime > 0:
        return weightDone / availableProcTime
    return self._velocity
def __testforFinishedJobs(self):
    """Tests if any job has finished and frees the resources which belong to finished jobs.

    Jobs with the status RETRY are started again. Jobs with the status ERROR or DONE are moved to failedJobs
    respectively successfulJobs, reported to the dependency graph and removed from the held jobs.

    :return: True if at least one job had the status DONE, ERROR or RETRY
    :rtype: boolean
    """
    terminalCodes = (jobConsts.DONE, jobConsts.ERROR, jobConsts.RETRY)
    somethingHappened = False
    # iterate over a copy because finished jobs are deleted from the dictionary
    for job in list(self.__jobs.values()):
        code = job.status.code
        if code not in terminalCodes:
            continue
        somethingHappened = True

        if code == jobConsts.RETRY:
            job.startJob()
            continue

        if code == jobConsts.ERROR:
            self.failedJobs.append(job)
            self.__processJobStatus(job)  # to get all the job status
            if self.__abortByError:
                self.__stopByJob(job)
        else:  # code == DONE
            self.successfulJobs.append(job)
            self.__processJobStatus(job)  # to get all the job status
            self._velocity = self._calcVelocity()

        job.freeResources()
        self.__dependencies.jobDone(job.id, job.status.code)
        del self.__jobs[job.id]
    return somethingHappened
def updateStatus(self):
    """Makes one iteration over all jobs: executes new jobs, reads their statuses and collects finished jobs.
    """
    return self.__runAsJob() and None
@staticmethod
def viewDependencies(currentJobList):
    """Prints all dependencies of the current JobList by recursively walking through the job list.

    The graph of the given job list is printed first, followed by the graphs of the job lists it holds.

    :param currentJobList: main job list
    :type currentJobList: JobList
    """
    currentJobList.dependencies.printGraph()
    nestedJobLists = (job for job in currentJobList.jobs.values() if isinstance(job, JobList))
    for nestedJobList in nestedJobLists:
        JobList.viewDependencies(nestedJobList)
def viewJobs(self):
    """Prints the jobs held by the job list and the associated dependency graph.

    The priorities are calculated first if they are not up to date.
    """
    if not self.__priorityCalculated:
        self.calculatePriorities()

    alwaysPrint = Printer.Verbosity.ALWAYS
    Printer.verbosePrint("jobs:\n=====", printLevel=alwaysPrint)
    for heldJob in self.__jobs.values():
        Printer.verbosePrint(heldJob, printLevel=alwaysPrint)
    Printer.verbosePrint("\ndependencies:\n=============", printLevel=alwaysPrint)
    JobList.viewDependencies(self)
def __stopByJob(self, job):
    """Stops all currently running jobs of the job list and writes the status ERROR for the job list.

    Called when a job fails and abort by error is set.

    :param job: job which has caused the stop
    :type job: instance of a job
    """
    self.stop()
    abortMessage = f"job list {self.id} aborted because of error in job {job.name}(id:{job.id})"
    self._writeStatus(jobConsts.ERROR, abortMessage)
def stop(self):
    """Stops all currently running jobs. Called by a parent job list or through a keyboard interrupt or system exit.

    Only held jobs with the status RUNNING are stopped; all other jobs are left untouched.
    """
    for heldJob in self.__jobs.values():
        if heldJob.status.code != jobConsts.RUNNING:
            continue
        heldJob.stop()
def cleanup(self, force=False, **kwargs):
    """Remove created scripts, output and error files of all finished jobs.

    The call is passed on to every successful and every failed job; jobs still held by the job list are skipped.

    :param force: flag whether cleanup is enforced
    :type force: bool
    :param kwargs: further keyword arguments handed to the cleanup of each job unchanged (e.g. removeScript)
    """
    finishedJobs = [*self.successfulJobs, *self.failedJobs]
    for finishedJob in finishedJobs:
        finishedJob.cleanup(force, **kwargs)
def setGroup(self, group):
    """Set the group of the job list and of every job in it.

    :param group: the group to set; it is handed to setGroup of each job unchanged
    """
    self._group = group
    # allJobs may yield a job more than once; such a job is simply assigned the same group again
    for member in self.allJobs:
        member.setGroup(group)
def __str__(self):
    """Representation of the job list followed by its waiting, successful and failed jobs, each sorted by id.

    A section is only written if the corresponding collection (held jobs, successfulJobs, failedJobs) is not empty.
    """
    parts = [AbstractJob.__str__(self) + "\n"]

    def appendSection(title, sectionJobs):
        # section title indented by four blanks, one line per job indented by eight blanks
        parts.append(" " * 4 + title)
        parts.extend(" " * 8 + str(job) + "\n" for job in sorted(sectionJobs, key=lambda entry: entry.id))

    if self.jobs:
        appendSection("not started:\n", self.notStartedJobs)
    if self.successfulJobs:
        appendSection("successful finished:\n", self.successfulJobs)
    if self.failedJobs:
        appendSection("failed:\n", self.failedJobs)
    return "".join(parts).rstrip("\n")
def __repr__(self):
    """Same text as the string representation of the job list."""
    text = self.__str__()
    return text