# Source code for mojo.jobManagement.management.resources

"""
Resources
==========
Resources manages free nodes/processors on the cluster or a workstation.
When you create an instance of Resources you can pass a dictionary with the
amount of each resource you want to use; resources you do not specify fall back
to the defaults of the machine. You can then ask whether resources are free,
allocate resources, or release them again once your calculation is done.

The __call__, __enter__ and __exit__ methods allow an instance of Resources
to be used with the with-statement.
"""


import sys

from ...bricabrac.fileIO import Printer
from ...pavayo import computeData as constants
from ..management import cpuInfo as cpuInfo
from ..management import clusterInfo as clusterInfo

# Integer keys identifying the built-in resource kinds.
PROCESSORS, NODES, TECPLOT_LICENSES = 0, 1, 2

# Default amount of every built-in resource on the current machine.
DEFAULT_RESOURCES = {}
# number of logical CPUs as reported by the CPU info helper
DEFAULT_RESOURCES[PROCESSORS] = cpuInfo.CPUInfo.from_proc_cpuinfo().numLogicalCPUs
# there are no nodes on local machines; on a valid cluster they are unlimited
DEFAULT_RESOURCES[NODES] = sys.maxsize if clusterInfo.ClusterInfo.onValidCluster() else 0
# limit the number of Tecplot processes on the cluster, unlimited elsewhere
DEFAULT_RESOURCES[TECPLOT_LICENSES] = (
    constants.DEFAULT_NUMBER_TECPLOT_CLUSTER_FRONTEND
    if clusterInfo.ClusterInfo.onValidCluster()
    else sys.maxsize
)


class Resources:
    """Class to manage the resources available for e.g. a joblist.

    All instances share a single state (borg pattern), so every instance reads
    and modifies the same pool of resources.

    :param resourcesDict: a dictionary containing names of resources mapped to their availability; -1 means infinite
    :type resourcesDict: Optional[dict]
    """

    __sharedState = dict()

    @classmethod
    def fromParameters(cls, nProcs=None, nNodes=None, nTecplotLicenses=None):
        """Generates a Resources instance from parameters.

        Parameters left at None are not passed on, so the value already known
        to the resource management (or the machine default) is kept for them.

        :param cls: class
        :param nProcs: number of processors available
        :param nNodes: number of nodes available
        :param nTecplotLicenses: number of tecplot licenses available

        :type cls: class
        :type nProcs: Optional[int]
        :type nNodes: Optional[int]
        :type nTecplotLicenses: Optional[int]

        :return: a new instance of Resources
        :rtype: Resources instance
        """
        requested = {
            PROCESSORS: nProcs,
            NODES: nNodes,
            TECPLOT_LICENSES: nTecplotLicenses,
        }
        return cls({name: number for name, number in requested.items() if number is not None})

    def __init__(self, resourcesDict=None):
        """Initialises a new instance of Resources and registers the passed resources in the shared state.

        :param resourcesDict: a dictionary containing names of resources mapped to their availability; -1 means infinite
        :type resourcesDict: Optional[dict]

        :raises RuntimeError: if a number is neither positive nor -1
        """
        # borg pattern, the python implementation of the singleton pattern
        # for further details view: http://code.activestate.com/recipes/66531-singleton-we-dont-need-no-stinkin-singleton-the-bo/
        self.__dict__ = self.__sharedState

        # check for valid input
        if resourcesDict:
            for number in resourcesDict.values():
                if not (number > 0 or number == -1):
                    raise RuntimeError(f"Invalid number of resources specified ({number})!")

        # fill resources dictionary
        self.update(resourcesDict if resourcesDict is not None else dict())

        # an attribute needed for the use in a with statement
        # NOTE(review): this lives in the shared state, so nested/concurrent with-blocks
        # overwrite each other's pending request.
        self.__requestedRes = None

        # Allow access to max resources when some of the resources are allocated.
        # Only create the snapshot when it is missing: taking a fresh copy on every
        # instantiation would record the currently *free* amounts as the maximum
        # whenever a new instance is created while resources are allocated.
        if not hasattr(self, "_maxResources"):
            self._maxResources = self._resourcesDict.copy()

    @property
    def maxProcs(self):
        """Maximum number of processors known to the resource management (0 if unknown)."""
        return self._maxResources.get(PROCESSORS, 0)

    @property
    def maxNodes(self):
        """Maximum number of nodes known to the resource management (0 if unknown)."""
        return self._maxResources.get(NODES, 0)

    @property
    def maxTecplotLicenses(self):
        """Maximum number of Tecplot licenses known to the resource management (0 if unknown)."""
        return self._maxResources.get(TECPLOT_LICENSES, 0)

    def update(self, resourcesDict):
        """Fills the resource dictionary.

        If resources are already registered, the passed entries overwrite or extend
        them (a warning is printed for every overwritten entry). Otherwise a new
        dictionary is created from the default resources and the passed entries.
        Replaces -1 with sys.maxsize for easier handling in the other methods of
        this class. The passed dictionary itself is not modified.

        :param resourcesDict: a dictionary containing names and number of resources
        :type resourcesDict: dictionary
        """
        # work on a normalised copy so the caller's dictionary stays untouched
        normalized = {
            name: (sys.maxsize if number == -1 else number)
            for name, number in resourcesDict.items()
        }

        if hasattr(self, "_resourcesDict"):
            knownMax = getattr(self, "_maxResources", None)
            for name, number in normalized.items():
                # warn, when overwriting existing resources
                if name in self._resourcesDict:  # pylint: disable=access-member-before-definition
                    Printer.warning(f"'{name}' will be overwritten in the resource management!")
                self._resourcesDict[name] = number  # pylint: disable=access-member-before-definition
                # keep the maxima in sync with the newly registered amount
                if knownMax is not None:
                    knownMax[name] = number
        else:
            # generate a new resources dictionary, using default values
            resources = DEFAULT_RESOURCES.copy()
            resources.update(normalized)
            self._resourcesDict = resources
            self._maxResources = resources.copy()

    def __call__(self, resourcesDict):
        """When an instance of Resources is called, the request is stored for a following with-statement.

        :param resourcesDict: dictionary containing the resources to allocate later
        :type resourcesDict: dictionary

        :return: the instance itself, so the call can be used directly in a with-statement
        :rtype: Resources instance
        """
        self.__requestedRes = resourcesDict
        return self

    def __enter__(self):
        """When used in a with-statement __enter__ allocates the resources stored earlier by __call__.

        :return: the instance itself, so it can be bound with 'as'
        :rtype: Resources instance

        :raises RuntimeError: if the requested resources are not available
        """
        self.allocateResources(self.__requestedRes)
        return self

    def __exit__(self, excType, value, traceback):
        """When used in a with-statement __exit__ releases the resources allocated earlier in __enter__.

        Exceptions raised inside the with-block are not suppressed.

        :param excType: type of the exception raised in the with-statement or None
        :param value: the raised exception instance or None
        :param traceback: traceback object or None

        :type excType: Optional[type]
        :type value: Optional[BaseException]
        :type traceback: Optional[traceback]
        """
        self.releaseResources(self.__requestedRes)

    @property
    def resourcesDict(self):
        """Returns a copy of the resources dictionary held by this manager.

        :return: copy of the resources dictionary held by this manager
        :rtype: dictionary
        """
        return self._resourcesDict.copy()

    def resourcesAvailable(self, resourcesDict):
        """Returns True if the resource management has the resources in resourcesDict available.

        Resources are checked in iteration order; the check stops at the first
        resource that is not sufficiently available.

        :param resourcesDict: a dictionary mapping resources to the needed number
        :type resourcesDict: dictionary

        :return: True if the resources are available in this environment
        :rtype: boolean

        :raises RuntimeError: if a requested resource is unknown to the resource management
        """
        for name, number in resourcesDict.items():
            try:
                available = self._resourcesDict[name]
            except KeyError as err:
                raise RuntimeError(f"'{name}' is not available in the resource management!") from err
            if not number <= available:
                return False
        return True

    def allocateResources(self, resourcesDict):
        """Allocates the resources asked for.

        Either all requested resources are allocated or none of them.

        :param resourcesDict: dictionary with the resources to allocate
        :type resourcesDict: dictionary

        :raises RuntimeError: if the resources are unknown or not sufficiently available
        """
        if not self.resourcesAvailable(resourcesDict):
            raise RuntimeError("You tried to allocate more resources then available!")

        for name, number in resourcesDict.items():
            self._resourcesDict[name] -= number

    def releaseResources(self, resourcesDict):
        """Allows to free resources allocated earlier.

        :param resourcesDict: dictionary containing the resources to free
        :type resourcesDict: dictionary
        """
        for name, number in resourcesDict.items():
            self._resourcesDict[name] += number

    def __str__(self):
        """Returns a multi-line overview of the currently available resources."""
        lines = ["Current resources:"]
        for name, number in self.resourcesDict.items():
            # resource names may be integers (the default keys are), which do not
            # accept the 's' format code; convert explicitly before padding
            lines.append(f"{name!s:25s} : {number}")
        return "\n".join(lines)