# Source code for mojo.jobManagement.management.resources
"""
Resources
==========
Resources manages free nodes/processors on the cluster or a workstation.
If you initialize an instance of Resources you have to pass the number of
nodes you want to use. You can ask for the number of free nodes, allocate
nodes, or free nodes once your calculation is done.
The __call__, __enter__ and __exit__ methods allow an instance of Resources
to be used in a with-statement.
"""
import sys
from ...bricabrac.fileIO import Printer
from ...pavayo import computeData as constants
from ..management import cpuInfo as cpuInfo
from ..management import clusterInfo as clusterInfo
# Integer keys identifying the resource kinds that exist on every machine.
PROCESSORS = 0
NODES = 1
TECPLOT_LICENSES = 2

# Default availability of each resource kind, evaluated once at import time.
DEFAULT_RESOURCES = {
    PROCESSORS: cpuInfo.CPUInfo.from_proc_cpuinfo().numLogicalCPUs,
}

# Local machines have no nodes; on a valid cluster the number is unlimited.
if clusterInfo.ClusterInfo.onValidCluster():
    DEFAULT_RESOURCES[NODES] = sys.maxsize
else:
    DEFAULT_RESOURCES[NODES] = 0

# The number of Tecplot processes is limited on the cluster only.
if clusterInfo.ClusterInfo.onValidCluster():
    DEFAULT_RESOURCES[TECPLOT_LICENSES] = constants.DEFAULT_NUMBER_TECPLOT_CLUSTER_FRONTEND
else:
    DEFAULT_RESOURCES[TECPLOT_LICENSES] = sys.maxsize
class Resources:
    """Class to manage the resources available for e.g. a joblist.

    All instances share one state (borg pattern): resources allocated through
    one instance are seen as allocated by every other instance.

    :param resourcesDict: a dictionary containing names of resources mapped to their availability; -1 means infinite
    :type resourcesDict: Optional[dict]
    """

    __sharedState = dict()

    @classmethod
    def fromParameters(cls, nProcs=None, nNodes=None, nTecplotLicenses=None):
        """Generates a Resources instance from parameters.

        Parameters left at None are not passed on to the constructor.

        :param cls: class
        :param nProcs: number of processors available
        :param nNodes: number of nodes available
        :param nTecplotLicenses: number of tecplot licenses available
        :type cls: class
        :type nProcs: Optional[int]
        :type nNodes: Optional[int]
        :type nTecplotLicenses: Optional[int]
        :return: a new instance of Resources
        :rtype: Resources instance
        """
        candidates = {
            PROCESSORS: nProcs,
            NODES: nNodes,
            TECPLOT_LICENSES: nTecplotLicenses,
        }
        return cls({name: number for name, number in candidates.items() if number is not None})

    def __init__(self, resourcesDict=None):
        """Initialises a new instance of Resources sharing its state with all other instances.

        The passed resources are validated and handed to :meth:`update`.

        :param resourcesDict: a dictionary containing names of resources mapped to their availability; -1 means infinite
        :type resourcesDict: Optional[dict]
        :raises RuntimeError: if a number is neither positive nor -1
        """
        # borg pattern, the python implementation of the singleton pattern
        # for further details view: http://code.activestate.com/recipes/66531-singleton-we-dont-need-no-stinkin-singleton-the-bo/
        self.__dict__ = self.__sharedState

        requested = resourcesDict if resourcesDict is not None else dict()

        # check for valid input
        for number in requested.values():
            if not (number > 0 or number == -1):
                raise RuntimeError(f"Invalid number of resources specified ({number})!")

        self.update(requested)  # fill resources dictionary

        # request handed over by __call__, consumed by the next __enter__
        self.__requestedRes = None

        # requests of the with-statements currently entered, innermost last;
        # part of the shared state, so it must survive further instantiations
        if not hasattr(self, "_activeRequests"):
            self._activeRequests = []

        # allow access to max resources when some of the resources are allocated;
        # an already existing record must not be replaced by the currently free
        # amounts, only the explicitly passed entries change
        if hasattr(self, "_maxResources"):
            for name in requested:
                self._maxResources[name] = self._resourcesDict[name]
        else:
            self._maxResources = self._resourcesDict.copy()

    @property
    def maxProcs(self):
        """Returns the maximum number of processors, 0 if unknown."""
        return self._maxResources.get(PROCESSORS, 0)

    @property
    def maxNodes(self):
        """Returns the maximum number of nodes, 0 if unknown."""
        return self._maxResources.get(NODES, 0)

    @property
    def maxTecplotLicenses(self):
        """Returns the maximum number of Tecplot licenses, 0 if unknown."""
        return self._maxResources.get(TECPLOT_LICENSES, 0)

    def update(self, resourcesDict):
        """Fills the resource dictionary. Stores sys.maxsize instead of -1 for easier handling in the other methods of this class.

        If the manager already holds resources, the passed entries are added
        and existing entries are overwritten with a warning. Otherwise the
        resources are created from DEFAULT_RESOURCES and the passed entries.
        The passed dictionary itself is left unchanged.

        :param resourcesDict: a dictionary containing names and number of resources
        :type resourcesDict: dictionary
        """
        # work on a normalised copy so the dictionary of the caller is not altered
        normalised = {
            name: sys.maxsize if number == -1 else number
            for name, number in resourcesDict.items()
        }

        if hasattr(self, "_resourcesDict"):
            for name, number in normalised.items():
                # warn, when overwriting existing resources
                if name in self._resourcesDict:  # pylint: disable=access-member-before-definition
                    Printer.warning(f"'{name}' will be overwritten in the resource management!")
                self._resourcesDict[name] = number  # pylint: disable=access-member-before-definition
        else:
            # generate a new resources dictionary, using default values
            resources = DEFAULT_RESOURCES.copy()
            resources.update(normalised)
            self._resourcesDict = resources

    def __call__(self, resourcesDict):
        """When an instance of Resources is called, resourcesDict is saved
        in self.__requestedRes for the following with-statement.

        :param resourcesDict: dictionary containing the resources to allocate later
        :type resourcesDict: dictionary
        :return: this instance
        :rtype: Resources instance
        """
        self.__requestedRes = resourcesDict
        return self

    def __enter__(self):
        """When used in a with-statement __enter__ allocates the resources saved earlier
        by __call__ and remembers them for the matching __exit__.

        :return: this instance, so that 'with resources(...) as res' works
        :rtype: Resources instance
        :raises RuntimeError: if the requested resources are not available
        """
        requested = self.__requestedRes
        self.allocateResources(requested)
        # remember a copy of exactly what was allocated; the state is shared, so
        # self.__requestedRes may be replaced before the matching __exit__ runs
        self._activeRequests.append(dict(requested))
        return self

    def __exit__(self, excType, value, traceback):
        """When used in a with-statement __exit__ releases the resources allocated earlier
        in the matching __enter__. Exceptions are not suppressed.

        :param excType: type of an exception thrown in the with-statement or None
        :param value: the thrown exception or None
        :param traceback: Traceback-object or None
        :type excType: Exception class or None
        :type value: Exception or None
        :type traceback: Traceback-object or None
        """
        if self._activeRequests:
            allocated = self._activeRequests.pop()
        else:
            # called without a preceding __enter__: release the last request
            allocated = self.__requestedRes
        self.releaseResources(allocated)

    @property
    def resourcesDict(self):
        """Returns a copy of the resources dictionary held by this manager.

        :return: copy of the resources dictionary held by this manager
        :rtype: dictionary
        """
        return self._resourcesDict.copy()

    def resourcesAvailable(self, resourcesDict):
        """Returns True if the resource management has the resources in resourcesDict available.

        :param resourcesDict: a dictionary mapping resources to the needed number
        :type resourcesDict: dictionary
        :return: True if the resources are available in this environment
        :rtype: boolean
        :raises RuntimeError: if a requested resource is not known to the resource management
        """
        for name, number in resourcesDict.items():
            if name not in self._resourcesDict:
                raise RuntimeError(f"'{name}' is not available in the resource management!")
            if not number <= self._resourcesDict[name]:
                return False
        return True

    def allocateResources(self, resourcesDict):
        """Allocates the resources asked for.

        Nothing is allocated unless all requested resources are available.

        :param resourcesDict: dictionary with the resources to allocate
        :type resourcesDict: dictionary
        :raises RuntimeError: if the resources are not available or not known
        """
        if not self.resourcesAvailable(resourcesDict):
            raise RuntimeError("You tried to allocate more resources then available!")
        for name, number in resourcesDict.items():
            self._resourcesDict[name] -= number

    def releaseResources(self, resourcesDict):
        """Allows to free resources allocated earlier.

        :param resourcesDict: dictionary containing the resources to free
        :type resourcesDict: dictionary
        """
        for name, number in resourcesDict.items():
            self._resourcesDict[name] += number

    def __str__(self):
        """Returns a listing of the currently free resources, one line per resource."""
        lines = ["Current resources:"]
        # resource names may be integers (the default resources), which do not
        # support the string format code; convert them to str before padding
        lines.extend(f"{name!s:25s} : {number}" for name, number in self._resourcesDict.items())
        return "\n".join(lines)