Source code for mojo.jobManagement.graph.graph

"""
Graph
=====
The class Graph serves as root for a graph which shows the dependencies between several jobs.
In addition to the methods which are offered by node.py the user gains the ability to add jobs to the graph,
mark jobs as done or failed and get the jobs with no more dependencies sorted by priority. The jobs are
saved as id only. In addition you can view the graph as simple console-output.

Usage
+++++
- create a graph:                 myGraph = Graph()
- add jobs:                       myGraph.addJob(1, [])
                                  myGraph.addJob(2, [1], 2)
                                  ...
- get jobs without dependencies:  myGraph.getReadyJobs()
- mark a job as done or Failed:   myGraph.jobDone(1, DONE)
                                  myGraph.jobDone(2, ERROR)
- print the graph:                myGraph.printGraph()


Required classes and methods::

    from job_management.graph.node import Node, DONE, ERROR, WAITING, reprStatus
    from copy import deepcopy
"""

# see sanityTest
# from copy import deepcopy

from .node import Node
from ...jobManagement import jobManagementData as jobConsts


[docs]class Graph(Node): """Serves as root for a graph, describing the dependencies in a joblist. Has got every node in the graph as child to have a easy access to every node. Inherited from Node:: addParent(self, parent) markParentProcessed(self, parent) addChild(self, newChild) calculatePriority(self) """ def __init__(self, nodeId=-1): """Calls the super-constructor """ Node.__init__(self, nodeId, [], 0)
[docs] def addGraph(self, graph, parentIds): """Adds a new graph, and therefore a new node to the graph. :param graph: the graph to add :param parentsId: a list of the parents ids :type graph: Graph :type parentsId: list of ints """ graph.parents.extend(parentIds[:]) self.addChild(graph) try: self.buildGraph() except RuntimeError: # happens when a circular dependency is build for node in self._childs: if node.id == graph.id: print(f"You have created a circular dependency, the job with the id {node.id} will not be processed") node.parents = [-1] break
[docs] def addJob(self, jobId, parentsIds, weight=1): """Adds a new job, and therefore a new node to the graph. :param jobId: the id of the job to add :param parentsId: a list of the parents ids :param weight: the weight of the job :type jobId: int :type parentsId: list of ints :type weight: int """ self.addChild(Node(jobId, parentsIds[:], weight)) try: self.buildGraph() except RuntimeError: # happens when a circular dependency is build for node in self._childs: if node.id == jobId: print(f"You have created a circular dependency, the job with the id {node.id} will not be processed") node.parents = [-1] break
[docs] def getReadyJobs(self): """Returns a list of the ids of the jobs with no further dependencies, ready to start. They are sorted by weight from big to small. :return: list of the ids of the jobs ready to start :rtype: list of int """ readyJobs = [x for x in self._childs if x.parents == [] and x.status == jobConsts.WAITING] # sort from big to small weight readyJobs.sort(key=lambda x: x.calculatedPriority, reverse=True) return [x.id for x in readyJobs]
[docs] def jobDone(self, jobId, status): """Marks a job in the graph as DONE or ERROR. If the status is ERROR all childs an grandchilds will get the status DEAD_PARENT. This happens in the setStatus of the nodes. :param jobId: id of the job with the status to change :param status: new status :type jobId: int :type status: int-constant """ assert status == jobConsts.DONE or status == jobConsts.ERROR for x in self._childs: if x.id == jobId: x.status = status break
[docs] def buildGraph(self): """Adds the associated childs to every node. :return: the current Graph-instance :rtype: Graph() """ for currentNode in self._childs: for currentParent in currentNode.parents[:]: if isinstance(currentParent, int): parent = [x for x in self._childs if x.id == currentParent] if parent: parent = parent[0] currentNode.addParent(parent) parent.addChild(currentNode) currentNode.parents.remove(currentParent) return self
[docs] def reset(self): """Allow to process this graph again """ for node in self._childs: node.status = jobConsts.WAITING node.parents += node._processedParents node._processedParents = []
[docs] def printGraph(self): """Prints a list with every node, its weight and its parents and children to the console """ for currentNode in self._childs: print("id: {0:>3}; status: {1:>12}; parents: {2:>30}; children: {3:>30}; priority: {4:>3}".format(currentNode.id, jobConsts.REPR_STATUS[currentNode.status], str([(x if isinstance(x, int) else x.id) for x in currentNode.parents]), str([x.id for x in currentNode.childs]), currentNode.calculatedPriority))
[docs] def sanityTest(self): """Tests if all jobs in the graph can be reached by performing a test run on the graph. """
# TODO: Felix, write new sanity test # graph = deepcopy(self) # while graph.getReadyJobs(): # for job in graph.getReadyJobs(): # graph.jobDone(job, DONE) # badJobs = [job.id for job in graph._childs if job.status != DONE] # if badJobs: # print(f"The jobs with the ids {badJobs} will not be executed, because of bad dependencies!")