# Source code for mojo.jobManagement.management.clusterInfo

"""
ClusterInfo
===========
ClusterInfo supplies a class to hold information about a cluster, and the nodes one wants to use.
The class method 'ClusterInfo.from_environment' returns an instance corresponding to the current environment by reading ssh fingerprints.
"""

import dataclasses
import subprocess
import typing
import warnings

from .cpuInfo import CPUInfo


@dataclasses.dataclass(frozen=True)
class Partition:
    """Extending CPUInfo with cluster node specific information.

    Immutable (frozen) record that bundles the CPU description of a node type
    with the per-partition figures stored alongside it. The CPUInfo is held as
    the 'cpu_info' field rather than inherited from.
    """
    n_nodes: int  # number of nodes in this partition (as the name suggests)
    memory_per_node_mb: int  # memory_per_node reported by scontrol show job <job-id>

    cpu_info: CPUInfo  # CPU description shared by the nodes of this partition


@dataclasses.dataclass(frozen=True)
class ClusterInfo:
    """Container for information about a known cluster environment.

    Instances are immutable (frozen dataclass). Use 'ClusterInfo.from_environment' to obtain the
    instance matching the machine the code is running on.

    For a new cluster add a new instance to '_supportedClusters' down below.
    """
    name: str
    ssh_fingerprints: typing.Set[str]  # ssh-keyscan localhost 2> /dev/null | ssh-keygen -lf -
    default_partition_name: str  # key into 'partitions' used by the 'cpuInfo' property
    partitions: typing.Dict[str, Partition]

    queuing_system: str = "SLURM"
    account_env_var_name: str = "AT_NUM_DEFAULT_ACCOUNT"
    extra_startup_minutes: int = 10
    nodes_per_switch: int = 40  # nodes_per_switch reported by scontrol show topology <switch-id>
    mail_relay: str = "smtprelay.dlr.de"

    # Class-level cache shared by all callers: whether the environment has already been probed,
    # and the cluster that was recognised (None if no known cluster matched or the probe failed).
    __environment_checked: typing.ClassVar[bool] = False
    __environment_singleton: typing.ClassVar[typing.Optional['ClusterInfo']] = None

    @property
    def cpuInfo(self) -> CPUInfo:
        """Get information about the clusters default partition.

        If you need information about another partition you can use
        'instance.partitions[partition_name].cpu_info'.
        """
        return self.partitions[self.default_partition_name].cpu_info

    @classmethod
    def from_environment(cls, default_partition: typing.Optional[str] = None) -> typing.Optional['ClusterInfo']:
        """Get an instance corresponding to the current environment.

        On the first call the local ssh host-key fingerprints are read and compared with the
        fingerprints of every entry in '_supportedClusters'. The outcome (including "nothing
        recognised") is cached on the class, so later calls do not run the external commands again.

        :param default_partition: Select a different default partition (node type)
        :return: the recognised cluster, a copy of it with 'default_partition_name' replaced when
                 'default_partition' is given, or None if no known cluster was recognised
        :raises KeyError: if 'default_partition' is not a partition of the recognised cluster
        """
        if not cls.__environment_checked:
            try:
                ssh_fingerprints_read = subprocess.check_output(
                    ["sh", "-c", "ssh-keyscan localhost 2> /dev/null | ssh-keygen -lf - 2> /dev/null"])
            except (FileNotFoundError, subprocess.CalledProcessError):
                warnings.warn("I was not able to read ssh-fingerprints so I could not recognize a known cluster environment!")
            else:
                # Tokenise the command output once; it does not depend on the candidate cluster.
                local_fingerprints = set(ssh_fingerprints_read.decode().split())
                for cluster in _supportedClusters.values():
                    if not local_fingerprints.isdisjoint(cluster.ssh_fingerprints):
                        cls.__environment_singleton = cluster
                        break
            # Remember that the probe ran, whatever its outcome, so it is not repeated.
            cls.__environment_checked = True

        cluster = cls.__environment_singleton
        if cluster is not None and default_partition is not None:
            if default_partition not in cluster.partitions:
                raise KeyError(f"Unknown node type '{default_partition}' for cluster '{cluster.name}' "
                               f"(known: {list(cluster.partitions)})!")
            # The cached instance is frozen and shared, so hand out a modified copy instead.
            cluster = dataclasses.replace(cluster, default_partition_name=default_partition)
        return cluster

    @classmethod
    def onValidCluster(cls) -> bool:
        """Check whether the current environment is recognised as a known cluster.

        :return: True if 'from_environment' finds a matching cluster, False otherwise
        """
        return cls.from_environment() is not None
"""Dictionary of clusters known to MOJO allowing to automatically dispatch jobs.

Adding a new cluster:
    * Add a new entry in the dictionary mapping the cluster's name to a new 'ClusterInfo' instance;
    * To read a cluster's ssh fingerprints take a look at the 'subprocess' call in 'ClusterInfo.from_environment'
    * Fill the partition dictionary as desired
    * Take a look at the default attributes of 'ClusterInfo'; you may want to set some explicitly
"""
# Maps cluster name -> ClusterInfo; 'ClusterInfo.from_environment' iterates over the values
# and picks the first entry whose ssh fingerprints match the local machine.
_supportedClusters = {
    "CARA": ClusterInfo(name="CARA",
                        ssh_fingerprints={"SHA256:WByNmmdXnC0QwTzsrb/eRXrC+eQ5sysPBcpj3YuJn3c"},
                        default_partition_name="naples128",
                        partitions={"naples128": Partition(n_nodes=2168, memory_per_node_mb=117200, cpu_info=CPUInfo(numLogicalCPUs=128, numCoresPerSocket=32, numSiblingsPerSocket=64)),
                                    "naples256": Partition(n_nodes=112, memory_per_node_mb=247000, cpu_info=CPUInfo(numLogicalCPUs=128, numCoresPerSocket=32, numSiblingsPerSocket=64)),  # available until 25.04.2023
                                    "VIS": Partition(n_nodes=20, memory_per_node_mb=1020000, cpu_info=CPUInfo(numLogicalCPUs=128, numCoresPerSocket=32, numSiblingsPerSocket=64)),
                                    "rome": Partition(n_nodes=552, memory_per_node_mb=247000, cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128)),
                                    "rome-bigmem": Partition(n_nodes=20, memory_per_node_mb=1020000, cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128)),
                                    "rome-a100": Partition(n_nodes=10, memory_per_node_mb=1020000, cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128))}),
    "CARO": ClusterInfo(name="CARO",
                        ssh_fingerprints={"SHA256:20/4sfMgoRDJkb/9CFxn61PRdSYAKRNlNzpo8Dmg7Is"},
                        default_partition_name="medium",
                        # NOTE(review): 'medium' lists 2477000 MB per node, which is more than this
                        # cluster's 'bigmem' (1021800 MB) and about ten times CARA's 'rome' (247000 MB).
                        # Possibly a typo for 247700 - confirm against the scontrol output.
                        partitions={"medium": Partition(n_nodes=1276, memory_per_node_mb=2477000, cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128)),
                                    "bigmem": Partition(n_nodes=20, memory_per_node_mb=1021800, cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128))})
}