"""
ClusterInfo
===========
ClusterInfo supplies a class to hold information about a cluster, and the nodes one wants to use.
The class method 'ClusterInfo.from_environment' returns an instance corresponding to the current environment by reading ssh fingerprints.
"""
import dataclasses
import subprocess
import typing
import warnings
from .cpuInfo import CPUInfo
@dataclasses.dataclass(frozen=True)
class Partition:
    """Immutable description of one cluster partition (node type).

    Combines the CPU layout of a node (held in 'cpu_info') with cluster node specific information.
    """

    n_nodes: int
    memory_per_node_mb: int  # memory_per_node reported by scontrol show job <job-id>
    cpu_info: CPUInfo
@dataclasses.dataclass(frozen=True)
class ClusterInfo:
    """Container for information about a known cluster environment.

    For a new cluster add a new instance to '_supportedClusters' down below.
    """

    name: str
    ssh_fingerprints: typing.Set[str]  # ssh-keyscan localhost 2> /dev/null | ssh-keygen -lf -
    default_partition_name: str
    partitions: typing.Dict[str, Partition]
    queuing_system: str = "SLURM"
    account_env_var_name: str = "AT_NUM_DEFAULT_ACCOUNT"
    extra_startup_minutes: int = 10
    nodes_per_switch: int = 40  # nodes_per_switch reported by scontrol show topology <switch-id>
    mail_relay: str = "smtprelay.dlr.de"

    # Class-level cache of the environment detection done in 'from_environment'.
    # Declared as ClassVar so that the dataclass machinery does not treat them as fields.
    __environment_checked: typing.ClassVar[bool] = False
    __environment_singleton: typing.ClassVar[typing.Optional['ClusterInfo']] = None

    @property
    def cpuInfo(self) -> CPUInfo:
        """Get information about the cluster's default partition.

        If you need information about another partition you can use
        'instance.partitions[partition_name].cpu_info'.
        """
        return self.partitions[self.default_partition_name].cpu_info

    @classmethod
    def from_environment(cls, default_partition: typing.Optional[str] = None) -> typing.Optional['ClusterInfo']:
        """Get an instance corresponding to the current environment.

        On the first call the ssh host key fingerprints of 'localhost' are read and compared with the
        fingerprints of every entry in '_supportedClusters'; the result (including "nothing found") is
        cached on the class, so later calls do not run the external commands again.

        :param default_partition: Select a different default partition (node type)
        :return: the matching cluster, a copy of it with 'default_partition_name' replaced if
            'default_partition' is given, or None if no known cluster was recognized
        :raises KeyError: if 'default_partition' is not a partition of the recognized cluster
        """
        if not cls.__environment_checked:
            try:
                ssh_fingerprints_read = subprocess.check_output(["sh", "-c", "ssh-keyscan localhost 2> /dev/null | ssh-keygen -lf - 2> /dev/null"])
            except (OSError, subprocess.CalledProcessError):
                # OSError covers a missing 'sh' (FileNotFoundError) as well as any other failure to launch it;
                # CalledProcessError is raised when the shell command exits with a non-zero status.
                warnings.warn("I was not able to read ssh-fingerprints so I could not recognize a known cluster environment!")
            else:
                # Tokenize the command output once; it does not change between clusters.
                fingerprints_found = set(ssh_fingerprints_read.decode().split())
                for cluster in _supportedClusters.values():
                    if fingerprints_found & cluster.ssh_fingerprints:
                        cls.__environment_singleton = cluster
                        break
            # Remember that detection ran, whether or not it succeeded.
            cls.__environment_checked = True

        cluster = cls.__environment_singleton
        if cluster is not None and default_partition is not None:
            if default_partition not in cluster.partitions:
                raise KeyError(f"Unknown node type '{default_partition}' for cluster '{cluster.name}' (known: {cluster.partitions.keys()})!")
            # Instances are frozen, so hand out a modified copy instead of touching the cached one.
            cluster = dataclasses.replace(cluster, default_partition_name=default_partition)
        return cluster

    @classmethod
    def onValidCluster(cls) -> bool:
        """Check whether the current environment was recognized as a known cluster.

        :return: True if 'from_environment' returns an instance, False if it returns None
        """
        return cls.from_environment() is not None
"""Dictionary of clusters known to MOJO allowing to automatically dispatch jobs.
Adding a new cluster:
* Add a new entry in the dictionary mapping the clusters name to a new 'ClusterInfo' instance;
* To read a cluster´s ssh fingerprints take a look at the 'subprocess' call in 'ClusterInfo.from_environment'
* Fill the partition dictionary as desired
* Take a look at 'ClusterInfo'´s default attributes; you may want to set some explicitly
"""
_supportedClusters = {
    "CARA": ClusterInfo(
        name="CARA",
        ssh_fingerprints={"SHA256:WByNmmdXnC0QwTzsrb/eRXrC+eQ5sysPBcpj3YuJn3c"},
        default_partition_name="naples128",
        partitions={
            "naples128": Partition(
                n_nodes=2168,
                memory_per_node_mb=117200,
                cpu_info=CPUInfo(numLogicalCPUs=128, numCoresPerSocket=32, numSiblingsPerSocket=64),
            ),
            # available until 25.04.2023
            "naples256": Partition(
                n_nodes=112,
                memory_per_node_mb=247000,
                cpu_info=CPUInfo(numLogicalCPUs=128, numCoresPerSocket=32, numSiblingsPerSocket=64),
            ),
            "VIS": Partition(
                n_nodes=20,
                memory_per_node_mb=1020000,
                cpu_info=CPUInfo(numLogicalCPUs=128, numCoresPerSocket=32, numSiblingsPerSocket=64),
            ),
            "rome": Partition(
                n_nodes=552,
                memory_per_node_mb=247000,
                cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128),
            ),
            "rome-bigmem": Partition(
                n_nodes=20,
                memory_per_node_mb=1020000,
                cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128),
            ),
            "rome-a100": Partition(
                n_nodes=10,
                memory_per_node_mb=1020000,
                cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128),
            ),
        },
    ),
    "CARO": ClusterInfo(
        name="CARO",
        ssh_fingerprints={"SHA256:20/4sfMgoRDJkb/9CFxn61PRdSYAKRNlNzpo8Dmg7Is"},
        default_partition_name="medium",
        partitions={
            # NOTE(review): this memory value is larger than the one of 'bigmem' below --
            # possibly an extra digit; confirm against 'scontrol show job <job-id>'.
            "medium": Partition(
                n_nodes=1276,
                memory_per_node_mb=2477000,
                cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128),
            ),
            "bigmem": Partition(
                n_nodes=20,
                memory_per_node_mb=1021800,
                cpu_info=CPUInfo(numLogicalCPUs=256, numCoresPerSocket=64, numSiblingsPerSocket=128),
            ),
        },
    ),
}