Source code for mojo.pavayo.testcase.validationTestCaseSpeedline

"""
ValidationTestCaseSpeedline
============================

The test case class ValidationTestCaseSpeedline is derived from the class ValidationTestCase. The only difference between the two test case
classes is the processing order of the operating points. Here it is assumed that all operating points in one operating point list depend on
each other. So they are executed serially.
"""


import collections
import os

from . import testcase_generated as supermod
from .abstractTestCase import AbstractTestCase
from .testSuiteComponents import OpSub
from .validationTestCase import ValidationTestCaseSub

from .. import computeData as constants
from ..computeMethods import convertWallTimeToWeight

from ...gmcplay import gmcPlayData as gmcConst
from ...gmcplay.gmcPlayInValidationTestCase import createAndExecuteGmcPlayJournalFile

from ...bricabrac.fileIO import forceSymlink, globCopy
from ...jobManagement.jobs.jobList import JobList
from ...jobManagement.jobs.job import Job
from ...jobManagement import jobManagementData as jobConsts


def manageSources(previousPoint, setup, cgnsFile):
    """Copies the CGNS file and links the solution from previous operating point to the current directory

    :param previousPoint:       previous operating point
    :param setup:               current setup
    :param cgnsFile:            name of the CGNS file in the previous operating point
    :type previousPoint:        ValidationOpSub
    :type setup:                SetupSub
    :type cgnsFile:             str
    """

    try:
        fileName = setup.output.cgns.file
    except AttributeError:
        fileName = constants.TRACE_DEFAULT_CGNS_NAME

    try:
        directory = os.path.join("../..", previousPoint.name, constants.INPUT_DIR, setup.output.cgns.directory)
    except AttributeError:
        directory = os.path.join("../..", previousPoint.name, constants.OUTPUT_DIR, constants.DEFAULT_CGNS_DIR)

    srcPathToCGNS = os.path.join("../..", previousPoint.name, constants.INPUT_DIR, cgnsFile)
    srcPathToLinkFile = os.path.join(directory, fileName)

    forceSymlink(srcPathToLinkFile, f"{previousPoint.name}.cgns")
    globCopy(srcPathToCGNS, os.curdir)


[docs]class ValidationTestCaseSpeedlineSub(supermod.ValidationTestCaseSpeedline, ValidationTestCaseSub, collections.Sequence, AbstractTestCase): """ Represents a speedline test case. Inherits from collections. Sequence so you can iterate over its operation point lists. All possible parameters to the constructor of this class can be found in the XSD file used to generate testcase_generated.py. """ def _addJobsOfOperatingPointToJobList(self, options, executableDict, testCaseJobList, setupDir, setup, opList, op, opPrevious, opListLastJobId, traceJob, cgnsFilePreviousOp, manageSourcesJob, resourcesDict=None, jobNamePrefix="OP"): """Adds the jobs for a single operating point to the job list. :param options: pavayo options :param executableDict: dictionary of executables :param testCaseJobList: job list of a test case :param setupDir: directory of setup :param setup: current setup :param opList: current operating point list :param op: current operating point :param opPrevious: previous operating point :param opListLastJobId: index list of last jobs :param traceJob: Trace job :param cgnsFilePreviousOp: CGNS file of previous operating point :param manageSourcesJob: source management job :param resourcesDict: resource dictionary :param jobNamePrefix: prefix for job names :type options: Namespace :type executableDict: ExecutableResources :type testCaseJobList: list :type setupDir: str :type setup SetupSub or RestartSetupSub :type opList: ValidationOpListSub or RestartValidationOpListSub :type op: ValidationOpSub or RestartValidationOpSub :type opPrevious: ValidationOpSub or RestartValidationOpSub :type opListLastJobId: list :type traceJob: Job :type cgnsFilePreviousOp: str :type manageSourcesJob: Job :type resourcesDict: dict :type jobNamePrefix: str """ inputDir = os.path.join(setupDir, opList.name, op.name, constants.INPUT_DIR) cgnsFile = op.cgnsFile or opList.cgnsFile or setup.cgnsFile jobNameFormat = "_".join([jobNamePrefix, "{0}", ] + [x for x in [setup.name, opList.name, op.name] if not (x == '.' or x == '' or x is None)]) outputFileDir = os.path.join(self.path, constants.LOG_DIR, setup.name, opList.name) outputFileFormat = "{pointName}_{jobType}.out" if traceJob and (not options.step or constants.COMPUTATION_STEP_RESET in options.step): name = jobNameFormat.format("manageSources") manageSourcesJob = Job(manageSources, args=[opPrevious, setup, cgnsFilePreviousOp], jobName=name, workingDirectory=inputDir, outputFile=os.path.join(outputFileDir, outputFileFormat.format(jobType="manageSources", pointName=op.name)), weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:ManageSources".format(self.name)) testCaseJobList.addJob(manageSourcesJob, opListLastJobId) opListLastJobId = [manageSourcesJob.id] # change back pressures etc. (gmcPlay) for all points where boundary conditions are specified if not options.step or constants.COMPUTATION_STEP_GMC in options.step: name = jobNameFormat.format("gmcPlay") gmcPlayJob = Job(createAndExecuteGmcPlayJournalFile, args=[executableDict[constants.GMCPLAY_TEMPLATE].path, setup, op, cgnsFile], kwargs=dict(opPreviousName=opPrevious.name, journalFileName=name + gmcConst.GMC_JOURNAL_EXTENSION), jobName=name, executeOnMaster=True, workingDirectory=inputDir, resourcesDict=resourcesDict, outputFile=os.path.join(outputFileDir, outputFileFormat.format(jobType="gmcPlay", pointName=op.name)), weight=jobConsts.DEFAULT_WEIGHT_METHOD, group="{}:GMCPlay".format(self.name)) testCaseJobList.addJob(gmcPlayJob, opListLastJobId) opListLastJobId = [gmcPlayJob.id] # add prep jobs if not options.step or constants.COMPUTATION_STEP_PREP in options.step: opListLastJobId = ValidationTestCaseSpeedlineSub.getPrepCommands(options, executableDict, testCaseJobList, opList, op, opListLastJobId, inputDir, outputFileDir, resourcesDict=resourcesDict) # add trace job if not options.step or constants.COMPUTATION_STEP_TRACE in options.step: arguments = ValidationTestCaseSpeedlineSub.getArguments(os.path.join(inputDir, cgnsFile), opList, op) # outfilename will be set right automatically # set weight from wallTime or DEFAULT weight = convertWallTimeToWeight(op, jobConsts.DEFAULT_WEIGHT_SHELL) weight = weight * (setup.nProcs or 1) * (options.threadsPerProc or setup.threadsPerProc or 1) name = jobNameFormat.format("TRACE") traceJob = Job(executableDict[constants.TRACE_SUITE_TRACE_EXEC].path, args=arguments, nProcs=setup.nProcs, procsPerNode=(options.procsPerNode or setup.procsPerNode), queue=options.queue, jobName=name, threadsPerProc=(options.threadsPerProc or setup.threadsPerProc), jobType="TRACE", workingDirectory=inputDir, outputFile=os.path.join(outputFileDir, outputFileFormat.format(jobType="TRACE", pointName=op.name)), nodeType=options.nodeType, wallTime=op.wallTime, weight=weight, resourcesDict=resourcesDict, account=options.account, slurmSwitches=options.slurmSwitches, group="{}:TRACE".format(self.name)) testCaseJobList.addJob(traceJob, opListLastJobId) opListLastJobId = [traceJob.id] # addPostCommands if not options.step or constants.COMPUTATION_STEP_POST in options.step: try: cgnsDir = os.path.join(setupDir, opList.name, op.name, constants.INPUT_DIR, setup.output.cgns.directory) except AttributeError: cgnsDir = os.path.join(setupDir, opList.name, op.name, constants.OUTPUT_DIR, constants.DEFAULT_CGNS_DIR) ValidationTestCaseSpeedlineSub.getPostCommands(options, executableDict, testCaseJobList, opList, op, opListLastJobId, cgnsDir, outputFileDir, resourcesDict=resourcesDict) opPrevious = op cgnsFilePreviousOp = cgnsFile return traceJob, cgnsFilePreviousOp, manageSourcesJob, opPrevious, opListLastJobId def _addSetupToJobList(self, options, executableDict, testCaseJobList, setup, resourcesDict=None, selectedSimDict=None, jobNamePrefix="OP"): """Adds a setup to the job list. :param options: pavayo options :param executableDict: dictionary of executables :param testCaseJobList: job list of a test case :param setup: current setup :param resourcesDict: resource dictionary :param selectedSimDict: dictionary of subset jobs :param jobNamePrefix: prefix for job names :type options: Namespace :type executableDict: ExecutableResources :type testCaseJobList: JobList :type setup SetupSub or RestartSetupSub :type resourcesDict: dict or None :type selectedSimDict: dict or None :type jobNamePrefix: str """ if selectedSimDict is None or selectedSimDict[setup.name] == "all": for opList in setup: setupDir = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name) traceJob = None cgnsFilePreviousOp = None manageSourcesJob = None opPrevious = OpSub(name="") opListLastJobId = list() for op in opList: (traceJob, cgnsFilePreviousOp, manageSourcesJob, opPrevious, opListLastJobId) = \ self._addJobsOfOperatingPointToJobList(options, executableDict, testCaseJobList, setupDir, setup, opList, op, opPrevious, opListLastJobId, traceJob, cgnsFilePreviousOp, manageSourcesJob, resourcesDict=resourcesDict, jobNamePrefix=jobNamePrefix) else: for opList in setup: setupDir = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name) traceJob = None cgnsFilePreviousOp = None manageSourcesJob = None opPrevious = OpSub(name="") opListLastJobId = list() if opList.name in selectedSimDict[setup.name] and selectedSimDict[setup.name][opList.name] == "all": for op in opList: (traceJob, cgnsFilePreviousOp, manageSourcesJob, opPrevious, opListLastJobId) = \ self._addJobsOfOperatingPointToJobList(options, executableDict, testCaseJobList, setupDir, setup, opList, op, opPrevious, opListLastJobId, traceJob, cgnsFilePreviousOp, manageSourcesJob, resourcesDict=resourcesDict, jobNamePrefix=jobNamePrefix) else: for op in opList: if op.name in selectedSimDict[setup.name][opList.name]: (traceJob, cgnsFilePreviousOp, manageSourcesJob, opPrevious, opListLastJobId) = \ self._addJobsOfOperatingPointToJobList(options, executableDict, testCaseJobList, setupDir, setup, opList, op, opPrevious, opListLastJobId, traceJob, cgnsFilePreviousOp, manageSourcesJob, resourcesDict=resourcesDict, jobNamePrefix=jobNamePrefix)
[docs] def getRestartJobList(self, options, executableDict, resourcesDict=None): """Returns a job list containing all jobs for the resart step of the test case. :param options: argparse instance of the current PAVAYO run :param executableDict: dictionary of executables :param resourcesDict: resource dictionary :type options: Argparse :type executableDict: ExecutableResources :type resourcesDict: dict :return: the job list representing the computation of this test case or None :rtype: JobList or None """ if not self.restartSetup: return selectedSimDict = self.findSubsetInJobList(options.selectiveCompute) if options.selectiveCompute else None myJobList = JobList(name=self.name + "_UPDATERESTART_JOBLIST", verbosity=options.verbose) jobName = "Restart-setupJob" if selectedSimDict is not None: setupJob = Job(self._setupTestCasePartial, args=[selectedSimDict], kwargs=dict(restart=True, keepLog=False), jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:UpdateRestart".format(self.name)) else: setupJob = Job(self._setupTestCase, kwargs=dict(restart=True, keepLog=False), jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:UpdateRestart".format(self.name)) myJobList.addJob(setupJob) lastJobId = [setupJob.id] speedlineJobList = JobList(name=self.name + "_RESTART_SPEEDLINE", verbosity=options.verbose) for setup in self.restartSetup: self._addSetupToJobList(options, executableDict, speedlineJobList, setup, resourcesDict=resourcesDict, selectedSimDict=selectedSimDict, jobNamePrefix="Restart_" + self.name) if speedlineJobList: myJobList.addJob(speedlineJobList, lastJobId) lastJobId = [speedlineJobList.id] jobName = "Restart-CollectJob" collectJob = Job(ValidationTestCaseSpeedlineSub._collectData, args=[self], jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=os.path.join(self.path, constants.LOG_DIR), weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:RestartCollect".format(self.name)) myJobList.addJob(collectJob, parents=lastJobId) lastJobId = [collectJob.id] return myJobList
[docs] def getComputationJobList(self, options, executableDict, resourcesDict=None): """Returns a job list containing all jobs for the computation step of the test case. :param options: argparse instance of the current PAVAYO run :param executableDict: dictionary of executables :param resourcesDict: resource dictionary :type options: Argparse :type executableDict: ExecutableResources :type resourcesDict: dict :return: job list representing the computation of this test case or None :rtype: JobList or None """ if not self.setup: return selectedSimDict = self.findSubsetInJobList(options.selectiveCompute) if options.selectiveCompute else None myJobList = JobList(name=self.name + "_COMPUTATION_JOBLIST", verbosity=options.verbose, retries=options.retriesComputation, deactivateJobGrouping=options.deactivateClusterJobGrouping) if not options.step or constants.COMPUTATION_STEP_RESET in options.step: jobName = "setupJob" if selectedSimDict is not None: setupJob = Job(self._setupTestCasePartial, args=[selectedSimDict], kwargs=dict(keepLog=False), jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:Computation".format(self.name)) else: setupJob = Job(self._setupTestCase, kwargs=dict(keepLog=False), jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:Computation".format(self.name)) myJobList.addJob(setupJob) lastJobId = [setupJob.id] else: lastJobId = None speedlineJobList = JobList(name=self.name + "_SPEEDLINE", verbosity=options.verbose) for setup in self: self._addSetupToJobList(options, executableDict, speedlineJobList, setup, resourcesDict=resourcesDict, selectedSimDict=selectedSimDict, jobNamePrefix=self.name) if speedlineJobList: myJobList.addJob(speedlineJobList, lastJobId) lastJobId = [speedlineJobList.id] return myJobList