# Source code for mojo.pavayo.testcase.speedlineTestCase

"""
speedlineTestCase.py
====================

speedlineTestCase offers the class SpeedlineTestCaseSub. SpeedlineTestCaseSub represents a test case whose computations use the solution of a previous computation
as initial solution. At the moment the back pressure is changed between every computation, but other scenarios are imaginable. To change the settings in the
CGNS file PAVAYO has to use gmcPlay.
"""

import collections
import collections.abc
import os

from . import testcase_generated as supermod
from .abstractTestCase import AbstractTestCase
from ..joblists.speedline import buildSpeedlineJobList
from ..computeMethods import convertWallTimeToWeight

from ...jobManagement.jobs.jobList import JobList
from ...jobManagement.jobs.job import Job
from ...jobManagement import jobManagementData as jobConsts

from ...pavayo import computeData as constants


class SpeedlineTestCaseSub(supermod.SpeedlineTestCase, collections.abc.Sequence, AbstractTestCase):
    """
    Represents a speedline test case.

    Inherits from :class:`collections.abc.Sequence` so you can iterate over its
    operation point lists. All possible parameters to the constructor of this
    class can be found in the XSD file used to generate testcase_generated.py.

    .. note::
        The base class was changed from ``collections.Sequence`` to
        ``collections.abc.Sequence``: the alias in the ``collections``
        top-level namespace was deprecated since Python 3.3 and removed in
        Python 3.10. Both names refer to the same ABC on older versions, so
        this is fully backward compatible.
    """
[docs] def append(self, opList): """Appends a given operation point list to the test case. :param opList: a operation point list to append to the testcase :type opList: OperationPointList instance """ self.opList.append(opList)
    def getPostCommandJobs(self, options, executableDict, isParallelTestCase=False, resourcesDict=None):
        """Generates jobs from the post commands in a test case.

        Walks every operation point of every operation point list and builds one
        POST job per post command attached to either the point or its list.

        :param options: the Argparse instance of the current run of PAVAYO
        :param executableDict: dictionary of executables
        :param isParallelTestCase: flag to indicate if the instance calling is a
            parallel test case; in this case the folder structure is slightly
            different
        :param resourcesDict: resources used by the jobs
        :type options: argparse
        :type executableDict: ExecutableResources
        :type isParallelTestCase: bool
        :type resourcesDict: dict
        :return: a job list containing all post command jobs
        :rtype: list of Job
        """
        postJobs = list()
        for opList in self:
            for op in opList:
                # Post commands may be attached to the point itself or to the
                # whole operation point list; either presence triggers job creation.
                if op.addPostCommand or opList.addPostCommand:
                    if isParallelTestCase:
                        # Per-point output directory; fall back to the TRACE default.
                        outputDir = op.outputDir if op.outputDir else constants.TRACE_DEFAULT_OUTPUT_DIR
                        workingDir = os.path.join(self.path, opList.name, op.name, outputDir)
                    else:
                        # validation: single shared CGNS output directory
                        workingDir = os.path.join(self.path, self.outputDir, self.cgnsOutDir)
                    pointName = "_".join((self.name, "POST", opList.name, op.name))
                    # Point-level commands run before list-level commands.
                    for index, elt in enumerate(op.addPostCommand + opList.addPostCommand):
                        # number of processor cores: command setting wins, then
                        # per-point CPUs (parallel case), then test-case CPUs.
                        if elt.nProcs:
                            nbProc = elt.nProcs
                        elif isParallelTestCase:
                            nbProc = int(op.numCPUs[0])
                        else:
                            # validation
                            nbProc = int(self.numCPUs[0])
                        # set weight from wallTime or DEFAULT; scale by total
                        # requested cores (procs * threads per proc).
                        weight = convertWallTimeToWeight(elt, jobConsts.DEFAULT_WEIGHT_SHELL)
                        weight = weight * nbProc * (options.threadsPerProc or elt.threadsPerProc or 1)
                        # Command-line options override per-command settings.
                        postJob = Job(executable=executableDict[constants.TRACE_SUITE_POST_EXEC].path,
                                      args=elt.args,
                                      nProcs=nbProc,
                                      jobType="POST",
                                      queue=options.queue,
                                      workingDirectory=workingDir,
                                      procsPerNode=(options.procsPerNode or elt.procsPerNode),
                                      threadsPerProc=(options.threadsPerProc or elt.threadsPerProc),
                                      jobName=pointName,
                                      outputFile=pointName + f".{index:03}.out",
                                      wallTime=elt.wallTime,
                                      weight=weight,
                                      resourcesDict=resourcesDict,
                                      account=options.account,
                                      group="{}:Post".format(self.name))
                        postJobs.append(postJob)
        return postJobs
[docs] def getComputationJobList(self, options, executableDict, resourcesDict=None): """Returns a job list containing all jobs for the computation step of the test case. It starts with a pre script, if existing, followed by the speed lines and the post commands. :param options: Argparse instance of the current PAVAYO run :param executableDict: dictionary of executables :type options: argparse :type executableDict: ExecutableResources :return: job list representing teh computation of this test case or Noen :rtype: JobList or None """ lastJobId = [] myJobList = JobList(name=self.name + "_COMPUTATION_JOBLIST", verbosity=options.verbose, retries=options.retriesComputation, deactivateJobGrouping=options.deactivateClusterJobGrouping) preScriptJob = super(SpeedlineTestCaseSub, self).getPreScriptJob(executableDict) if preScriptJob: myJobList.addJob(preScriptJob) lastJobId = [preScriptJob.id] traceJobs = buildSpeedlineJobList(trace=executableDict[constants.TRACE_SUITE_TRACE_EXEC].path, gmcPlay=executableDict[constants.GMCPLAY_TEMPLATE].path, testCase=self, options = options, postproc=False, name=self.name + "_SPEEDLINE") if traceJobs: myJobList.addJob(traceJobs, lastJobId) lastJobId = [traceJobs.id] postJobs = self.getPostCommandJobs(options, executableDict) if postJobs: for job in postJobs: myJobList.addJob(job, lastJobId) lastJobId = [job.id] if lastJobId: return myJobList else: return None
[docs] def getPostprocessingJobList(self, *args, **kwargs): """Returns a job containing the post script of this test case. :param options: Argparse instance of the current PAVAYO run :type options: Argparse instance """ return super(SpeedlineTestCaseSub, self).getPostscriptJob(*args, **kwargs)
def __len__(self): """Returns the number of operation point lists in the testcase. :return: number of operation point lists :rtype: int """ return len(self.opList) def __getitem__(self, index): """Returns a operation point list at a given index. :return: operation point list at given index :rtype: operation point list instance """ return self.opList[index]