"""
testSuiteComponents.py
======================
testSuiteComponents contains all the subclasses which are to small to get an own file. The only changes to the parent classes are implemented in OpListSub and
JobListSub. These classes also inherit from collections.Sequence . JobSub got an method to return the job represented as job from the jobManagement package.
"""
import collections
import os
import re
import io
from typing import Optional
from ...bricabrac import fileIO
from ..computeMethods import getMOJOPythonPath, convertWallTimeToWeight
from ...jobManagement.jobs.job import Job as JobManagementJob
from ...jobManagement.jobs.jobList import JobList as JobManagementJobList
from ...jobManagement import jobManagementData as jobConsts
from ...jobManagement.management.clusterInfo import ClusterInfo
from ...jobManagement.management.cpuInfo import CPUInfo
from ...pavayo import computeData as constants
from . import testcase_generated as supermod
def _replaceExecutable(executableDict, executableIn, resourcesDict=None):
"""Replace a executable from a XML file with a executable passed to PAVAYO and return the related job type.
:param executableDict: dictionary of executables
:param executableIn: name of the executable from the XML file
:type executableDict: ExecutableResources
:type executableIn: str
:return: path of the executable or python method to use by the job class
:return: job type
:rtype: str or function
:rtype: str or None
"""
def createUsedresourcesDict(execName): return {execName: 1} if resourcesDict is not None and execName in resourcesDict else None
if executableIn.startswith("$"):
executableNameUpper = executableIn.upper()[1:]
execResource = executableDict[executableIn.upper()]
executable = execResource.path
if not executable:
raise RuntimeError(f"Could not find a replacement for the template '{executableIn}'")
if execResource.environmentName == constants.GMCPLAY_TEMPLATE:
jobType = constants.GMC_PLAY_EXEC
else:
for traceSuiteProg in (constants.TRACE_SUITE_TRACE_EXEC, constants.TRACE_SUITE_PREP_EXEC, constants.TRACE_SUITE_POST_EXEC):
if executableNameUpper.startswith(traceSuiteProg):
jobType = traceSuiteProg
break
else:
jobType = None
__setWeight = {constants.GMCPLAY_TEMPLATE: jobConsts.DEFAULT_WEIGHT_METHOD, constants.RM_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
constants.MV_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, constants.CP_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
constants.LN_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, constants.PYTHON_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SHELL,
constants.TECPLOT_TEMPLATE: jobConsts.DEFAULT_WEIGHT_METHOD, constants.TECPLOT_MACRO_TEMPLATE: jobConsts.DEFAULT_WEIGHT_METHOD}
weight = __setWeight.get(executableIn)
usedResources = createUsedresourcesDict(executableNameUpper)
else:
executable = executableIn
jobType = None
weight = None
usedResources = createUsedresourcesDict(executableIn)
weight = weight or 0
return executable, jobType, weight, usedResources
[docs]class StrSub:
"""Contains the string method only.
"""
def __str__(self):
string = io.StringIO()
self.export(string, 0) # pylint: disable=no-member
return string.getvalue()[:-1]
[docs]class DependencySub(supermod.Dependency, StrSub):
def getPath(self):
return os.path.join(self.level if self.level is not None else "", self.subLevel if self.subLevel is not None else "",
self.job if self.job is not None else "")
def distribute_processes_among_threads(n_processes: int, processes_per_node: Optional[int], threads_per_process: Optional[int]) -> (int, int, int):
if processes_per_node is not None and threads_per_process is not None:
threads_per_process = threads_per_process * processes_per_node
elif processes_per_node is not None: # threads per process is None
threads_per_process = processes_per_node
else: # ignoring a potential set thread_per_process
cpu_info = ClusterInfo.from_environment().cpuInfo if ClusterInfo.onValidCluster() else CPUInfo.from_proc_cpuinfo()
threads_per_process = cpu_info.numSiblingsPerSocket
threads_per_process = min(threads_per_process, n_processes)
processes_per_node = 1
n_processes = 1
return n_processes, processes_per_node, threads_per_process
class SetupJobSub:
def _getJob(self, options, executableDict, workingDirectory, outputDir, outputFile=None, resourcesDict=None):
"""Returns the instance of a job from the job management. The job represents the job hold by the instance.
:param options: options instance with options parsed from the command line
:param executableDict: dictionary of executables
:param workingDirectory: base path of the test case
:param outputDir: output folder
:param outputFile: path of output file
:type options: OptionParser instance
:type executableDict: ExecutableResources
:type workingDirectory: str
:type outputDir: str
:type outputFile: Optional[str]
:return: a job instance which represents the instance itself
:rtype: JobManagementJob or JobManagementJobList
"""
executable, jobType, weight, usedResources = _replaceExecutable(executableDict, self.executable, resourcesDict=resourcesDict)
try:
if executableDict[self.executable].commandLineArguments:
self.args = self.args + executableDict[self.executable].commandLineArguments
except KeyError:
pass
# add additional Tecplot arguments
if executable == constants.TECPLOT_EXECUTABLE:
args = constants.TECPLOT_ARGUMENTS + self.args
self.executeOnMaster = True
nTecLics = 1
else:
args = self.args
nTecLics = 0
try:
_, execName = os.path.split(executable)
except TypeError: # if the executable is a method
execName = ""
# update job type if not specified yet
self.jobType = self.jobType or jobType
if self.jobType == constants.GMC_PLAY_EXEC:
self.nProcs = 1
elif self.jobType == constants.TRACE_SUITE_TRACE_EXEC:
self.threadsPerProc = options.threadsPerProc or self.threadsPerProc
self.procsPerNode = options.procsPerNode or self.procsPerNode
if options.useThreadsInsteadOfProcesses:
self.nProcs, self.procsPerNode, self.threadsPerProc = distribute_processes_among_threads(self.nProcs, self.procsPerNode,
self.threadsPerProc)
if execName in options.valgrindExec:
jobName = self.name + "_" + "valgrind"
readVar = "yes" # yes|no
trackOrigins = "yes" # no|yes
leakCheck = "full" # no|summary|full
genSupp = "no" # no|yes|all
showReachable = "no" if execName in constants.EXECUTABLES_TO_RUN_WITH_VALGIRIND else "yes"
xmlFileNames = f"--xml=yes --xml-file=valgrind.{self.name}.%p.xml"
valgrindCommandLine = constants.VALGRIND_COMMAND_LINE.format(readVarInfo=readVar, trackOrigins=trackOrigins,
leakCheck=leakCheck, showReachable=showReachable,
suppressions=options.valgrindSuppFile,
genSuppressions=genSupp,
logFile=os.path.join(outputDir, constants.VALGRIND_LOG_PREFIX + jobName),
xmlFileNames=xmlFileNames)
args = valgrindCommandLine.split() + [executable] + args
executable = constants.VALGRIND_EXECUTABLE
if self.successStatement:
successStatement = self.successStatement
else:
try:
successStatement = executableDict[self.executable].successStatement
except KeyError:
successStatement = list()
if self.failStatement:
failStatement = self.failStatement
else:
try:
failStatement = executableDict[self.executable].failStatement
except KeyError:
failStatement = list()
successStatementsRe = [statement.valueOf_ if statement.isRegExp else re.escape(statement.valueOf_) for statement in successStatement]
failStatementsRe = [statement.valueOf_ if statement.isRegExp else re.escape(statement.valueOf_) for statement in failStatement]
envVars = {"PYTHONPATH": getMOJOPythonPath() + ":" + os.environ.get("PYTHONPATH", ""), "MPLBACKEND": "agg"} if executable == constants.PYTHON_EXECUTABLE else None
# set weight from wallTime or DEFAULT
weight = convertWallTimeToWeight(self, jobConsts.DEFAULT_WEIGHT_SHELL if weight == 0 else weight)
weight = weight * (self.nProcs or 1) * (self.threadsPerProc or 1)
nProcs = self.nProcs
if options.allowFewerProcs:
nProcs = min(self.nProcs, options.nprocs)
args = [_replaceExecutable(executableDict, el)[0] for elList in (arg.split() for arg in args) for el in elList]
if options.useThreadsInsteadOfProcesses: # remove load balance file parameter as it does not mix with the changed processor count
try:
lbIndex = args.index("-lb")
except ValueError:
pass
else:
del args[lbIndex:lbIndex + 2]
aJob = JobManagementJob(executable=executable, args=args, nProcs=nProcs, nTecplotLicenses=nTecLics,
jobName=self.name, queue=options.queue, workingDirectory=workingDirectory,
outputFile=outputFile, outputDir=outputDir,
procsPerNode=self.procsPerNode, threadsPerProc=self.threadsPerProc,
nodeType=self.nodeType or options.nodeType, executeOnMaster=self.executeOnMaster,
jobType=self.jobType, successStatements=successStatementsRe,
failStatements=failStatementsRe, ignoreExitCode=self.ignoreExitCode, envVars=envVars, wallTime=self.wallTime,
weight=weight, resourcesDict=usedResources, useMpirun=self.useMpirun, account=options.account, slurmSwitches=options.slurmSwitches)
return aJob
[docs]class SuiteSub(supermod.Suite, StrSub):
pass
[docs]class TclistSub(supermod.Tclist, StrSub):
pass
[docs]class JobListSub(supermod.JobList, collections.Sequence, StrSub):
"""
"""
COMPUTATION = "COMPUTATION"
POSTPROCESSING = "POSTPROCESSING"
RESTART = "RESTART"
MESHING = "MESHING"
def __len__(self):
"""Returns the number of operation points in the operation point list.
:return: number of operation points
:rtype: int
"""
return len(self.job)
def __getitem__(self, index):
"""Returns a operation point at a given index.
:return: operation point at given index
:rtype: operation point instance
"""
return self.job[index]
[docs] def append(self, job):
"""Appends a new job to the job list represented by the instance.
:param job: the job instance to append
:type job: TestcaseJob instance
"""
self.job.append(job)
[docs] def insert(self, index, value):
"""Inserts a job object at the given index.
:param index: index in the job list where the job will be inserted
:param value: the new job object
:type index: integer
:type value: TestcaseJob instance
"""
self.job.insert(index, value)
[docs]class SimulationJobSub(supermod.SimulationJob, SetupJobSub, StrSub):
"""
"""
[docs] def getJob(self, options, executableDict, workingDirectory, outputDir, resourcesDict=None):
"""Returns the instance of a job from the job management. The job represents the job hold by the instance.
:param options: options instance with options parsed from the command line
:param executableDict: dictionary of executables
:param workingDirectory: base path of the test case
:param outputDir: output folder
:type options: OptionParser instance
:type executableDict: ExecutableResources
:type workingDirectory: str
:type outputDir: str
:return: a job instance which represents the instance itself
:rtype: JobManagementJob or JobManagementJobList
"""
aJob = self._getJob(options, executableDict, workingDirectory, outputDir, None, resourcesDict=resourcesDict)
if self.saveLog:
aJobList = JobManagementJobList(name=self.name + "JobList")
aJobList.addJob(aJob)
collectJob = JobManagementJob(SimulationJobSub._copyOutputFile, args=[self, os.path.join(outputDir, aJob.outFile)],
jobName=self.name + "Copy", workingDirectory=workingDirectory, executeOnMaster=True,
outputDir=outputDir, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD)
aJobList.addJob(collectJob, [aJob.id])
return aJobList
else:
return aJob
@staticmethod
def _copyOutputFile(job, sourceFile):
"""Copies the source file to the path stored in attribute 'saveLog'.
:param job: SimulationJobSub instance
:param sourceFile: path to source file
:type job: SimulationJobSub
:type sourceFile: str
"""
path, _ = os.path.split(job.saveLog)
fileIO.md(path)
fileIO.globCopy(sourceFile, job.saveLog, match=True, verbose=1)
[docs]class AnalysisJobSub(supermod.AnalysisJob, StrSub):
"""
"""
[docs] def getJob(self, _options, executableDict, workingDirectory, outputDir, resourcesDict=None):
"""Returns the instance of a job from the job management. The job represents the job hold by the instance.
:param _options: options instance with options parsed from the command line
:param executableDict: dictionary of executables
:param workingDirectory: base path of the test case
:param outputDir: output folder
:type _options: OptionParser instance
:type executableDict: ExecutableResources
:type workingDirectory: str
:type outputDir: str
:return: a job instance which represents the instance itself
:rtype: JobManagementJob or JobManagementJobList
"""
executable, jobType, weight, usedResources = _replaceExecutable(executableDict, self.executable, resourcesDict=resourcesDict)
# add additional Tecplot arguments
if executable == constants.TECPLOT_EXECUTABLE:
args = constants.TECPLOT_ARGUMENTS + self.args
else:
args = self.args
args = [_replaceExecutable(executableDict, el)[0] for elList in (arg.split() for arg in args) for el in elList]
# check whether jobs needs Tecplot
nTecLics = 1 if self.useTecplot or executable == constants.TECPLOT_EXECUTABLE else 0
# add MOJO path to environment for Python scripts
envVars = {"PYTHONPATH": getMOJOPythonPath() + ":" + os.environ.get("PYTHONPATH", ""), "MPLBACKEND": "agg"} if executable == constants.PYTHON_EXECUTABLE else None
# set weight from wallTime or DEFAULT
weight = convertWallTimeToWeight(self, jobConsts.DEFAULT_WEIGHT_SHELL if weight == 0 else weight)
return JobManagementJob(executable=executable, args=args, nProcs=1, jobName=self.name,
nTecplotLicenses=nTecLics, workingDirectory=workingDirectory, outputDir=outputDir,
executeOnMaster=True,
jobType=jobType, ignoreExitCode=self.ignoreExitCode, envVars=envVars,
weight=weight, resourcesDict=usedResources)
[docs]class JobSub(supermod.Job, SetupJobSub, StrSub):
"""
"""
[docs] def getJob(self, options, executableDict, basePath, outputDir, resourcesDict=None):
"""Returns the instance of a job from the job management. The job represents the job hold by the instance.
:param options: options parsed from the command line
:param executableDict: dictionary of executables
:param basePath: base path of the test case
:param outputDir: path of output directory
:type options: OptionParser instance
:type executableDict: ExecutableResources
:type basePath: str
:type outputDir: str
:return: job instance which represents the instance itself
:rtype: Job
"""
workingDirectory = os.path.join(basePath, self.workingDirectory) if self.workingDirectory else basePath
outputFile = self.outputFile
aJob = self._getJob(options, executableDict, workingDirectory, outputDir, outputFile, resourcesDict=resourcesDict)
return aJob
[docs]class SetupSub(supermod.Setup, collections.Sequence, StrSub):
[docs] def find(self, name):
"""
"""
for opList in self.opList:
if opList.name == name:
return opList
return None
def __len__(self):
"""Returns the number of operation point lists.
:return: number of operation point lists
:rtype: int
"""
return len(self.opList)
def __getitem__(self, index):
"""Returns a operation point list at a given index.
:param index: index
:type index: int
:return: operation point list at given index
:rtype: ValidationOpListSub
"""
return self.opList[index]
[docs]class RestartSetupSub(supermod.RestartSetup, collections.Sequence, StrSub):
[docs] def find(self, name):
"""
"""
for opList in self.opList:
if opList.name == name:
return opList
return None
def __len__(self):
"""Returns the number of operation point list.
:return: number of operation point lists
:rtype: int
"""
return len(self.opList)
def __getitem__(self, index):
"""Returns a operation point list at a given index.
:param index: index
:type index: int
:return: operation point list at given index
:rtype: RestartValidationOpListSub
"""
return self.opList[index]
[docs]class OpListSub(supermod.OpList, collections.Sequence, StrSub):
def __len__(self):
"""Returns the number of operation points in the operation point list.
:return: number of operation points
:rtype: int
"""
return len(self.op)
def __getitem__(self, index):
"""Returns a operation point at a given index.
:param index: index
:type index: int
:return: operation point at given index
:rtype: ValidationOpSub
"""
return self.op[index]
def append(self, operationPoint):
self.op.append(operationPoint)
def insert(self, index, value):
self.op.insert(index, value)
[docs]class ValidationOpListSub(supermod.ValidationOpList, collections.Sequence, StrSub):
def __len__(self):
"""Returns the number of operation points in the operation point list.
:return: number of operation points
:rtype: int
"""
return len(self.op)
def __getitem__(self, index):
"""Returns a operation point at a given index.
:param index: index
:type index: int
:return: operation point at given index
:rtype: operation point instance
"""
return self.op[index]
[docs]class RestartValidationOpListSub(supermod.RestartValidationOpList, collections.Sequence, StrSub):
def __len__(self):
"""Returns the number of operation points in the operation point list.
:return: number of operation points
:rtype: int
"""
return len(self.op)
def __getitem__(self, index):
"""Returns a operation point at a given index.
:param index: index
:type index: int
:return: operation point at given index
:rtype: OpSub
"""
return self.op[index]
[docs]class OpInitialisationGlobalSub(supermod.OpInitialisationGlobal, StrSub):
def __len__(self):
"""Returns the number of block groups in the global settings list.
:return: number of operation points
:rtype: int
"""
return len(self.blockGroup)
def __getitem__(self, index):
"""Returns a block group at a given index.
:param index: index
:type index: int
:return: block group at given index
:rtype: OpInitialisationGlobalPerRowSub
"""
return self.blockGroup[index]
[docs]class OpInitialisationGlobalPerRowSub(supermod.OpInitialisationGlobalPerRow, StrSub):
def __getitem__(self, attr):
"""Returns the related value
:param attr: attribute name
:type attr: str
"""
for key in self.__dict__.keys():
if key.lower() == attr.lower():
return getattr(self, key)
return None
[docs]class OpSub(supermod.Op, StrSub):
pass
[docs]class ValidationOpSub(supermod.ValidationOp, StrSub):
pass
[docs]class RestartValidationOpSub(supermod.RestartValidationOp, StrSub):
pass
[docs]class CommandsSub(supermod.Commands, StrSub):
def __len__(self):
"""Returns the number of operation points in the operation point list.
:return: number of operation points
:rtype: int
"""
return len(self.job)
def __getitem__(self, index):
"""Returns a operation point at a given index.
:return: operation point at given index
:rtype: operation point instance
"""
return self.job[index]
[docs]class RowListSub(supermod.RowList, collections.Sequence, StrSub):
def __len__(self):
"""Returns the number of rows in the row list.
:return: number of rows
:rtype: int
"""
return len(self.row)
def __getitem__(self, index):
"""Returns a row at a given index.
:return: row at given index
:rtype: row instance
"""
return self.row[index]
def append(self, row):
self.row.append(row)
def insert(self, index, value):
self.row.insert(index, value)
[docs]class RowSub(supermod.Row, StrSub):
pass
[docs]class SliceSub(supermod.Slice):
[docs] def getSlice(self):
"""Return the selected index slice as a python slice object.
:return: slice object
:rtype: slice
"""
max_index = None if self.max == -1 else self.max + 1
return slice(self.min, max_index, self.step)