# Source code for mojo.pavayo.testcase.testSuiteComponents

"""
testSuiteComponents.py
======================

testSuiteComponents contains all the subclasses which are too small to get a file of their own. The only changes to the parent classes are implemented in
OpListSub and JobListSub. These classes also derive from the ``Sequence`` abstract base class of the collections package. JobSub has a method that returns
the job it represents as a job from the jobManagement package.
"""

import collections
import collections.abc
import io
import os
import re
from typing import Optional

from ...bricabrac import fileIO
from ..computeMethods import getMOJOPythonPath, convertWallTimeToWeight
from ...jobManagement.jobs.job import Job as JobManagementJob
from ...jobManagement.jobs.jobList import JobList as JobManagementJobList
from ...jobManagement import jobManagementData as jobConsts
from ...jobManagement.management.clusterInfo import ClusterInfo
from ...jobManagement.management.cpuInfo import CPUInfo

from ...pavayo import computeData as constants

from . import testcase_generated as supermod


def _replaceExecutable(executableDict, executableIn, resourcesDict=None):
    """Replace a executable from a XML file with a executable passed to PAVAYO and return the related job type.

    :param executableDict:  dictionary of executables
    :param executableIn:    name of the executable from the XML file
    :type executableDict:   ExecutableResources
    :type executableIn:     str

    :return:                path of the executable or python method to use by the job class
    :return:                job type
    :rtype:                 str or function
    :rtype:                 str or None
    """

    def createUsedresourcesDict(execName): return {execName: 1} if resourcesDict is not None and execName in resourcesDict else None

    if executableIn.startswith("$"):
        executableNameUpper = executableIn.upper()[1:]
        execResource = executableDict[executableIn.upper()]
        executable = execResource.path
        if not executable:
            raise RuntimeError(f"Could not find a replacement for the template '{executableIn}'")

        if execResource.environmentName == constants.GMCPLAY_TEMPLATE:
            jobType = constants.GMC_PLAY_EXEC
        else:
            for traceSuiteProg in (constants.TRACE_SUITE_TRACE_EXEC, constants.TRACE_SUITE_PREP_EXEC, constants.TRACE_SUITE_POST_EXEC):
                if executableNameUpper.startswith(traceSuiteProg):
                    jobType = traceSuiteProg
                    break
            else:
                jobType = None

        __setWeight = {constants.GMCPLAY_TEMPLATE: jobConsts.DEFAULT_WEIGHT_METHOD, constants.RM_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                       constants.MV_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, constants.CP_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                       constants.LN_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, constants.PYTHON_TEMPLATE: jobConsts.DEFAULT_WEIGHT_SHELL,
                       constants.TECPLOT_TEMPLATE: jobConsts.DEFAULT_WEIGHT_METHOD, constants.TECPLOT_MACRO_TEMPLATE: jobConsts.DEFAULT_WEIGHT_METHOD}
        weight = __setWeight.get(executableIn)

        usedResources = createUsedresourcesDict(executableNameUpper)
    else:
        executable = executableIn
        jobType = None
        weight = None
        usedResources = createUsedresourcesDict(executableIn)

    weight = weight or 0

    return executable, jobType, weight, usedResources


class StrSub:
    """Mixin that supplies the string conversion for the subclasses in this module."""

    def __str__(self):
        """Return the text written by ``self.export`` at level 0 without its last character.

        :return: exported representation of the instance
        :rtype: str
        """
        with io.StringIO() as buffer:
            self.export(buffer, 0)  # pylint: disable=no-member
            exported = buffer.getvalue()
        return exported[:-1]
class DependencySub(supermod.Dependency, StrSub):
    """Dependency element extended by a path representation."""

    def getPath(self):
        """Join level, sub level and job to a path; components which are None are treated as empty strings.

        :return: joined path
        :rtype: str
        """
        components = ("" if component is None else component for component in (self.level, self.subLevel, self.job))
        return os.path.join(*components)
# NOTE(review): "(int, int, int)" is a tuple of types and not a type itself; typing.Tuple[int, int, int] would be the form a type checker understands.
def distribute_processes_among_threads(n_processes: int, processes_per_node: Optional[int], threads_per_process: Optional[int]) -> (int, int, int):
    """Replace a multi-process layout by a single process running several threads.

    The number of threads is chosen as follows:

    * ``processes_per_node`` and ``threads_per_process`` both given: their product
    * only ``processes_per_node`` given: ``processes_per_node``
    * ``processes_per_node`` missing: ``numSiblingsPerSocket`` of the CPU information, which is taken from the cluster
      environment on a valid cluster and from ``CPUInfo.from_proc_cpuinfo()`` otherwise

    :param n_processes: number of processes of the original layout
    :param processes_per_node: number of processes per node or None
    :param threads_per_process: number of threads per process or None
    :type n_processes: int
    :type processes_per_node: Optional[int]
    :type threads_per_process: Optional[int]

    :return: tuple ``(n_processes, processes_per_node, threads_per_process)``; the first two entries are always 1
    :rtype: tuple
    """
    if processes_per_node is not None and threads_per_process is not None:
        threads_per_process = threads_per_process * processes_per_node
    elif processes_per_node is not None:  # threads per process is None
        threads_per_process = processes_per_node
    else:  # processes per node is None; a threads_per_process passed by the caller is ignored here
        cpu_info = ClusterInfo.from_environment().cpuInfo if ClusterInfo.onValidCluster() else CPUInfo.from_proc_cpuinfo()
        threads_per_process = cpu_info.numSiblingsPerSocket

    # NOTE(review): the indentation of this file was lost; the limitation below is assumed to apply to all three branches - confirm against the repository.
    # never use more threads than processes were asked for
    threads_per_process = min(threads_per_process, n_processes)
    processes_per_node = 1
    n_processes = 1

    return n_processes, processes_per_node, threads_per_process


class SetupJobSub:
    # Mixin holding the job creation shared by the job classes of this module which derive from it.

    def _getJob(self, options, executableDict, workingDirectory, outputDir, outputFile=None, resourcesDict=None):
        """Returns the instance of a job from the job management. The job represents the job hold by the instance.

        Besides creating the job this method rebinds attributes of the instance: ``args`` (command line arguments of the executable entry are
        appended), ``executeOnMaster`` (set for Tecplot), ``jobType``, ``nProcs``, ``procsPerNode`` and ``threadsPerProc``.

        :param options: options instance with options parsed from the command line
        :param executableDict: dictionary of executables
        :param workingDirectory: base path of the test case
        :param outputDir: output folder
        :param outputFile: path of output file
        :param resourcesDict: optional resources, forwarded to _replaceExecutable
        :type options: OptionParser instance
        :type executableDict: ExecutableResources
        :type workingDirectory: str
        :type outputDir: str
        :type outputFile: Optional[str]
        :type resourcesDict: Optional[dict]

        :return: a job instance which represents the instance itself
        :rtype: JobManagementJob
        """
        executable, jobType, weight, usedResources = _replaceExecutable(executableDict, self.executable, resourcesDict=resourcesDict)

        # append the command line arguments stored with the executable entry; executables without an entry are left untouched
        # NOTE(review): self.args is rebound here, so the arguments are appended again on every call of this method
        try:
            if executableDict[self.executable].commandLineArguments:
                self.args = self.args + executableDict[self.executable].commandLineArguments
        except KeyError:
            pass

        # add additional Tecplot arguments
        if executable == constants.TECPLOT_EXECUTABLE:
            args = constants.TECPLOT_ARGUMENTS + self.args
            self.executeOnMaster = True
            nTecLics = 1
        else:
            args = self.args
            nTecLics = 0

        try:
            _, execName = os.path.split(executable)
        except TypeError:  # if the executable is a method
            execName = ""

        # update job type if not specified yet
        self.jobType = self.jobType or jobType

        if self.jobType == constants.GMC_PLAY_EXEC:
            self.nProcs = 1
        elif self.jobType == constants.TRACE_SUITE_TRACE_EXEC:
            # values from the command line take precedence over the values of the instance
            self.threadsPerProc = options.threadsPerProc or self.threadsPerProc
            self.procsPerNode = options.procsPerNode or self.procsPerNode
            # NOTE(review): nesting reconstructed; this block is assumed to belong to the TRACE branch only - confirm against the repository.
            if options.useThreadsInsteadOfProcesses:
                self.nProcs, self.procsPerNode, self.threadsPerProc = distribute_processes_among_threads(self.nProcs, self.procsPerNode, self.threadsPerProc)

        # wrap the executable into a valgrind call if its file name was selected on the command line
        if execName in options.valgrindExec:
            jobName = self.name + "_" + "valgrind"
            readVar = "yes"  # yes|no
            trackOrigins = "yes"  # no|yes
            leakCheck = "full"  # no|summary|full
            genSupp = "no"  # no|yes|all
            showReachable = "no" if execName in constants.EXECUTABLES_TO_RUN_WITH_VALGIRIND else "yes"
            xmlFileNames = f"--xml=yes --xml-file=valgrind.{self.name}.%p.xml"

            valgrindCommandLine = constants.VALGRIND_COMMAND_LINE.format(readVarInfo=readVar, trackOrigins=trackOrigins, leakCheck=leakCheck,
                                                                         showReachable=showReachable, suppressions=options.valgrindSuppFile,
                                                                         genSuppressions=genSupp,
                                                                         logFile=os.path.join(outputDir, constants.VALGRIND_LOG_PREFIX + jobName),
                                                                         xmlFileNames=xmlFileNames)
            # valgrind becomes the executable, the original executable its first argument behind the valgrind options
            args = valgrindCommandLine.split() + [executable] + args
            executable = constants.VALGRIND_EXECUTABLE

        # statements of the instance win over the statements of the executable entry; without both the list is empty
        if self.successStatement:
            successStatement = self.successStatement
        else:
            try:
                successStatement = executableDict[self.executable].successStatement
            except KeyError:
                successStatement = list()

        if self.failStatement:
            failStatement = self.failStatement
        else:
            try:
                failStatement = executableDict[self.executable].failStatement
            except KeyError:
                failStatement = list()

        # statements which are not flagged as regular expression are escaped to be matched literally
        successStatementsRe = [statement.valueOf_ if statement.isRegExp else re.escape(statement.valueOf_) for statement in successStatement]
        failStatementsRe = [statement.valueOf_ if statement.isRegExp else re.escape(statement.valueOf_) for statement in failStatement]

        # add MOJO path to environment for Python scripts
        envVars = {"PYTHONPATH": getMOJOPythonPath() + ":" + os.environ.get("PYTHONPATH", ""), "MPLBACKEND": "agg"} if executable == constants.PYTHON_EXECUTABLE else None

        # set weight from wallTime or DEFAULT
        weight = convertWallTimeToWeight(self, jobConsts.DEFAULT_WEIGHT_SHELL if weight == 0 else weight)
        # scale the weight with the number of processes and threads; unset values count as 1
        weight = weight * (self.nProcs or 1) * (self.threadsPerProc or 1)

        nProcs = self.nProcs
        if options.allowFewerProcs:
            nProcs = min(self.nProcs, options.nprocs)

        # split the arguments at whitespace and replace templates inside the arguments
        args = [_replaceExecutable(executableDict, el)[0] for elList in (arg.split() for arg in args) for el in elList]

        if options.useThreadsInsteadOfProcesses:
            # remove load balance file parameter as it does not mix with the changed processor count
            try:
                lbIndex = args.index("-lb")
            except ValueError:
                pass
            else:
                # drops "-lb" and the entry following it
                del args[lbIndex:lbIndex + 2]

        aJob = JobManagementJob(executable=executable, args=args, nProcs=nProcs, nTecplotLicenses=nTecLics, jobName=self.name, queue=options.queue,
                                workingDirectory=workingDirectory, outputFile=outputFile, outputDir=outputDir, procsPerNode=self.procsPerNode,
                                threadsPerProc=self.threadsPerProc, nodeType=self.nodeType or options.nodeType, executeOnMaster=self.executeOnMaster,
                                jobType=self.jobType, successStatements=successStatementsRe, failStatements=failStatementsRe,
                                ignoreExitCode=self.ignoreExitCode, envVars=envVars, wallTime=self.wallTime, weight=weight,
                                resourcesDict=usedResources, useMpirun=self.useMpirun, account=options.account, slurmSwitches=options.slurmSwitches)

        return aJob
class SuiteSub(supermod.Suite, StrSub):
    """Suite element; adds nothing but the string conversion inherited from StrSub."""
class TclistSub(supermod.Tclist, StrSub):
    """Tclist element; adds nothing but the string conversion inherited from StrSub."""
class JobListSub(supermod.JobList, collections.abc.Sequence, StrSub):
    """Job list which behaves like a sequence of its jobs and additionally supports ``append`` and ``insert``.

    The sequence base class is taken from ``collections.abc``; the alias ``collections.Sequence`` does not exist
    anymore since Python 3.10.
    """

    COMPUTATION = "COMPUTATION"
    POSTPROCESSING = "POSTPROCESSING"
    RESTART = "RESTART"
    MESHING = "MESHING"

    def __len__(self):
        """Returns the number of jobs in the job list.

        :return: number of jobs
        :rtype: int
        """
        return len(self.job)

    def __getitem__(self, index):
        """Returns the job at a given index.

        :param index: index
        :type index: int

        :return: job at given index
        :rtype: job instance
        """
        return self.job[index]

    def append(self, job):
        """Appends a new job to the job list represented by the instance.

        :param job: the job instance to append
        :type job: TestcaseJob instance
        """
        self.job.append(job)

    def insert(self, index, value):
        """Inserts a job object at the given index.

        :param index: index in the job list where the job will be inserted
        :param value: the new job object
        :type index: integer
        :type value: TestcaseJob instance
        """
        self.job.insert(index, value)
class SimulationJobSub(supermod.SimulationJob, SetupJobSub, StrSub):
    """Simulation job which can additionally copy its output file to the path given by 'saveLog'."""

    def getJob(self, options, executableDict, workingDirectory, outputDir, resourcesDict=None):
        """Create the job management object(s) representing this simulation job.

        Without 'saveLog' the plain job is returned. With 'saveLog' the job is put into a job list together with a
        second job which copies the output file and depends on the first one.

        :param options: options instance with options parsed from the command line
        :param executableDict: dictionary of executables
        :param workingDirectory: base path of the test case
        :param outputDir: output folder
        :param resourcesDict: optional resources, forwarded to the job creation
        :type options: OptionParser instance
        :type executableDict: ExecutableResources
        :type workingDirectory: str
        :type outputDir: str
        :type resourcesDict: Optional[dict]

        :return: a job instance which represents the instance itself
        :rtype: JobManagementJob or JobManagementJobList
        """
        simulationJob = self._getJob(options, executableDict, workingDirectory, outputDir, None, resourcesDict=resourcesDict)

        if not self.saveLog:
            return simulationJob

        jobList = JobManagementJobList(name=self.name + "JobList")
        jobList.addJob(simulationJob)

        # the copy job runs on the master and waits for the simulation job
        logFile = os.path.join(outputDir, simulationJob.outFile)
        copyJob = JobManagementJob(SimulationJobSub._copyOutputFile, args=[self, logFile], jobName=self.name + "Copy",
                                   workingDirectory=workingDirectory, executeOnMaster=True, outputDir=outputDir,
                                   weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD)
        jobList.addJob(copyJob, [simulationJob.id])

        return jobList

    @staticmethod
    def _copyOutputFile(job, sourceFile):
        """Copy the source file to the path stored in attribute 'saveLog' after creating the target folder.

        :param job: SimulationJobSub instance
        :param sourceFile: path to source file
        :type job: SimulationJobSub
        :type sourceFile: str
        """
        targetFolder = os.path.split(job.saveLog)[0]
        fileIO.md(targetFolder)
        fileIO.globCopy(sourceFile, job.saveLog, match=True, verbose=1)
class AnalysisJobSub(supermod.AnalysisJob, StrSub):
    """Analysis job; it is always created as a single process job executed on the master."""

    def getJob(self, _options, executableDict, workingDirectory, outputDir, resourcesDict=None):
        """Create the job management job representing this analysis job.

        :param _options: options instance with options parsed from the command line (not used)
        :param executableDict: dictionary of executables
        :param workingDirectory: base path of the test case
        :param outputDir: output folder
        :param resourcesDict: optional resources, forwarded to _replaceExecutable
        :type _options: OptionParser instance
        :type executableDict: ExecutableResources
        :type workingDirectory: str
        :type outputDir: str
        :type resourcesDict: Optional[dict]

        :return: a job instance which represents the instance itself
        :rtype: JobManagementJob
        """
        executable, jobType, weight, usedResources = _replaceExecutable(executableDict, self.executable, resourcesDict=resourcesDict)
        isTecplot = executable == constants.TECPLOT_EXECUTABLE

        # Tecplot gets its standard arguments in front of the job arguments
        rawArgs = constants.TECPLOT_ARGUMENTS + self.args if isTecplot else self.args

        # split at whitespace and replace templates inside the arguments
        args = []
        for rawArg in rawArgs:
            for token in rawArg.split():
                args.append(_replaceExecutable(executableDict, token)[0])

        # one Tecplot license if the job asks for it or runs Tecplot itself
        nTecLics = 1 if self.useTecplot or isTecplot else 0

        # Python scripts need the MOJO path and a non-interactive matplotlib backend
        if executable == constants.PYTHON_EXECUTABLE:
            envVars = {"PYTHONPATH": getMOJOPythonPath() + ":" + os.environ.get("PYTHONPATH", ""), "MPLBACKEND": "agg"}
        else:
            envVars = None

        # weight from the wall time, falling back to the shell default if no weight is known for the executable
        fallbackWeight = jobConsts.DEFAULT_WEIGHT_SHELL if weight == 0 else weight
        weight = convertWallTimeToWeight(self, fallbackWeight)

        return JobManagementJob(executable=executable, args=args, nProcs=1, jobName=self.name, nTecplotLicenses=nTecLics,
                                workingDirectory=workingDirectory, outputDir=outputDir, executeOnMaster=True, jobType=jobType,
                                ignoreExitCode=self.ignoreExitCode, envVars=envVars, weight=weight, resourcesDict=usedResources)
class JobSub(supermod.Job, SetupJobSub, StrSub):
    """Job with an optional working directory relative to the base path and an optional output file."""

    def getJob(self, options, executableDict, basePath, outputDir, resourcesDict=None):
        """Create the job management job representing this job.

        :param options: options parsed from the command line
        :param executableDict: dictionary of executables
        :param basePath: base path of the test case
        :param outputDir: path of output directory
        :param resourcesDict: optional resources, forwarded to the job creation
        :type options: OptionParser instance
        :type executableDict: ExecutableResources
        :type basePath: str
        :type outputDir: str
        :type resourcesDict: Optional[dict]

        :return: job instance which represents the instance itself
        :rtype: Job
        """
        # the working directory of the job is relative to the base path; without one the base path itself is used
        if self.workingDirectory:
            workingDirectory = os.path.join(basePath, self.workingDirectory)
        else:
            workingDirectory = basePath

        return self._getJob(options, executableDict, workingDirectory, outputDir, self.outputFile, resourcesDict=resourcesDict)
class SetupSub(supermod.Setup, collections.abc.Sequence, StrSub):
    """Setup which behaves like a sequence of its operation point lists.

    The sequence base class is taken from ``collections.abc``; the alias ``collections.Sequence`` does not exist
    anymore since Python 3.10.
    """

    def find(self, name):
        """Returns the first operation point list with the given name.

        :param name: name of the operation point list
        :type name: str

        :return: matching operation point list or None if there is none
        :rtype: ValidationOpListSub or None
        """
        return next((opList for opList in self.opList if opList.name == name), None)

    def __len__(self):
        """Returns the number of operation point lists.

        :return: number of operation point lists
        :rtype: int
        """
        return len(self.opList)

    def __getitem__(self, index):
        """Returns a operation point list at a given index.

        :param index: index
        :type index: int

        :return: operation point list at given index
        :rtype: ValidationOpListSub
        """
        return self.opList[index]
class RestartSetupSub(supermod.RestartSetup, collections.abc.Sequence, StrSub):
    """Restart setup which behaves like a sequence of its operation point lists.

    The sequence base class is taken from ``collections.abc``; the alias ``collections.Sequence`` does not exist
    anymore since Python 3.10.
    """

    def find(self, name):
        """Returns the first operation point list with the given name.

        :param name: name of the operation point list
        :type name: str

        :return: matching operation point list or None if there is none
        :rtype: RestartValidationOpListSub or None
        """
        return next((opList for opList in self.opList if opList.name == name), None)

    def __len__(self):
        """Returns the number of operation point lists.

        :return: number of operation point lists
        :rtype: int
        """
        return len(self.opList)

    def __getitem__(self, index):
        """Returns a operation point list at a given index.

        :param index: index
        :type index: int

        :return: operation point list at given index
        :rtype: RestartValidationOpListSub
        """
        return self.opList[index]
class OpListSub(supermod.OpList, collections.abc.Sequence, StrSub):
    """Operation point list which behaves like a sequence and additionally supports ``append`` and ``insert``.

    The sequence base class is taken from ``collections.abc``; the alias ``collections.Sequence`` does not exist
    anymore since Python 3.10.
    """

    def __len__(self):
        """Returns the number of operation points in the operation point list.

        :return: number of operation points
        :rtype: int
        """
        return len(self.op)

    def __getitem__(self, index):
        """Returns a operation point at a given index.

        :param index: index
        :type index: int

        :return: operation point at given index
        :rtype: ValidationOpSub
        """
        return self.op[index]

    def append(self, operationPoint):
        """Appends an operation point to the list.

        :param operationPoint: operation point to append
        """
        self.op.append(operationPoint)

    def insert(self, index, value):
        """Inserts an operation point at the given index.

        :param index: index where the operation point will be inserted
        :param value: the new operation point
        :type index: int
        """
        self.op.insert(index, value)
class ValidationOpListSub(supermod.ValidationOpList, collections.abc.Sequence, StrSub):
    """Validation operation point list which behaves like a sequence of its operation points.

    The sequence base class is taken from ``collections.abc``; the alias ``collections.Sequence`` does not exist
    anymore since Python 3.10.
    """

    def __len__(self):
        """Returns the number of operation points in the operation point list.

        :return: number of operation points
        :rtype: int
        """
        return len(self.op)

    def __getitem__(self, index):
        """Returns a operation point at a given index.

        :param index: index
        :type index: int

        :return: operation point at given index
        :rtype: operation point instance
        """
        return self.op[index]
class RestartValidationOpListSub(supermod.RestartValidationOpList, collections.abc.Sequence, StrSub):
    """Restart validation operation point list which behaves like a sequence of its operation points.

    The sequence base class is taken from ``collections.abc``; the alias ``collections.Sequence`` does not exist
    anymore since Python 3.10.
    """

    def __len__(self):
        """Returns the number of operation points in the operation point list.

        :return: number of operation points
        :rtype: int
        """
        return len(self.op)

    def __getitem__(self, index):
        """Returns a operation point at a given index.

        :param index: index
        :type index: int

        :return: operation point at given index
        :rtype: OpSub
        """
        return self.op[index]
class OpInitialisationGlobalSub(supermod.OpInitialisationGlobal, StrSub):
    """Global initialisation settings giving indexed access to their block groups."""

    def __len__(self):
        """Count the block groups of the global settings.

        :return: number of block groups
        :rtype: int
        """
        blockGroups = self.blockGroup
        return len(blockGroups)

    def __getitem__(self, index):
        """Look up a block group by its position.

        :param index: index
        :type index: int

        :return: block group at given index
        :rtype: OpInitialisationGlobalPerRowSub
        """
        blockGroups = self.blockGroup
        return blockGroups[index]
class OpInitialisationGlobalPerRowSub(supermod.OpInitialisationGlobalPerRow, StrSub):
    """Per-row initialisation settings whose attributes can be read by name, ignoring case."""

    def __getitem__(self, attr):
        """Return the value of the first instance attribute whose name matches ``attr`` case-insensitively.

        :param attr: attribute name
        :type attr: str

        :return: value of the matching attribute or None if no attribute matches
        """
        matchingValues = (getattr(self, name) for name in vars(self) if name.lower() == attr.lower())
        return next(matchingValues, None)
class OpSub(supermod.Op, StrSub):
    """Op element; adds nothing but the string conversion inherited from StrSub."""
class ValidationOpSub(supermod.ValidationOp, StrSub):
    """ValidationOp element; adds nothing but the string conversion inherited from StrSub."""
class RestartValidationOpSub(supermod.RestartValidationOp, StrSub):
    """RestartValidationOp element; adds nothing but the string conversion inherited from StrSub."""
class CommandsSub(supermod.Commands, StrSub):
    """Commands element giving indexed access to the entries of its ``job`` list."""

    def __len__(self):
        """Count the entries of the ``job`` list.

        :return: number of jobs
        :rtype: int
        """
        jobs = self.job
        return len(jobs)

    def __getitem__(self, index):
        """Look up an entry of the ``job`` list by its position.

        :param index: index
        :type index: int

        :return: job at given index
        :rtype: job instance
        """
        jobs = self.job
        return jobs[index]
class RowListSub(supermod.RowList, collections.abc.Sequence, StrSub):
    """Row list which behaves like a sequence of its rows and additionally supports ``append`` and ``insert``.

    The sequence base class is taken from ``collections.abc``; the alias ``collections.Sequence`` does not exist
    anymore since Python 3.10.
    """

    def __len__(self):
        """Returns the number of rows in the row list.

        :return: number of rows
        :rtype: int
        """
        return len(self.row)

    def __getitem__(self, index):
        """Returns a row at a given index.

        :param index: index
        :type index: int

        :return: row at given index
        :rtype: row instance
        """
        return self.row[index]

    def append(self, row):
        """Appends a row to the row list.

        :param row: row to append
        """
        self.row.append(row)

    def insert(self, index, value):
        """Inserts a row at the given index.

        :param index: index where the row will be inserted
        :param value: the new row
        :type index: int
        """
        self.row.insert(index, value)
class RowSub(supermod.Row, StrSub):
    """Row element; adds nothing but the string conversion inherited from StrSub."""
class SliceSub(supermod.Slice):
    """Slice element which can be converted into a python slice object."""

    def getSlice(self):
        """Build the python slice described by ``min``, ``max`` and ``step``.

        ``max`` is treated as inclusive, hence the stop value is ``max + 1``; a ``max`` of -1 leaves the slice open at its end.

        :return: slice object
        :rtype: slice
        """
        if self.max == -1:
            stop = None
        else:
            stop = self.max + 1
        return slice(self.min, stop, self.step)