"""
ValidationTestCase
====================
The ValidationTestCase is designed for the computation and evaluation of several setups, each including several operating point lists.
This test case class uses the same folder structure as the TestSuiteTestCase. Related test cases comprise the sources folder and the
analysis folder. For the computation the data is linked or copied from the sources folder to the computation folder.
Each operating point is considered as independent. Thus there are no dependencies between operating points possible.
Before the TRACE computation is started the control file can be modified and several settings in the CGNS file can be set via gmcPlay.
When the computation is done post-processing jobs can be executed.
.. warning::
For the post-processing jobs the base folder is the CGNS folder. If a CGNS directory is defined in the setup it will be set via
gmcPlay and it will be used here. Otherwise it is expected that the CGNS files are at the default location ('../output/cgns/').
"""
import collections
import os
import numpy as np
from . import testcase_generated as supermod
from .abstractTestCase import AbstractTestCase
from .. import computeData as constants
from ..computeMethods import convertWallTimeToWeight
from ...gmcplay import gmcPlayData as gmcConst
from ...gmcplay.gmcPlayInValidationTestCase import createAndExecuteGmcPlayJournalFile
from ...bricabrac.fileIO import ensurePath, md, removeFile
from ...jobManagement.jobs.jobList import JobList
from ...jobManagement.jobs.job import Job
from ...jobManagement import jobManagementData as jobConsts
from ...latex import PyTex
from ...plotting.figureWrapper import FigureWrapper
from ...plotting.xyPlot import XYPlot
from ...plotting import plotStyle
from ...plotting import plotData
from ...traceSuiteControl.controlfile.tracecommandlistClass import TraceCommandList
from ...traceSuiteControl.uList.uListReader import readUList
from ...traceSuiteControl.uList import uListData as uListConst
from datfile import TecplotDataset
[docs]class ValidationTestCaseSub(supermod.ValidationTestCase, collections.Sequence, AbstractTestCase):
"""
Represents a speedline test case. Inherits from collections. Sequence so you can iterate over its operation point lists.
All possible parameters to the constructor of this class can be found in the XSD file used to generate
testcase_generated.py.
"""
def append(self, setup):
    """Adds the given setup to this test case's setup list.

    :param setup: setup to append to the testcase
    :type setup: SetupSub
    """
    setups = self.setup
    setups.append(setup)
def find(self, name):
    """Looks up a setup of this test case by its name.

    :param name: name of the setup searched for
    :type name: str
    :return: the first setup whose name matches, or None if there is no match
    :rtype: SetupSub
    """
    return next((candidate for candidate in self.setup if candidate.name == name), None)
def _createControlFile(self, setup, opList, op):
    """Creates new control file based on an existing control file (if it exists) and adds additional control commands.

    :param setup: current setup
    :param opList: current operating point list
    :param op: current operating point
    :type setup: SetupSub
    :type opList: ValidationOpListSub
    :type op: ValidationOpSub
    """
    jobDir = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name, constants.INPUT_DIR)
    targetPath = os.path.join(jobDir, constants.CONTROL_FILE_NAME)
    commands = TraceCommandList(filename=targetPath)
    # the op-level control file takes precedence over the op-list one
    baseControlFile = op.controlFile or opList.controlFile
    if baseControlFile:
        commands.convertCommandFile(filename=os.path.join(jobDir, baseControlFile))
    # op commands first, then op-list commands (same order as before)
    for extraCommand in list(op.addCtrlCommand) + list(opList.addCtrlCommand):
        commands.addCommandLine(extraCommand)
    if not commands.writeCommandFile():
        raise RuntimeError(f"Cannot write new TRACE control file '{targetPath}'.")
def _createDirectories(self, setupName, opListName, opName):
    """Creates the input and output folder of a single operating point.

    :param setupName: name of setup
    :param opListName: name of OP list
    :param opName: name of OP
    :type setupName: str
    :type opListName: str
    :type opName: str
    """
    opRoot = os.path.join(self.path, constants.COMPUTATION_DIR, setupName, opListName, opName)
    for subDir in (constants.INPUT_DIR, constants.OUTPUT_DIR):
        md(os.path.join(opRoot, subDir))
def _setupRest(self, setup, opList, op):
    """Copies/links the source files needed by one operating point and rebuilds the control file if required.

    :param setup: current setup
    :param opList: current operating point list
    :param op: current operating point
    :type setup: SetupSub
    :type opList: ValidationOpListSub
    :type op: ValidationOpSub
    """
    def gatherFiles(attribute):
        # collect the unique file names declared on the op list and on the op itself
        collected = set()
        for container in (opList.sourceFiles, op.sourceFiles):
            if container:
                collected.update(getattr(container, attribute))
        return collected

    copyFiles = gatherFiles("copyFile")
    linkFiles = gatherFiles("linkFile")
    if copyFiles & linkFiles:
        raise RuntimeError("Try to link and copy file(s) {0} at the same time".format(", ".join(list(copyFiles & linkFiles))))
    for fileName in copyFiles:
        self._copyFromSource(fileName, setup.name, opList.name, op.name)
    for fileName in linkFiles:
        self._linkFromSource(fileName, setup.name, opList.name, op.name)
    # rebuild the control file only when extra control commands are configured
    if op.addCtrlCommand or opList.addCtrlCommand:
        self._createControlFile(setup, opList, op)
def _setupTestCase(self, restart=False, keepLog=False):
    """Performs a clean for the test case.

    Purges the computation, log and figure folder and copies the files needed by the test case.

    :param restart: flag whether restart group or simulation group list is used
    :param keepLog: flag whether log folder is removed
    :type restart: bool
    :type keepLog: bool
    """
    removeFile(os.path.join(self.path, constants.COMPUTATION_DIR))
    if not keepLog:
        removeFile(os.path.join(self.path, constants.LOG_DIR))
    removeFile(os.path.join(self.path, constants.FIGURES_DIR))
    md(os.path.join(self.path, constants.LOG_DIR))
    md(os.path.join(self.path, constants.FIGURES_DIR))
    setupList = self.restartSetup if restart else self.setup
    # first pass creates every directory, second pass populates them
    allPoints = [(setup, opList, op)
                 for setup in setupList
                 for opList in setup.opList
                 for op in opList]
    for setup, opList, op in allPoints:
        self._createDirectories(setup.name, opList.name, op.name)
    for setup, opList, op in allPoints:
        self._setupRest(setup, opList, op)
def _setupTestCasePartial(self, selectiveCompute, restart=False, keepLog=False):
    """Performs a clean for the selected subset of the test case.

    Purges the computation, log and figure folder and copies the files needed by
    the selected computations only.

    :param selectiveCompute: selection per setup name; each value is either "all"
                             or a dict mapping op-list names to "all" or to a
                             collection of op names
    :param restart: flag whether restart group or simulation group list is used
    :param keepLog: flag whether log folder is removed
    :type selectiveCompute: dict
    :type restart: bool
    :type keepLog: bool
    """
    removeFile(os.path.join(self.path, constants.COMPUTATION_DIR))
    if not keepLog:
        removeFile(os.path.join(self.path, constants.LOG_DIR))
    removeFile(os.path.join(self.path, constants.FIGURES_DIR))
    md(os.path.join(self.path, constants.LOG_DIR))
    md(os.path.join(self.path, constants.FIGURES_DIR))
    setupList = self.restartSetup if restart else self.setup

    def selectedPoints():
        # yields every (setup, opList, op) triple matching the selection
        for setup in setupList:
            selection = selectiveCompute[setup.name]
            for opList in setup.opList:
                if selection == "all" or (opList.name in selection and selection[opList.name] == "all"):
                    ops = list(opList)
                elif opList.name in selection:
                    ops = [op for op in opList if op.name in selection[opList.name]]
                else:
                    # op list not selected at all; previously this raised a KeyError
                    ops = []
                for op in ops:
                    yield setup, opList, op

    # first pass creates every directory, second pass populates them
    for setup, opList, op in selectedPoints():
        self._createDirectories(setup.name, opList.name, op.name)
    for setup, opList, op in selectedPoints():
        self._setupRest(setup, opList, op)
def _setupPostProcessing(self):
    """Re-creates an empty figures folder."""
    figuresDir = os.path.join(self.path, constants.FIGURES_DIR)
    removeFile(figuresDir)
    md(figuresDir)
@staticmethod
def _collectData(testcase):
    """Collects data from the computation folder in the update restart mode and copies it to the
    appropriate folders in the source folder.

    :param testcase: instance of validation test case
    :type testcase: ValidationTestCaseSub
    """
    collect = testcase._collectDataForSingleSaveFiles
    for setup in testcase.restartSetup:
        for opList in setup.opList:
            for op in opList:
                collect(op.saveFiles, setup.name, opList.name, op.name)
            collect(opList.saveFiles, setup.name, opList.name)
        collect(setup.saveFiles, setup.name)
@staticmethod
def getPrepCommands(options, executableDict, testCaseJobList, opList, op, opLastJobId, inputDir, outputFileDir, resourcesDict=None):
    """Static method to add the prep-processing commands to the job list.

    :param options: pavayo options
    :param executableDict: dictionary of executables
    :param testCaseJobList: job list of a test case
    :param opList: current operating point list
    :param op: current operating point
    :param opLastJobId: index list of the last jobs (used for dependencies)
    :param inputDir: input directory
    :param outputFileDir: directory for the log files
    :param resourcesDict: resource dictionary
    :type options: Namespace
    :type executableDict: ExecutableResources
    :type testCaseJobList: list()
    :type opList: ValidationOpListSub
    :type op: ValidationOpSub
    :type opLastJobId: list()
    :type inputDir: str
    :type outputFileDir: str
    :type resourcesDict: dict
    :return: list of last jobs (used for dependencies)
    :rtype: list
    """
    # prep commands of the op list run in addition to the op's own commands
    prepCommands = list(opList.prepCommands) if opList.prepCommands else list()
    prepCommands += list(op.prepCommands) if op.prepCommands else list()
    if prepCommands:
        # first pass: create all jobs so dependencies can be resolved by name afterwards
        prepJobs = dict()
        prepJobIds = list()
        for job in prepCommands:
            prepJob = job.getJob(options, executableDict, inputDir, outputFileDir, resourcesDict=resourcesDict)
            # prefix log file names with the op name to keep different ops apart
            prepJob.setOutFile(os.path.join(outputFileDir, "_".join([op.name, os.path.basename(prepJob.getOutFile())])))
            prepJobs[job] = prepJob
            prepJobIds.append(prepJob.id)
        # second pass: wire up dependencies between the prep jobs
        for job, jobListJob in prepJobs.items():
            dependencies = list()
            if job.dependency:
                for dependency in job.dependency:
                    # only dependencies inside the same operating point are supported
                    if dependency.level or dependency.subLevel:
                        raise RuntimeError("No dependencies for POST jobs allowed outside the current operating point")
                    for parentJob in prepJobs.keys():
                        if parentJob.name == dependency.job:
                            dependencies.append(prepJobs[parentJob].id)
                            try:
                                # a job another job depends on is no longer a "last" job
                                prepJobIds.remove(prepJobs[parentJob].id)
                            except ValueError:
                                pass
                            break
                if len(dependencies) != len(job.dependency):
                    raise AssertionError("Not all dependencies could be resolved. {0} != {1}".format(len(dependencies), len(job.dependency)))
            else:
                # no explicit dependency -> chain behind the previous last jobs
                dependencies = opLastJobId
            testCaseJobList.addJob(jobListJob, dependencies)
        # the jobs nobody depends on become the new tail of the chain
        opLastJobId = prepJobIds
    return opLastJobId
@staticmethod
def getArguments(cgnsFilePath, opList, op):
    """Static method to retrieve the basic settings for the TRACE command line.

    :param cgnsFilePath: path to the CGNS file
    :param opList: current operating point list
    :param op: current operating point
    :type cgnsFilePath: str
    :type opList: ValidationOpListSub
    :type op: ValidationOpSub
    :return: TRACE command options as a list
    :rtype: list()
    """
    # when extra control commands exist, a merged control file was generated beforehand
    if op.addCtrlCommand or opList.addCtrlCommand:
        controlFile = constants.CONTROL_FILE_NAME
    else:
        controlFile = op.controlFile or opList.controlFile
    arguments = ["-cgns", cgnsFilePath]
    if controlFile:
        arguments += ["-cntl", controlFile]
    balanceFile = op.balanceFile or opList.balanceFile
    if balanceFile:
        arguments += ["-lb", balanceFile]
    return arguments
@staticmethod
def getPostCommands(options, executableDict, testCaseJobList, opList, op, opLastJobId, cgnsDir, outputFileDir, resourcesDict=None):
    """Static method to add the post-processing commands to the job list.

    NOTE(review): unlike :meth:`getPrepCommands` this method has no return statement
    and always returns None -- the former ":return: list of last jobs" claim was wrong.

    :param options: pavayo options
    :param executableDict: dictionary of executables
    :param testCaseJobList: job list of a test case
    :param opList: current operating point list
    :param op: current operating point
    :param opLastJobId: index list of the last jobs (used for dependencies)
    :param cgnsDir: start directory
    :param outputFileDir: directory for the log files
    :param resourcesDict: resource dictionary
    :type options: Namespace
    :type executableDict: ExecutableResources
    :type testCaseJobList: list
    :type opList: ValidationOpListSub
    :type op: ValidationOpSub
    :type opLastJobId: list
    :type cgnsDir: str
    :type outputFileDir: str
    :type resourcesDict: dict
    """
    # post commands of the op list run in addition to the op's own commands
    postCommands = list(opList.postCommands) if opList.postCommands else list()
    postCommands += list(op.postCommands) if op.postCommands else list()
    # first pass: create all jobs so dependencies can be resolved by name afterwards
    postJobs = dict()
    for job in postCommands:
        postJob = job.getJob(options, executableDict, cgnsDir, outputFileDir, resourcesDict=resourcesDict)
        # prefix log file names with the op name to keep different ops apart
        postJob.setOutFile(os.path.join(outputFileDir, "_".join([op.name, os.path.basename(postJob.getOutFile())])))
        postJobs[job] = postJob
    # second pass: wire up dependencies between the post jobs
    for job, jobListJob in postJobs.items():
        dependencies = list()
        if job.dependency:
            for dependency in job.dependency:
                # only dependencies inside the same operating point are supported
                if dependency.level or dependency.subLevel:
                    raise RuntimeError("No dependencies for POST jobs allowed outside the current operating point")
                for parentJob in postJobs.keys():
                    if parentJob.name == dependency.job:
                        dependencies.append(postJobs[parentJob].id)
                        break
            if len(dependencies) != len(job.dependency):
                raise AssertionError("Not all dependencies could be resolved. {0} != {1}".format(len(dependencies), len(job.dependency)))
        else:
            # no explicit dependency -> chain behind the previous last jobs
            dependencies = opLastJobId
        testCaseJobList.addJob(jobListJob, dependencies)
def findSubsetInJobList(self, nameList, step=constants.COMPUTATION):
    """find a subset in a job list using a list of keywords

    :param nameList: list of keywords
    :param step: working step
    :type nameList: list of str
    :type step: str
    :return: dictionary of all jobs
    :rtype: collections.defaultdict
    """
    stepKey = step.upper()
    if stepKey == constants.COMPUTATION:
        return super(ValidationTestCaseSub, self).findSubsetInJobList("setup", "opList", "op", nameList)
    if stepKey == constants.POSTPROCESSING:
        return super(ValidationTestCaseSub, self).findSubsetInJobList("analysisGroup", "analysis", "job", nameList)
    raise ValueError(f"Unknown step '{step}' in test case '{self.name}'")
def _addJobsOfOperatingPointToJobList(self, options, executableDict, testCaseJobList, setupDir, setup, opList, op, resourcesDict=None,
                                      jobNamePrefix="OP"):
    """Adds the jobs for a single operating point to the job list.

    Builds up to four job stages per operating point, each chained behind the previous one
    via ``opLastJobId``: gmcPlay (CGNS modification), prep commands, the TRACE computation
    itself and the post commands.

    :param options: pavayo options
    :param executableDict: dictionary of executables
    :param testCaseJobList: job list of a test case
    :param setupDir: directory of setup
    :param setup: current setup
    :param opList: current operating point list
    :param op: current operating point
    :param resourcesDict: resource dictionary
    :param jobNamePrefix: prefix for job names
    :type options: Namespace
    :type executableDict: ExecutableResources
    :type testCaseJobList: JobList
    :type setupDir: str
    :type setup: SetupSub or RestartSetupSub
    :type opList: ValidationOpListSub or RestartValidationOpListSub
    :type op: ValidationOpSub or RestartValidationOpSub
    :type resourcesDict: dict
    :type jobNamePrefix: str
    """
    opLastJobId = list()
    inputDir = os.path.join(setupDir, opList.name, op.name, constants.INPUT_DIR)
    # the most specific CGNS file wins: op > op list > setup
    cgnsFile = op.cgnsFile or opList.cgnsFile or setup.cgnsFile
    # job names look like "<prefix>_<jobType>_<setup>_<opList>_<op>"; empty/"." parts are dropped
    jobNameFormat = "_".join([jobNamePrefix, "{0}", ] + [x for x in [setup.name, opList.name, op.name] if not (x == '.' or x == '' or x is None)])
    outputFileDir = os.path.join(self.path, constants.LOG_DIR, setup.name, opList.name)
    outputFileFormat = "{pointName}_{jobType}.out"
    # change back pressures etc. (gmcPlay) for all points where boundary conditions are specified
    if not options.step or constants.COMPUTATION_STEP_GMC in options.step:
        if op.cgnsSettings or setup.output:
            name = jobNameFormat.format("gmcPlay")
            gmcPlayJob = Job(createAndExecuteGmcPlayJournalFile, args=[executableDict[constants.GMCPLAY_TEMPLATE].path, setup, op, cgnsFile],
                             kwargs=dict(journalFileName=name + gmcConst.GMC_JOURNAL_EXTENSION),
                             jobName=name, executeOnMaster=True, workingDirectory=inputDir, resourcesDict=resourcesDict,
                             outputFile=os.path.join(outputFileDir,
                                                     outputFileFormat.format(jobType="gmcPlay", pointName=op.name)),
                             weight=jobConsts.DEFAULT_WEIGHT_METHOD, account=options.account,
                             group="{}:GMCPlay".format(self.name))
            testCaseJobList.addJob(gmcPlayJob, parents=opLastJobId, weight=jobConsts.DEFAULT_WEIGHT_METHOD)
            opLastJobId = [gmcPlayJob.id]
    # add prep jobs
    if not options.step or constants.COMPUTATION_STEP_PREP in options.step:
        opLastJobId = ValidationTestCaseSub.getPrepCommands(options, executableDict, testCaseJobList, opList, op,
                                                            opLastJobId, inputDir, outputFileDir)
    # add trace job
    if not options.step or constants.COMPUTATION_STEP_TRACE in options.step:
        arguments = ValidationTestCaseSub.getArguments(os.path.join(inputDir, cgnsFile), opList, op)
        nProcs = op.nProcs or setup.nProcs
        threadsPerProc = options.threadsPerProc or op.threadsPerProc or setup.threadsPerProc
        # outfilename will be set right automatically
        # set weight from wallTime or DEFAULT
        weight = convertWallTimeToWeight(op, jobConsts.DEFAULT_WEIGHT_SHELL)
        name = jobNameFormat.format("TRACE")
        traceJob = Job(executableDict[constants.TRACE_SUITE_TRACE_EXEC].path, args=arguments, nProcs=nProcs, procsPerNode=(options.procsPerNode or op.procsPerNode or setup.procsPerNode),
                       queue=options.queue, jobName=name,
                       threadsPerProc=threadsPerProc, jobType="TRACE", workingDirectory=inputDir,
                       outputFile=os.path.join(outputFileDir, outputFileFormat.format(jobType="TRACE", pointName=op.name)),
                       nodeType=options.nodeType, wallTime=op.wallTime, weight=weight * nProcs * (threadsPerProc or 1), resourcesDict=resourcesDict,
                       account=options.account, group="{}:TRACE".format(self.name), slurmSwitches=options.slurmSwitches)
        testCaseJobList.addJob(traceJob, parents=opLastJobId)
        opLastJobId = [traceJob.id]
    # addPostCommands
    if not options.step or constants.COMPUTATION_STEP_POST in options.step:
        try:
            # a CGNS directory configured in the setup overrides the default output location
            cgnsDir = os.path.join(setupDir, opList.name, op.name, constants.INPUT_DIR, setup.output.cgns.directory)
        except (AttributeError, TypeError):
            cgnsDir = os.path.join(setupDir, opList.name, op.name, constants.OUTPUT_DIR, constants.DEFAULT_CGNS_DIR)
        ValidationTestCaseSub.getPostCommands(options, executableDict, testCaseJobList, opList, op, opLastJobId, cgnsDir,
                                              outputFileDir)
def _addSetupToJobList(self, options, executableDict, testCaseJobList, setup, resourcesDict=None, selectedSimDict=None,
                       jobNamePrefix="OP"):
    """Adds a setup to the job list.

    Schedules jobs for every operating point of the setup, or only the selected
    subset when ``selectedSimDict`` is given.

    :param options: pavayo options
    :param executableDict: dictionary of executables
    :param testCaseJobList: job list of a test case
    :param setup: current setup
    :param resourcesDict: resource dictionary
    :param selectedSimDict: dictionary of subset jobs
    :param jobNamePrefix: prefix for job names
    :type options: Namespace
    :type executableDict: ExecutableResources
    :type testCaseJobList: JobList
    :type setup: SetupSub or RestartSetupSub
    :type resourcesDict: dict or None
    :type selectedSimDict: dict or None
    :type jobNamePrefix: str
    """
    # setupDir is loop-invariant; compute it once
    setupDir = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name)
    selection = None if selectedSimDict is None else selectedSimDict[setup.name]
    for opList in setup:
        if selection is None or selection == "all":
            selectedOps = opList
        elif opList.name not in selection:
            # op list not selected at all; previously this raised a KeyError
            continue
        elif selection[opList.name] == "all":
            selectedOps = opList
        else:
            selectedOps = (op for op in opList if op.name in selection[opList.name])
        for op in selectedOps:
            self._addJobsOfOperatingPointToJobList(options, executableDict, testCaseJobList, setupDir, setup, opList, op,
                                                   resourcesDict=resourcesDict, jobNamePrefix=jobNamePrefix)
def getRestartJobList(self, options, executableDict, resourcesDict=None):
    """Returns a job list containing all jobs for the restart step of the test case.

    :param options: argparse instance of the current PAVAYO run
    :param executableDict: dictionary of executables
    :param resourcesDict: resource dictionary
    :type options: Argparse
    :type executableDict: ExecutableResources
    :type resourcesDict: dict
    :return: job list representing the computation of this test case or None
    :rtype: JobList or None
    """
    if not self.restartSetup:
        return
    selectedSimDict = self.findSubsetInJobList(options.selectiveCompute) if options.selectiveCompute else None
    myJobList = JobList(name=self.name + "_UPDATERESTART_JOBLIST", verbosity=options.verbose)
    jobName = "Restart-setupJob"
    if selectedSimDict is not None:
        setupJob = Job(self._setupTestCasePartial, args=[selectedSimDict], kwargs=dict(restart=True, keepLog=False),
                       jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path,
                       weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, account=options.account,
                       group="{}:UpdateRestart".format(self.name))
    else:
        # account was previously missing here, inconsistent with the selective branch above
        setupJob = Job(self._setupTestCase, kwargs=dict(restart=True, keepLog=False), jobName=jobName,
                       workingDirectory=self.path, executeOnMaster=True, outputDir=self.path,
                       weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, account=options.account,
                       group="{}:UpdateRestart".format(self.name))
    myJobList.addJob(setupJob)
    lastJobId = [setupJob.id]
    # the actual restart computations run after the setup job
    testCaseJobList = JobList(name=self.name + "_RESTART_VALIDATION", verbosity=options.verbose)
    for setup in self.restartSetup:
        self._addSetupToJobList(options, executableDict, testCaseJobList, setup, resourcesDict=resourcesDict,
                                selectedSimDict=selectedSimDict, jobNamePrefix="Restart_" + self.name)
    if testCaseJobList:
        myJobList.addJob(testCaseJobList, parents=lastJobId)
        lastJobId = [testCaseJobList.id]
    # finally copy the computed data back to the sources folder
    jobName = "Restart-CollectJob"
    collectJob = Job(ValidationTestCaseSub._collectData, args=[self], jobName=jobName,
                     workingDirectory=self.path, executeOnMaster=True,
                     outputDir=os.path.join(self.path, constants.LOG_DIR),
                     weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, account=options.account,
                     group="{}:RestartCollect".format(self.name))
    myJobList.addJob(collectJob, parents=lastJobId)
    lastJobId = [collectJob.id]
    return myJobList
def getComputationJobList(self, options, executableDict, resourcesDict=None):
    """Returns a job list containing all jobs for the computation step of the test case.

    :param options: argparse instance of the current PAVAYO run
    :param executableDict: dictionary of executables
    :param resourcesDict: resource dictionary
    :type options: Argparse
    :type executableDict: ExecutableResources
    :type resourcesDict: dict
    :return: job list representing the computation of this test case or None
    :rtype: JobList or None
    """
    if not self.setup:
        return
    selectedSimDict = self.findSubsetInJobList(options.selectiveCompute) if options.selectiveCompute else None
    myJobList = JobList(name=self.name + "_COMPUTATION_JOBLIST", verbosity=options.verbose,
                        retries=options.retriesComputation, deactivateJobGrouping=options.deactivateClusterJobGrouping)
    # the setup (clean + copy) job only runs when the RESET step is requested
    if not options.step or constants.COMPUTATION_STEP_RESET in options.step:
        jobName = "setupJob"
        if selectedSimDict is not None:
            setupJob = Job(self._setupTestCasePartial, args=[selectedSimDict], kwargs={"keepLog": False},
                           jobName=jobName, workingDirectory=self.path, executeOnMaster=True,
                           outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                           group="{}:Computation".format(self.name))
        else:
            setupJob = Job(self._setupTestCase, kwargs=dict(keepLog=False),
                           jobName=jobName, workingDirectory=self.path, executeOnMaster=True,
                           outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                           group="{}:Computation".format(self.name))
        myJobList.addJob(setupJob)
        lastJobId = [setupJob.id]
    else:
        # no setup job -> computations have no parent dependency
        lastJobId = None
    testCaseJobList = JobList(name=self.name + "_VALIDATION", verbosity=options.verbose)
    # iterating self yields the setups (collections.Sequence interface)
    for setup in self:
        self._addSetupToJobList(options, executableDict, testCaseJobList, setup, resourcesDict=resourcesDict,
                                selectedSimDict=selectedSimDict, jobNamePrefix=self.name)
    if testCaseJobList:
        myJobList.addJob(testCaseJobList, parents=lastJobId)
        lastJobId = [testCaseJobList.id]
    return myJobList
def getPostprocessingJobList(self, options, executableDict, resourcesDict=None):
    """Returns a job containing the post script of this test case.

    :param options: Argparse instance of the current PAVAYO run
    :param executableDict: dictionary of executables
    :param resourcesDict: resource dictionary
    :type options: argparse
    :type executableDict: ExecutableResources
    :type resourcesDict: dict
    :return: job list representing the post-processing job list
    :rtype: JobList or None
    """
    postProcessingJobList = JobList(name=self.name + "_POSTPROCESSING_JOBLIST", verbosity=options.verbose)
    logDir = os.path.join(self.path, constants.LOG_DIR, "postProcessing")
    # only wipe the figures folder when no computation ran in this invocation
    if not options.compute:
        jobName = "setupJobPostProcessing"
        setupJob = Job(self._setupPostProcessing, jobName=jobName, workingDirectory=self.path, executeOnMaster=True,
                       outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                       group="{}:Postprocessing".format(self.name))
        postProcessingJobList.addJob(setupJob)
        lastJobId = [setupJob.id]
    else:
        lastJobId = None
    if self.analysisGroup:
        selectedSimDict = self.findSubsetInJobList(options.selectiveCompute, step=constants.POSTPROCESSING) if options.selectiveCompute else None
        analysisJobList = JobList(name=self.name + "_ANALYSIS_JOBLIST", verbosity=options.verbose)
        allJobs = self.createJobListDictionary(options, executableDict, "analysisGroup", "analysis", "job", self._getAnalysisWorkingDirForJob,
                                               selectDict=selectedSimDict, resourcesDict=resourcesDict)
        self.setDependenciesInJobList(analysisJobList, allJobs)
        postProcessingJobList.addJob(analysisJobList, lastJobId)
    if self.figures:
        # figure jobs run after the analyses (if any)
        lastPostProcId = [analysisJobList.id] if self.analysisGroup else lastJobId
        validFigures = [os.path.splitext(fig.name)[0] for fig in self.figures.figure]
        # walk the analysis tree for Tecplot layout files matching the declared figures;
        # stop as soon as every declared figure has been found
        for dirPath, _, fileNames in os.walk(os.path.join(self.path, constants.ANALYSIS_DIR)):
            for fileName in fileNames:
                fname, fext = os.path.splitext(fileName)
                if fext[1:] == constants.TECPLOT_LAYOUT_ENDING and fname in validFigures:
                    job = self._generateTecplotLayoutJob(dirPath, fileName, logDir)
                    postProcessingJobList.addJob(job, lastPostProcId)
                    validFigures.remove(fname)
                    if not validFigures:
                        break
            if not validFigures:
                break
        for plot in self.figures.plot:
            createPlotJob = Job(self.createPlot, args=[plot], jobName=self.name + "_createPlot", outputDir=logDir,
                                weight=jobConsts.DEFAULT_WEIGHT_METHOD, account=options.account,
                                group="{}:Postprocessing".format(self.name))
            postProcessingJobList.addJob(createPlotJob, lastPostProcId)
    return postProcessingJobList
def createPlot(self, plot, verbose=True):
    """Creates a plot and saves it to the figures folder.

    :param plot: plot instance
    :param verbose: verbosity
    :type plot: supermod.MatplotlibFigure
    :type verbose: bool
    """
    if verbose:
        print("Create matplotlib plot for ", end='')
    if plot.generalPlot or plot.xyPlot:
        # NOTE(review): this print is not guarded by `verbose` like the others -- confirm intended
        print("XY plot.")
        # the general plot handles figure creation and saving itself
        self._createGeneralPlot(plot, verbose=verbose)
    elif plot.testSuitePlot:
        raise NotImplementedError("testSuitePlot is not allowed for ValidationTestCase.")
    else:
        fig = FigureWrapper(title=PyTex.escapeChars(plot.title), paperSize=plot.paperSize, paperOrientation=plot.paperOrientation,
                            sharey=plot.shareYAxis)
        if plot.radialDistributionPlot:
            if verbose:
                print("radial distribution.")
            self.plotRadialDistribution(fig, plot.radialDistributionPlot[0])
            # clamp margins so labels/legend fit on the page
            plot.bottom = max(0.06, plot.bottom)
            plot.top = min(0.95, plot.top)
        elif plot.residualReport:
            if verbose:
                print("residual convergence plot.")
            self.plotResidualReport(fig, plot.residualReport[0])
        elif plot.convergenceReport:
            if verbose:
                print("convergence plot.")
            self.plotConvergenceReport(fig, plot.convergenceReport[0])
            plot.top = min(0.95, plot.top)
        elif plot.performanceMap:
            if verbose:
                print("performance map.")
            self.plotPerformanceMap(fig, plot.performanceMap[0])
            plot.bottom = max(0.06, plot.bottom)
        outputFileName = os.path.join(self.path, constants.FIGURES_DIR, plot.fileName)
        ensurePath(outputFileName)
        fig.saveFigure(filename=outputFileName, dpi=plot.dpi, left=plot.left, right=plot.right, bottom=plot.bottom, top=plot.top)
    if verbose:
        print("Done.")
def plotRadialDistribution(self, fig, radialDistribution):
    """Creates a plot of the radial distribution.

    One sub-plot is added to *fig* per x variable; data is read from the u-list files
    of the configured computations.

    :param fig: handle of matplotlib figure
    :param radialDistribution: settings for radial distribution
    :type fig: FigureWrapper
    :type radialDistribution: supermod.RadialDistribution
    """
    yVariable = radialDistribution.variables.y
    for xVariable in radialDistribution.variables.x:
        # reset line styles/colors for every sub-plot
        plotStyle.PlotStyle().reset()
        xLabel = plotStyle.getVariableLabel(xVariable.valueOf_)
        yLabel = plotStyle.getVariableLabel(yVariable.valueOf_)
        if xVariable.valueOf_ != "":
            radialPlot = XYPlot(xLabel=xLabel, yLabel=yLabel)
            for setting in radialDistribution.data.settings:
                dependency = setting.dependency
                # average type precedence: setting > data block > "flux" default
                averageType = setting.averageType or radialDistribution.data.averageType or "flux"
                dependencyPathPart = dependency.getPath()
                uLstPath = os.path.join(self.path, constants.COMPUTATION_DIR, dependencyPathPart,
                                        constants.OUTPUT_DIR, radialDistribution.data.ulstPath)
                uLst = readUList(uLstPath)[0]
                for partName in radialDistribution.data.parts.name:
                    # line style distinguishes the part, color distinguishes the computation
                    linestyleLabel = partName.label if partName.label is not None else partName.valueOf_
                    colorLabel = setting.label if setting.label is not None else dependencyPathPart
                    radialPlot.addDataDictFromUList(uLst, partName.valueOf_, xVariable.valueOf_, yVariable.valueOf_,
                                                    linestyleLabel=PyTex.escapeChars(linestyleLabel),
                                                    colorLabel=PyTex.escapeChars(colorLabel),
                                                    avType=uListConst.AV_TYPE_DICT[averageType])
            radialPlot.xLim = (xVariable.min, xVariable.max)
            radialPlot.yLim = (yVariable.min, yVariable.max)
        else:
            # empty x variable -> placeholder slot in the figure
            radialPlot = None
        fig.addPlot(radialPlot)
def plotResidualReport(self, fig, residualReport):
    """Creates a residual plot.

    Residual curves of all ops of an op list are chained into one continuous curve
    (the time step axis is shifted by an offset per op).

    :param fig: handle of matplotlib figure
    :param residualReport: settings for residual plot
    :type fig: FigureWrapper
    :type residualReport: supermod.ResidualReport
    """
    # NOTE(review): the default labels look swapped (x="Residual L1", y="Time Step") -- confirm
    xLabel = residualReport.xLabel if residualReport.xLabel else "Residual L1 [-]"
    yLabel = residualReport.yLabel if residualReport.yLabel else "Time Step [-]"
    convergencePlot = XYPlot(xLabel=xLabel, yLabel=yLabel)
    if residualReport.xScale:
        convergencePlot.xScale = residualReport.xScale
    if residualReport.yScale:
        convergencePlot.yScale = residualReport.yScale
    if residualReport.xLim:
        convergencePlot.xLim = (residualReport.xLim.min, residualReport.xLim.max)
    if residualReport.yLim:
        convergencePlot.yLim = (residualReport.yLim.min, residualReport.yLim.max)
    # each entry pairs a setup with the op lists whose residuals are plotted
    SettingDict = collections.namedtuple("SettingDict", ("setup", "opLists",))
    setupLst = list()
    if residualReport.data:
        for settings in residualReport.data.settings:
            dependency = settings.dependency
            if dependency.job:
                # job-level dependencies are not supported here
                raise RuntimeError("")
            thisSetup = self.find(dependency.level)
            thisOpList = thisSetup.find(dependency.subLevel)
            # no subLevel given -> take every op list of the setup
            thisOpList = [thisOpList] if thisOpList else thisSetup.opList
            setupLst.append(SettingDict(setup=thisSetup, opLists=thisOpList,))
    else:
        # no explicit settings -> plot all setups; a default data object is created so
        # that `residualReport.data.path` below resolves to its default value
        residualReport.data = supermod.ResidualReportData()
        setupLst = [SettingDict(setup=thisSetup, opLists=thisSetup.opList,) for thisSetup in self.setup]
    for i, setupDict in enumerate(setupLst):
        setup = setupDict.setup
        opLists = setupDict.opLists
        # NOTE(review): after the else branch above, `data` may now be truthy while
        # `data.settings` is empty -- verify settings[i] cannot raise IndexError here
        if residualReport.data:
            setupLabel = residualReport.data.settings[i].label
        else:
            setupLabel = ""
        for opList in opLists:
            residualXValues = list()
            residualYValues = list()
            offset = 0
            colorLabel = setupLabel if setupLabel else ", ".join([setup.name, opList.name])
            for op in opList:
                pathToResidualFile = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name,
                                                  constants.OUTPUT_DIR, residualReport.data.path)
                resTec = TecplotDataset.fromFile(pathToResidualFile)
                # chain the ops of one op list into a single continuous curve
                residualXValues = np.concatenate((residualXValues, resTec.zones[0]['TimeStep'].getDataAsNumpyArray() + offset))
                residualYValues = np.concatenate((residualYValues, resTec.zones[0]['ResidualL1'].getDataAsNumpyArray()))
                offset = residualXValues[-1]
            convergencePlot.addDataDictPlotStyleLabels(residualXValues, residualYValues,
                                                       colorLabel=PyTex.escapeChars(colorLabel))
    fig.addPlot(convergencePlot)
def plotConvergenceReport(self, fig, convergenceReport):
    """Creates a convergence plot of inlet and outlet mass flow over time steps.

    For every setup the inlet and outlet flux-face names are taken from the
    uList of the first operating point of the first operating point list
    (first and last flux face, respectively). The mass-flow histories of all
    operating points of one list are concatenated into continuous curves; the
    outlet mass flow is negated so it can be compared to the inlet directly.

    :param fig: handle of matplotlib figure
    :param convergenceReport: settings for convergence plot
    :type fig: FigureWrapper
    :type convergenceReport: supermod.ConvergenceReport
    :raises RuntimeError: if a data setting declares a job dependency
    """
    fig.legendStyle = plotData.LEGEND_STYLE_TWO_LEGENDS
    _inletName = "Inlet"
    _outletName = "Outlet"
    xLabel = convergenceReport.xLabel if convergenceReport.xLabel else "Time Step [-]"
    yLabel = convergenceReport.yLabel if convergenceReport.yLabel else "Mass Flow [Kg/s]"
    convergencePlot = XYPlot(xLabel=xLabel, yLabel=yLabel)
    # Optional axis settings taken from the report definition.
    if convergenceReport.xScale:
        convergencePlot.xScale = convergenceReport.xScale
    if convergenceReport.yScale:
        convergencePlot.yScale = convergenceReport.yScale
    if convergenceReport.xLim:
        convergencePlot.xLim = (convergenceReport.xLim.min, convergenceReport.xLim.max)
    if convergenceReport.yLim:
        convergencePlot.yLim = (convergenceReport.yLim.min, convergenceReport.yLim.max)
    SettingDict = collections.namedtuple("SettingDict", ("setup", "opLists", "averageType"))
    setupLst = list()
    # Remember whether the report carried user-supplied data BEFORE a default
    # object is installed below: the default ConvergenceReportData() has an
    # empty 'settings' list, and indexing it for labels would raise IndexError.
    hasUserData = bool(convergenceReport.data)
    if hasUserData:
        for settings in convergenceReport.data.settings:
            dependency = settings.dependency
            if dependency.job:
                raise RuntimeError("Job dependencies are not supported in convergence reports")
            thisSetup = self.find(dependency.level)
            thisOpList = thisSetup.find(dependency.subLevel)
            # Fall back to all operating point lists of the setup when the
            # dependency does not name a specific one.
            thisOpList = [thisOpList] if thisOpList else thisSetup.opList
            setupLst.append(SettingDict(setup=thisSetup, opLists=thisOpList,
                                        averageType=settings.averageType or convergenceReport.data.averageType or "flux"))
    else:
        # Install a default data object so attribute accesses (ulstPath,
        # pathPrefix) in the loop below keep working.
        convergenceReport.data = supermod.ConvergenceReportData()
        setupLst = [SettingDict(setup=thisSetup, opLists=thisSetup.opList, averageType="flux") for thisSetup in self.setup]
    for i, setupDict in enumerate(setupLst):
        setup = setupDict.setup
        opLists = setupDict.opLists
        averageType = setupDict.averageType
        # Only user-supplied data carries per-setting labels; the default data
        # object created above has no settings entries.
        if hasUserData:
            setupLabel = convergenceReport.data.settings[i].label
        else:
            setupLabel = ""
        # The uList of the first operating point of the first operating point
        # list defines the inlet (first flux face) and outlet (last flux face).
        uLstPath = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, setup[0].name, setup[0][0].name,
                                constants.OUTPUT_DIR, convergenceReport.data.ulstPath)
        uLst = readUList(uLstPath)[0]
        inletName = uLst.fluxface[0].name
        outletName = uLst.fluxface[-1].name
        for opList in opLists:
            inletXValues = list()
            inletYValues = list()
            outletXValues = list()
            outletYValues = list()
            offset = 0
            colorLabel = setupLabel if setupLabel else ", ".join([setup.name, opList.name])
            for op in opList:
                inletFileName = f"{convergenceReport.data.pathPrefix}_{averageType}_{inletName}.dat"
                pathToResidualInletFile = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name,
                                                       constants.OUTPUT_DIR, inletFileName)
                inTec = TecplotDataset.fromFile(pathToResidualInletFile)
                # Shift the time steps so consecutive operating points line up.
                inletXValues = np.concatenate((inletXValues, inTec.zones[0]['TimeStep'].getDataAsNumpyArray() + offset))
                inletYValues = np.concatenate((inletYValues, inTec.zones[0]['MassFlow'].getDataAsNumpyArray()))
                outletFileName = f"{convergenceReport.data.pathPrefix}_{averageType}_{outletName}.dat"
                pathToResidualOutletFile = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name,
                                                        constants.OUTPUT_DIR, outletFileName)
                outTec = TecplotDataset.fromFile(pathToResidualOutletFile)
                outletXValues = np.concatenate((outletXValues, outTec.zones[0]['TimeStep'].getDataAsNumpyArray() + offset))
                # Negate the outlet mass flow for direct comparison with the inlet.
                outletYValues = np.concatenate((outletYValues, outTec.zones[0]['MassFlow'].getDataAsNumpyArray() * -1))
                offset = inletXValues[-1]
            convergencePlot.addDataDictPlotStyleLabels(inletXValues, inletYValues, linestyleLabel=PyTex.escapeChars(_inletName),
                                                       colorLabel=PyTex.escapeChars(colorLabel))
            convergencePlot.addDataDictPlotStyleLabels(outletXValues, outletYValues, linestyleLabel=PyTex.escapeChars(_outletName),
                                                       colorLabel=PyTex.escapeChars(colorLabel))
    fig.addPlot(convergencePlot)
def __len__(self):
    """Returns the number of setups in the test case.

    Note: this counts the entries of ``self.setup`` (one per setup), not the
    operating point lists contained within each setup.

    :return: number of setups
    :rtype: int
    """
    return len(self.setup)
def __getitem__(self, index):
    """Returns the setup at a given index.

    :param index: position of the setup within the test case
    :type index: int
    :return: setup at the given index
    :rtype: setup instance
    """
    return self.setup[index]