Source code for mojo.pavayo.testcase.validationTestCase

"""
ValidationTestCase
====================

The ValidationTestCase is designed for the computation and evaluation of several setup including several operating point lists.
This test case class uses the same folder structure as the TestSuiteTestCase. Related test cases comprise the sources folder and the
analysis folder. For the computation the data is linked or copied from the sources folder to the computation folder.

Each operating point is considered as independent. Thus there are no dependencies between operating points possible.
Before the TRACE computation is started the control file can be modified and several settings in the CGNS file can be set via gmcPlay.
When the computation is done post-processing jobs can be executed.

.. warning::
    For the post-processing jobs the base folder is the CGNS folder. If a CGNS directory is defined in the setup it will be set via
    gmcPlay and it will be used here. Otherwise it is expected that the CGNS file are at the default location ('../output/cgns/).

"""


import collections
import os

import numpy as np

from . import testcase_generated as supermod
from .abstractTestCase import AbstractTestCase

from .. import computeData as constants
from ..computeMethods import convertWallTimeToWeight

from ...gmcplay import gmcPlayData as gmcConst
from ...gmcplay.gmcPlayInValidationTestCase import createAndExecuteGmcPlayJournalFile

from ...bricabrac.fileIO import ensurePath, md, removeFile
from ...jobManagement.jobs.jobList import JobList
from ...jobManagement.jobs.job import Job
from ...jobManagement import jobManagementData as jobConsts

from ...latex import PyTex
from ...plotting.figureWrapper import FigureWrapper
from ...plotting.xyPlot import XYPlot
from ...plotting import plotStyle
from ...plotting import plotData

from ...traceSuiteControl.controlfile.tracecommandlistClass import TraceCommandList
from ...traceSuiteControl.uList.uListReader import readUList
from ...traceSuiteControl.uList import uListData as uListConst

from datfile import TecplotDataset


[docs]class ValidationTestCaseSub(supermod.ValidationTestCase, collections.Sequence, AbstractTestCase): """ Represents a speedline test case. Inherits from collections. Sequence so you can iterate over its operation point lists. All possible parameters to the constructor of this class can be found in the XSD file used to generate testcase_generated.py. """
[docs] def append(self, setup): """Appends a given setup to the test case. :param setup: setup to append to the testcase :type setup: SetupSub """ self.setup.append(setup)
[docs] def find(self, name): """Finds a given setup to the test case. :param name: name of the setup searched for :type name: str :return: setup instance :rtype: SetupSub """ for setup in self.setup: if name == setup.name: return setup return None
def _createControlFile(self, setup, opList, op): """Creates new control file based on an existing control file (if it exists) and adds additional control commands. :param setup: current setup :param opList: current operating point list :param op: current operating point :type setup: SetupSub :type opList: ValidationOpListSub :type op: ValidationOpSub """ pathToJob = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name, constants.INPUT_DIR) controlFilePath = os.path.join(pathToJob, constants.CONTROL_FILE_NAME) listOfCommandsOP = TraceCommandList(filename=controlFilePath) if op.controlFile: controlFileOpPath = os.path.join(pathToJob, op.controlFile) listOfCommandsOP.convertCommandFile(filename=controlFileOpPath) elif opList.controlFile: controlFileOpListPath = os.path.join(pathToJob, opList.controlFile) listOfCommandsOP.convertCommandFile(filename=controlFileOpListPath) for opCommand in op.addCtrlCommand: listOfCommandsOP.addCommandLine(opCommand) for opCommand in opList.addCtrlCommand: listOfCommandsOP.addCommandLine(opCommand) if not listOfCommandsOP.writeCommandFile(): raise RuntimeError(f"Cannot write new TRACE control file '{controlFilePath}'.") def _createDirectories(self, setupName, opListName, opName): """Purges the computation, log and figure folder. :param setupName: name of setup :param opListName: name of OP list :param opName: name of OP :type setupName: str :type opListName: str :type opName: str """ md(os.path.join(self.path, constants.COMPUTATION_DIR, setupName, opListName, opName, constants.INPUT_DIR)) md(os.path.join(self.path, constants.COMPUTATION_DIR, setupName, opListName, opName, constants.OUTPUT_DIR)) def _setupRest(self, setup, opList, op): """Copies files needed by the test cases. :param setup: current setup :param opList: current operating point list :param op: current operating point :type setup: SetupSub :type opList: ValidationOpListSub :type op: ValidationOpSub """ # find all (unique) files in op and its op list to be copied or linked copyFiles = set(opList.sourceFiles.copyFile if opList.sourceFiles else []) | set(op.sourceFiles.copyFile if op.sourceFiles else []) linkFiles = set(opList.sourceFiles.linkFile if opList.sourceFiles else []) | set(op.sourceFiles.linkFile if op.sourceFiles else []) if copyFiles & linkFiles: raise RuntimeError("Try to link and copy file(s) {0} at the same time".format(", ".join(list(copyFiles & linkFiles)))) for copyFile in copyFiles: self._copyFromSource(copyFile, setup.name, opList.name, op.name) for linkFile in linkFiles: self._linkFromSource(linkFile, setup.name, opList.name, op.name) # update control file if necessary if op.addCtrlCommand or opList.addCtrlCommand: self._createControlFile(setup, opList, op) def _setupTestCase(self, restart=False, keepLog=False): """Performs a clean for the test case. Purges the computation, log and figure folder. Copies files needed by the test cases. :param restart: flag whether restart group or simulation group list is used :param keepLog: flag whether log folder is removed :type restart: bool :type keepLog: bool """ removeFile(os.path.join(self.path, constants.COMPUTATION_DIR)) if not keepLog: removeFile(os.path.join(self.path, constants.LOG_DIR)) removeFile(os.path.join(self.path, constants.FIGURES_DIR)) md(os.path.join(self.path, constants.LOG_DIR)) md(os.path.join(self.path, constants.FIGURES_DIR)) if restart: setupList = self.restartSetup else: setupList = self.setup for setup in setupList: for opList in setup.opList: for op in opList: self._createDirectories(setup.name, opList.name, op.name) for setup in setupList: for opList in setup.opList: for op in opList: self._setupRest(setup, opList, op) def _setupTestCasePartial(self, selectiveCompute, restart=False, keepLog=False): """Performs a clean for the test case. Purges the computation, log and figure folder. Copies files needed by the test cases. :param selectiveCompute: list of selected computations :param restart: flag whether restart group or simulation group list is used :param keepLog: flag whether log folder is removed :type selectiveCompute: list of str :type restart: bool :type keepLog: bool """ removeFile(os.path.join(self.path, constants.COMPUTATION_DIR)) if not keepLog: removeFile(os.path.join(self.path, constants.LOG_DIR)) removeFile(os.path.join(self.path, constants.FIGURES_DIR)) md(os.path.join(self.path, constants.LOG_DIR)) md(os.path.join(self.path, constants.FIGURES_DIR)) if restart: setupList = self.restartSetup else: setupList = self.setup for setup in setupList: if selectiveCompute[setup.name] == "all": for opList in setup.opList: for op in opList: self._createDirectories(setup.name, opList.name, op.name) else: for opList in setup.opList: if opList.name in selectiveCompute[setup.name] and selectiveCompute[setup.name][opList.name] == "all": for op in opList: self._createDirectories(setup.name, opList.name, op.name) else: for op in opList: if op.name in selectiveCompute[setup.name][opList.name]: self._createDirectories(setup.name, opList.name, op.name) for setup in setupList: if selectiveCompute[setup.name] == "all": for opList in setup.opList: for op in opList: self._setupRest(setup, opList, op) else: for opList in setup.opList: if opList.name in selectiveCompute[setup.name] and selectiveCompute[setup.name][opList.name] == "all": for op in opList: self._setupRest(setup, opList, op) else: for op in opList: if op.name in selectiveCompute[setup.name][opList.name]: self._setupRest(setup, opList, op) def _setupPostProcessing(self): """Empty figures folder """ removeFile(os.path.join(self.path, constants.FIGURES_DIR)) md(os.path.join(self.path, constants.FIGURES_DIR)) @staticmethod def _collectData(testcase): """Colletcs data from the computation folder in the update restart mode and copies it to the appropriate folders in the source folder. :param testcase: instance of validation test case :type testcase: ValidationTestCaseSub """ for setup in testcase.restartSetup: for opList in setup.opList: for op in opList: testcase._collectDataForSingleSaveFiles(op.saveFiles, setup.name, opList.name, op.name) testcase._collectDataForSingleSaveFiles(opList.saveFiles, setup.name, opList.name) testcase._collectDataForSingleSaveFiles(setup.saveFiles, setup.name)
[docs] @staticmethod def getPrepCommands(options, executableDict, testCaseJobList, opList, op, opLastJobId, inputDir, outputFileDir, resourcesDict=None): """Static method to add the prep-processing commands to the job list. :param options: pavayo options :param executableDict: dictionary of executables :param testCaseJobList: job list of a test case :param opList: current operating point list :param op: current operating point :param opLastJobId: index list of the last jobs (used for dependencies) :param inputDir: input directory :param outputFileDir: directory for the log files :param resourcesDict: resource dictionary :type options: Namespace :type executableDict: ExecutableResources :type testCaseJobList: list() :type opList: ValidationOpListSub :type op: ValidationOpSub :type opLastJobId: list() :type inputDir: str :type outputFileDir: str :type resourcesDict: dict :return: list of last jobs (used for dependencies) :rtype: list """ prepCommands = list(opList.prepCommands) if opList.prepCommands else list() prepCommands += list(op.prepCommands) if op.prepCommands else list() if prepCommands: prepJobs = dict() prepJobIds = list() for job in prepCommands: prepJob = job.getJob(options, executableDict, inputDir, outputFileDir, resourcesDict=resourcesDict) prepJob.setOutFile(os.path.join(outputFileDir, "_".join([op.name, os.path.basename(prepJob.getOutFile())]))) prepJobs[job] = prepJob prepJobIds.append(prepJob.id) for job, jobListJob in prepJobs.items(): dependencies = list() if job.dependency: for dependency in job.dependency: if dependency.level or dependency.subLevel: raise RuntimeError("No dependencies for POST jobs allowed outside the current operating point") for parentJob in prepJobs.keys(): if parentJob.name == dependency.job: dependencies.append(prepJobs[parentJob].id) try: prepJobIds.remove(prepJobs[parentJob].id) except ValueError: pass break if len(dependencies) != len(job.dependency): raise AssertionError("Not all dependencies could be resolved. {0} != {1}".format(len(dependencies), len(job.dependency))) else: dependencies = opLastJobId testCaseJobList.addJob(jobListJob, dependencies) opLastJobId = prepJobIds return opLastJobId
[docs] @staticmethod def getArguments(cgnsFilePath, opList, op): """Static method to retrieve the basic settings for the TRACE command line. :param cgnsFilePath: path to the CGNS file :param opList: current operating point list :param op: current operating point :type cgnsFilePath: str :type opList: ValidationOpListSub :type op: ValidationOpSub :return: TRACE command options as a list :rtype: list() """ if op.addCtrlCommand or opList.addCtrlCommand: controlFile = constants.CONTROL_FILE_NAME else: controlFile = op.controlFile or opList.controlFile balanceFile = op.balanceFile or opList.balanceFile arguments = ["-cgns", cgnsFilePath] if controlFile: arguments.append("-cntl") arguments.append(controlFile) if balanceFile: arguments.append("-lb") arguments.append(balanceFile) return arguments
[docs] @staticmethod def getPostCommands(options, executableDict, testCaseJobList, opList, op, opLastJobId, cgnsDir, outputFileDir, resourcesDict=None): """Static method to add the post-processing commands to the job list. :param options: pavayo options :param executableDict: dictionary of executables :param testCaseJobList: job list of a test case :param opList: current operating point list :param op: current operating point :param opLastJobId: index list of the last jobs (used for dependencies) :param cgnsDir: start directory :param outputFileDir: directory for the log files :param resourcesDict: resource dictionary :type options: Namespace :type executableDict: ExecutableResources :type testCaseJobList: list :type opList: ValidationOpListSub :type op: ValidationOpSub :type opLastJobId: list :type cgnsDir: str :type outputFileDir: str :type resourcesDict: dict :return: list of last jobs (used for dependencies) :rtype: list """ postCommands = list(opList.postCommands) if opList.postCommands else list() postCommands += list(op.postCommands) if op.postCommands else list() postJobs = dict() for job in postCommands: postJob = job.getJob(options, executableDict, cgnsDir, outputFileDir, resourcesDict=resourcesDict) postJob.setOutFile(os.path.join(outputFileDir, "_".join([op.name, os.path.basename(postJob.getOutFile())]))) postJobs[job] = postJob for job, jobListJob in postJobs.items(): dependencies = list() if job.dependency: for dependency in job.dependency: if dependency.level or dependency.subLevel: raise RuntimeError("No dependencies for POST jobs allowed outside the current operating point") for parentJob in postJobs.keys(): if parentJob.name == dependency.job: dependencies.append(postJobs[parentJob].id) break if len(dependencies) != len(job.dependency): raise AssertionError("Not all dependencies could be resolved. {0} != {1}".format(len(dependencies), len(job.dependency))) else: dependencies = opLastJobId testCaseJobList.addJob(jobListJob, dependencies)
[docs] def findSubsetInJobList(self, nameList, step=constants.COMPUTATION): """find a subset in a job list using a list of keywords :param nameList: list of keywords :param step: working step :type nameList: list of str :type step: str :return: dictionary of all jobs :rtype: collections.defaultdict """ if step.upper() == constants.COMPUTATION: allJobs = super(ValidationTestCaseSub, self).findSubsetInJobList("setup", "opList", "op", nameList) return allJobs elif step.upper() == constants.POSTPROCESSING: return super(ValidationTestCaseSub, self).findSubsetInJobList("analysisGroup", "analysis", "job", nameList) else: raise ValueError(f"Unknown step '{step}' in test case '{self.name}'")
def _addJobsOfOperatingPointToJobList(self, options, executableDict, testCaseJobList, setupDir, setup, opList, op, resourcesDict=None, jobNamePrefix="OP"): """Adds the jobs for a single operating point to the job list. :param options: pavayo options :param executableDict: dictionary of executables :param testCaseJobList: job list of a test case :param setupDir: directory of setup :param setup: current setup :param opList: current operating point list :param op: current operating point :param resourcesDict: resource dictionary :param jobNamePrefix: prefix for job names :type options: Namespace :type executableDict: ExecutableResources :type testCaseJobList: JobList :type setupDir: str :type setup SetupSub or RestartSetupSub :type opList: ValidationOpListSub or RestartValidationOpListSub :type op: ValidationOpSub or RestartValidationOpSub :type resourcesDict: dict :type jobNamePrefix: str """ opLastJobId = list() inputDir = os.path.join(setupDir, opList.name, op.name, constants.INPUT_DIR) cgnsFile = op.cgnsFile or opList.cgnsFile or setup.cgnsFile jobNameFormat = "_".join([jobNamePrefix, "{0}", ] + [x for x in [setup.name, opList.name, op.name] if not (x == '.' or x == '' or x is None)]) outputFileDir = os.path.join(self.path, constants.LOG_DIR, setup.name, opList.name) outputFileFormat = "{pointName}_{jobType}.out" # change back pressures etc. (gmcPlay) for all points where boundary conditions are specified if not options.step or constants.COMPUTATION_STEP_GMC in options.step: if op.cgnsSettings or setup.output: name = jobNameFormat.format("gmcPlay") gmcPlayJob = Job(createAndExecuteGmcPlayJournalFile, args=[executableDict[constants.GMCPLAY_TEMPLATE].path, setup, op, cgnsFile], kwargs=dict(journalFileName=name + gmcConst.GMC_JOURNAL_EXTENSION), jobName=name, executeOnMaster=True, workingDirectory=inputDir, resourcesDict=resourcesDict, outputFile=os.path.join(outputFileDir, outputFileFormat.format(jobType="gmcPlay", pointName=op.name)), weight=jobConsts.DEFAULT_WEIGHT_METHOD, account=options.account, group="{}:GMCPlay".format(self.name)) testCaseJobList.addJob(gmcPlayJob, parents=opLastJobId, weight=jobConsts.DEFAULT_WEIGHT_METHOD) opLastJobId = [gmcPlayJob.id] # add prep jobs if not options.step or constants.COMPUTATION_STEP_PREP in options.step: opLastJobId = ValidationTestCaseSub.getPrepCommands(options, executableDict, testCaseJobList, opList, op, opLastJobId, inputDir, outputFileDir) # add trace job if not options.step or constants.COMPUTATION_STEP_TRACE in options.step: arguments = ValidationTestCaseSub.getArguments(os.path.join(inputDir, cgnsFile), opList, op) nProcs = op.nProcs or setup.nProcs threadsPerProc = options.threadsPerProc or op.threadsPerProc or setup.threadsPerProc # outfilename will be set right automatically # set weight from wallTime or DEFAULT weight = convertWallTimeToWeight(op, jobConsts.DEFAULT_WEIGHT_SHELL) name = jobNameFormat.format("TRACE") traceJob = Job(executableDict[constants.TRACE_SUITE_TRACE_EXEC].path, args=arguments, nProcs=nProcs, procsPerNode=(options.procsPerNode or op.procsPerNode or setup.procsPerNode), queue=options.queue, jobName=name, threadsPerProc=threadsPerProc, jobType="TRACE", workingDirectory=inputDir, outputFile=os.path.join(outputFileDir, outputFileFormat.format(jobType="TRACE", pointName=op.name)), nodeType=options.nodeType, wallTime=op.wallTime, weight=weight * nProcs * (threadsPerProc or 1), resourcesDict=resourcesDict, account=options.account, group="{}:TRACE".format(self.name), slurmSwitches=options.slurmSwitches) testCaseJobList.addJob(traceJob, parents=opLastJobId) opLastJobId = [traceJob.id] # addPostCommands if not options.step or constants.COMPUTATION_STEP_POST in options.step: try: cgnsDir = os.path.join(setupDir, opList.name, op.name, constants.INPUT_DIR, setup.output.cgns.directory) except (AttributeError, TypeError): cgnsDir = os.path.join(setupDir, opList.name, op.name, constants.OUTPUT_DIR, constants.DEFAULT_CGNS_DIR) ValidationTestCaseSub.getPostCommands(options, executableDict, testCaseJobList, opList, op, opLastJobId, cgnsDir, outputFileDir) def _addSetupToJobList(self, options, executableDict, testCaseJobList, setup, resourcesDict=None, selectedSimDict=None, jobNamePrefix="OP"): """Adds a setup to the job list. :param options: pavayo options :param executableDict: dictionary of executables :param testCaseJobList: job list of a test case :param setup: current setup :param resourcesDict: resource dictionary :param selectedSimDict: dictionary of subset jobs :param jobNamePrefix: prefix for job names :type options: Namespace :type executableDict: ExecutableResources :type testCaseJobList: JobList :type setup SetupSub or RestartSetupSub :type resourcesDict: dict or None :type selectedSimDict: dict or None :type jobNamePrefix: str """ if selectedSimDict is None or selectedSimDict[setup.name] == "all": for opList in setup: setupDir = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name) for op in opList: self._addJobsOfOperatingPointToJobList(options, executableDict, testCaseJobList, setupDir, setup, opList, op, resourcesDict=resourcesDict, jobNamePrefix=jobNamePrefix) else: for opList in setup: setupDir = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name) if opList.name in selectedSimDict[setup.name] and selectedSimDict[setup.name][opList.name] == "all": for op in opList: self._addJobsOfOperatingPointToJobList(options, executableDict, testCaseJobList, setupDir, setup, opList, op, resourcesDict=resourcesDict, jobNamePrefix=jobNamePrefix) else: for op in opList: if op.name in selectedSimDict[setup.name][opList.name]: self._addJobsOfOperatingPointToJobList(options, executableDict, testCaseJobList, setupDir, setup, opList, op, resourcesDict=resourcesDict, jobNamePrefix=jobNamePrefix)
[docs] def getRestartJobList(self, options, executableDict, resourcesDict=None): """Returns a job list containing all jobs for the restart step of the test case. :param options: argparse instance of the current PAVAYO run :param executableDict: dictionary of executables :param resourcesDict: resource dictionary :type options: Argparse :type executableDict: ExecutableResources :type resourcesDict: dict :return: job list representing the computation of this test case or None :rtype: JobList or None """ if not self.restartSetup: return selectedSimDict = self.findSubsetInJobList(options.selectiveCompute) if options.selectiveCompute else None myJobList = JobList(name=self.name + "_UPDATERESTART_JOBLIST", verbosity=options.verbose) jobName = "Restart-setupJob" if selectedSimDict is not None: setupJob = Job(self._setupTestCasePartial, args=[selectedSimDict], kwargs=dict(restart=True, keepLog=False), jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, account=options.account, group="{}:UpdateRestart".format(self.name)) else: setupJob = Job(self._setupTestCase, kwargs=dict(restart=True, keepLog=False), jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:UpdateRestart".format(self.name)) myJobList.addJob(setupJob) lastJobId = [setupJob.id] testCaseJobList = JobList(name=self.name + "_RESTART_VALIDATION", verbosity=options.verbose) for setup in self.restartSetup: self._addSetupToJobList(options, executableDict, testCaseJobList, setup, resourcesDict=resourcesDict, selectedSimDict=selectedSimDict, jobNamePrefix="Restart_" + self.name) if testCaseJobList: myJobList.addJob(testCaseJobList, parents=lastJobId) lastJobId = [testCaseJobList.id] jobName = "Restart-CollectJob" collectJob = Job(ValidationTestCaseSub._collectData, args=[self], jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=os.path.join(self.path, constants.LOG_DIR), weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:RestartCollect".format(self.name)) myJobList.addJob(collectJob, parents=lastJobId) lastJobId = [collectJob.id] return myJobList
[docs] def getComputationJobList(self, options, executableDict, resourcesDict=None): """Returns a job list containing all jobs for the computation step of the test case. :param options: argparse instance of the current PAVAYO run :param executableDict: dictionary of executables :param resourcesDict: resource dictionary :type options: Argparse :type executableDict: ExecutableResources :type resourcesDict: dict :return: job list representing the computation of this test case or None :rtype: JobList or None """ if not self.setup: return selectedSimDict = self.findSubsetInJobList(options.selectiveCompute) if options.selectiveCompute else None myJobList = JobList(name=self.name + "_COMPUTATION_JOBLIST", verbosity=options.verbose, retries=options.retriesComputation, deactivateJobGrouping=options.deactivateClusterJobGrouping) if not options.step or constants.COMPUTATION_STEP_RESET in options.step: jobName = "setupJob" if selectedSimDict is not None: setupJob = Job(self._setupTestCasePartial, args=[selectedSimDict], kwargs={"keepLog": False}, jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:Computation".format(self.name)) else: setupJob = Job(self._setupTestCase, kwargs=dict(keepLog=False), jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:Computation".format(self.name)) myJobList.addJob(setupJob) lastJobId = [setupJob.id] else: lastJobId = None testCaseJobList = JobList(name=self.name + "_VALIDATION", verbosity=options.verbose) for setup in self: self._addSetupToJobList(options, executableDict, testCaseJobList, setup, resourcesDict=resourcesDict, selectedSimDict=selectedSimDict, jobNamePrefix=self.name) if testCaseJobList: myJobList.addJob(testCaseJobList, parents=lastJobId) lastJobId = [testCaseJobList.id] return myJobList
[docs] def getPostprocessingJobList(self, options, executableDict, resourcesDict=None): """Returns a job containing the post script of this test case. :param options: Argparse instance of the current PAVAYO run :param executableDict: dictionary of executables :param resourcesDict: resource dictionary :type options: argparse :type executableDict: ExecutableResources :type resourcesDict: dict :return: job list representing the post-processing job list :rtype: JobList or None """ postProcessingJobList = JobList(name=self.name + "_POSTPROCESSING_JOBLIST", verbosity=options.verbose) logDir = os.path.join(self.path, constants.LOG_DIR, "postProcessing") if not options.compute: jobName = "setupJobPostProcessing" setupJob = Job(self._setupPostProcessing, jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group="{}:Postprocessing".format(self.name)) postProcessingJobList.addJob(setupJob) lastJobId = [setupJob.id] else: lastJobId = None if self.analysisGroup: selectedSimDict = self.findSubsetInJobList(options.selectiveCompute, step=constants.POSTPROCESSING) if options.selectiveCompute else None analysisJobList = JobList(name=self.name + "_ANALYSIS_JOBLIST", verbosity=options.verbose) allJobs = self.createJobListDictionary(options, executableDict, "analysisGroup", "analysis", "job", self._getAnalysisWorkingDirForJob, selectDict=selectedSimDict, resourcesDict=resourcesDict) self.setDependenciesInJobList(analysisJobList, allJobs) postProcessingJobList.addJob(analysisJobList, lastJobId) if self.figures: lastPostProcId = [analysisJobList.id] if self.analysisGroup else lastJobId validFigures = [os.path.splitext(fig.name)[0] for fig in self.figures.figure] for dirPath, _, fileNames in os.walk(os.path.join(self.path, constants.ANALYSIS_DIR)): for fileName in fileNames: fname, fext = os.path.splitext(fileName) if fext[1:] == constants.TECPLOT_LAYOUT_ENDING and fname in validFigures: job = self._generateTecplotLayoutJob(dirPath, fileName, logDir) postProcessingJobList.addJob(job, lastPostProcId) validFigures.remove(fname) if not validFigures: break if not validFigures: break for plot in self.figures.plot: createPlotJob = Job(self.createPlot, args=[plot], jobName=self.name + "_createPlot", outputDir=logDir, weight=jobConsts.DEFAULT_WEIGHT_METHOD, account=options.account, group="{}:Postprocessing".format(self.name)) postProcessingJobList.addJob(createPlotJob, lastPostProcId) return postProcessingJobList
[docs] def createPlot(self, plot, verbose=True): """Creates a plot :param plot: plot instance :param verbose: verbosity :type plot: supermod.MatplotlibFigure :type verbose: bool """ if verbose: print("Create matplotlib plot for ", end='') if plot.generalPlot or plot.xyPlot: print("XY plot.") self._createGeneralPlot(plot, verbose=verbose) elif plot.testSuitePlot: raise NotImplementedError("testSuitePlot is not allowed for ValidationTestCase.") else: fig = FigureWrapper(title=PyTex.escapeChars(plot.title), paperSize=plot.paperSize, paperOrientation=plot.paperOrientation, sharey=plot.shareYAxis) if plot.radialDistributionPlot: if verbose: print("radial distribution.") self.plotRadialDistribution(fig, plot.radialDistributionPlot[0]) plot.bottom = max(0.06, plot.bottom) plot.top = min(0.95, plot.top) elif plot.residualReport: if verbose: print("residual convergence plot.") self.plotResidualReport(fig, plot.residualReport[0]) elif plot.convergenceReport: if verbose: print("convergence plot.") self.plotConvergenceReport(fig, plot.convergenceReport[0]) plot.top = min(0.95, plot.top) elif plot.performanceMap: if verbose: print("performance map.") self.plotPerformanceMap(fig, plot.performanceMap[0]) plot.bottom = max(0.06, plot.bottom) outputFileName = os.path.join(self.path, constants.FIGURES_DIR, plot.fileName) ensurePath(outputFileName) fig.saveFigure(filename=outputFileName, dpi=plot.dpi, left=plot.left, right=plot.right, bottom=plot.bottom, top=plot.top) if verbose: print("Done.")
[docs] def plotRadialDistribution(self, fig, radialDistribution): """Creates a plot of the radial distribution :param fig: handle of matplotlib figure :param radialDistribution: settings for radial distribution :type fig: FigureWrapper :type radialDistribution: supermod.RadialDistribution """ yVariable = radialDistribution.variables.y for xVariable in radialDistribution.variables.x: plotStyle.PlotStyle().reset() xLabel = plotStyle.getVariableLabel(xVariable.valueOf_) yLabel = plotStyle.getVariableLabel(yVariable.valueOf_) if xVariable.valueOf_ != "": radialPlot = XYPlot(xLabel=xLabel, yLabel=yLabel) for setting in radialDistribution.data.settings: dependency = setting.dependency averageType = setting.averageType or radialDistribution.data.averageType or "flux" dependencyPathPart = dependency.getPath() uLstPath = os.path.join(self.path, constants.COMPUTATION_DIR, dependencyPathPart, constants.OUTPUT_DIR, radialDistribution.data.ulstPath) uLst = readUList(uLstPath)[0] for partName in radialDistribution.data.parts.name: linestyleLabel = partName.label if partName.label is not None else partName.valueOf_ colorLabel = setting.label if setting.label is not None else dependencyPathPart radialPlot.addDataDictFromUList(uLst, partName.valueOf_, xVariable.valueOf_, yVariable.valueOf_, linestyleLabel=PyTex.escapeChars(linestyleLabel), colorLabel=PyTex.escapeChars(colorLabel), avType=uListConst.AV_TYPE_DICT[averageType]) radialPlot.xLim = (xVariable.min, xVariable.max) radialPlot.yLim = (yVariable.min, yVariable.max) else: radialPlot = None fig.addPlot(radialPlot)
[docs] def plotResidualReport(self, fig, residualReport): """Creates a residual plot :param fig: handle of matplotlib figure :param residualReport: settings for residual plot :type fig: FigureWrapper :type residualReport: supermod.ResidualReport """ xLabel = residualReport.xLabel if residualReport.xLabel else "Residual L1 [-]" yLabel = residualReport.yLabel if residualReport.yLabel else "Time Step [-]" convergencePlot = XYPlot(xLabel=xLabel, yLabel=yLabel) if residualReport.xScale: convergencePlot.xScale = residualReport.xScale if residualReport.yScale: convergencePlot.yScale = residualReport.yScale if residualReport.xLim: convergencePlot.xLim = (residualReport.xLim.min, residualReport.xLim.max) if residualReport.yLim: convergencePlot.yLim = (residualReport.yLim.min, residualReport.yLim.max) SettingDict = collections.namedtuple("SettingDict", ("setup", "opLists",)) setupLst = list() if residualReport.data: for settings in residualReport.data.settings: dependency = settings.dependency if dependency.job: raise RuntimeError("") thisSetup = self.find(dependency.level) thisOpList = thisSetup.find(dependency.subLevel) thisOpList = [thisOpList] if thisOpList else thisSetup.opList setupLst.append(SettingDict(setup=thisSetup, opLists=thisOpList,)) else: residualReport.data = supermod.ResidualReportData() setupLst = [SettingDict(setup=thisSetup, opLists=thisSetup.opList,) for thisSetup in self.setup] for i, setupDict in enumerate(setupLst): setup = setupDict.setup opLists = setupDict.opLists if residualReport.data: setupLabel = residualReport.data.settings[i].label else: setupLabel = "" for opList in opLists: residualXValues = list() residualYValues = list() offset = 0 colorLabel = setupLabel if setupLabel else ", ".join([setup.name, opList.name]) for op in opList: pathToResidualFile = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name, constants.OUTPUT_DIR, residualReport.data.path) resTec = TecplotDataset.fromFile(pathToResidualFile) residualXValues = np.concatenate((residualXValues, resTec.zones[0]['TimeStep'].getDataAsNumpyArray() + offset)) residualYValues = np.concatenate((residualYValues, resTec.zones[0]['ResidualL1'].getDataAsNumpyArray())) offset = residualXValues[-1] convergencePlot.addDataDictPlotStyleLabels(residualXValues, residualYValues, colorLabel=PyTex.escapeChars(colorLabel)) fig.addPlot(convergencePlot)
[docs] def plotConvergenceReport(self, fig, convergenceReport): """Creates a convergence plot :param fig: handle of matplotlib figure :param convergenceReport: settings for convergence plot :type fig: FigureWrapper :type convergenceReport: supermod.ConvergenceReport """ fig.legendStyle = plotData.LEGEND_STYLE_TWO_LEGENDS _inletName = "Inlet" _outletName = "Outlet" xLabel = convergenceReport.xLabel if convergenceReport.xLabel else "Time Step [-]" yLabel = convergenceReport.yLabel if convergenceReport.yLabel else "Mass Flow [Kg/s]" convergencePlot = XYPlot(xLabel=xLabel, yLabel=yLabel) if convergenceReport.xScale: convergencePlot.xScale = convergenceReport.xScale if convergenceReport.yScale: convergencePlot.yScale = convergenceReport.yScale if convergenceReport.xLim: convergencePlot.xLim = (convergenceReport.xLim.min, convergenceReport.xLim.max) if convergenceReport.yLim: convergencePlot.yLim = (convergenceReport.yLim.min, convergenceReport.yLim.max) SettingDict = collections.namedtuple("SettingDict", ("setup", "opLists", "averageType")) setupLst = list() if convergenceReport.data: for settings in convergenceReport.data.settings: dependency = settings.dependency if dependency.job: raise RuntimeError("") thisSetup = self.find(dependency.level) thisOpList = thisSetup.find(dependency.subLevel) thisOpList = [thisOpList] if thisOpList else thisSetup.opList setupLst.append(SettingDict(setup=thisSetup, opLists=thisOpList, averageType=settings.averageType or convergenceReport.data.averageType or "flux")) else: convergenceReport.data = supermod.ConvergenceReportData() setupLst = [SettingDict(setup=thisSetup, opLists=thisSetup.opList, averageType="flux") for thisSetup in self.setup] for i, setupDict in enumerate(setupLst): setup = setupDict.setup opLists = setupDict.opLists averageType = setupDict.averageType if convergenceReport.data: setupLabel = convergenceReport.data.settings[i].label else: setupLabel = "" uLstPath = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, setup[0].name, setup[0][0].name, constants.OUTPUT_DIR, convergenceReport.data.ulstPath) uLst = readUList(uLstPath)[0] inletName = uLst.fluxface[0].name outletName = uLst.fluxface[-1].name for opList in opLists: inletXValues = list() inletYValues = list() outletXValues = list() outletYValues = list() offset = 0 colorLabel = setupLabel if setupLabel else ", ".join([setup.name, opList.name]) for op in opList: inletFileName = f"{convergenceReport.data.pathPrefix}_{averageType}_{inletName}.dat" pathToResidualInletFile = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name, constants.OUTPUT_DIR, inletFileName) inTec = TecplotDataset.fromFile(pathToResidualInletFile) inletXValues = np.concatenate((inletXValues, inTec.zones[0]['TimeStep'].getDataAsNumpyArray() + offset)) inletYValues = np.concatenate((inletYValues, inTec.zones[0]['MassFlow'].getDataAsNumpyArray())) outletFileName = f"{convergenceReport.data.pathPrefix}_{averageType}_{outletName}.dat" pathToResidualOutletFile = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name, constants.OUTPUT_DIR, outletFileName) outTec = TecplotDataset.fromFile(pathToResidualOutletFile) outletXValues = np.concatenate((outletXValues, outTec.zones[0]['TimeStep'].getDataAsNumpyArray() + offset)) outletYValues = np.concatenate((outletYValues, outTec.zones[0]['MassFlow'].getDataAsNumpyArray() * -1)) offset = inletXValues[-1] convergencePlot.addDataDictPlotStyleLabels(inletXValues, inletYValues, linestyleLabel=PyTex.escapeChars(_inletName), colorLabel=PyTex.escapeChars(colorLabel)) convergencePlot.addDataDictPlotStyleLabels(outletXValues, outletYValues, linestyleLabel=PyTex.escapeChars(_outletName), colorLabel=PyTex.escapeChars(colorLabel)) fig.addPlot(convergencePlot)
[docs] def plotPerformanceMap(self, fig, performanceMap): """Creates a performance map :param fig: handle of matplotlib figure :param performanceMap: settings for performance map :type fig: FigureWrapper :type performanceMap: supermod.PerformanceMap """ plotDict = dict.fromkeys(performanceMap.variables.y) for yVariable in plotDict.keys(): xLabel = plotStyle.getVariableLabel(performanceMap.variables.x) yLabel = plotStyle.getVariableLabel(yVariable) plotDict[yVariable] = XYPlot(xLabel=xLabel, yLabel=yLabel) SettingDict = collections.namedtuple("SettingDict", ("setup", "opLists", "averageType")) setupLst = list() if performanceMap.data and performanceMap.data.settings: for settings in performanceMap.data.settings: dependency = settings.dependency if dependency.job: raise RuntimeError("") thisSetup = self.find(dependency.level) thisOpList = thisSetup.find(dependency.subLevel) thisOpList = [thisOpList] if thisOpList else thisSetup.opList setupLst.append(SettingDict(setup=thisSetup, opLists=thisOpList, averageType=settings.averageType or performanceMap.data.averageType or "flux")) else: performanceMap.data = supermod.PerformanceMapData() setupLst = [SettingDict(setup=thisSetup, opLists=thisSetup.opList, averageType="flux") for thisSetup in self.setup] for i, setupDict in enumerate(setupLst): setup = setupDict.setup opLists = setupDict.opLists averageType = setupDict.averageType if performanceMap.data: try: setupLabel = performanceMap.data.settings[i].label except IndexError: setupLabel = "" else: setupLabel = "" for opList in opLists: yValuesDict = dict.fromkeys(performanceMap.variables.y) xValues = list() for yVariable in yValuesDict.keys(): yValuesDict[yVariable] = list() colorLabel = setupLabel if setupLabel else ", ".join([setup.name, opList.name]) for op in opList: uLstPath = os.path.join(self.path, constants.COMPUTATION_DIR, setup.name, opList.name, op.name, constants.OUTPUT_DIR, performanceMap.data.ulstPath) uLst = readUList(uLstPath)[0] for yVariable in yValuesDict.keys(): yValuesDict[yVariable].append(uLst.getValues(yVariable, avType=uListConst.AV_TYPE_DICT[averageType], dbType="integral", partType="stage")) xValues.append(uLst.getValues(performanceMap.variables.x, avType=uListConst.AV_TYPE_DICT[averageType], dbType="integral", partType="fluxface")) for yVariable in yValuesDict.keys(): plotDict[yVariable].addDataDictPlotStyleLabels(xValues, yValuesDict[yVariable], colorLabel=PyTex.escapeChars(colorLabel), marker=True) for yVariable in performanceMap.variables.y: fig.addPlot(plotDict[yVariable])
def __len__(self): """Returns the number of operation point lists in the testcase. :return: number of operation point lists :rtype: int """ return len(self.setup) def __getitem__(self, index): """Returns a operation point list at a given index. :return: operation point list at given index :rtype: operation point list instance """ return self.setup[index]