"""
testSuiteTestCase.py
====================
testSuiteTestCase offers the TestSuiteTestCaseSub class. The class describes a test case in the test suite.
A TestSuiteTestCase consists of simulation groups, which consist of simulations.
Simulations can consist of several jobs.
Every simulation is executed in a unique folder.
The class implements the interface demanded by AbstractTestCase.
For the generation of the job lists the respective methods of the congregated classes are used.
`Tutorial 1 <../../../../tutorials/notebooks/newTestCaseLevel3.html>`_
"""
import argparse
import collections
import dataclasses
import fnmatch
import glob
import itertools
import json
import os
import textwrap
import typing
from lxml import etree
import pythoncgio
from datfile import TecplotDataset
from ...bricabrac import dataclass_utils
from ..executables.executableResources import ExecutableResources
from . import testcase_generated as supermod
from .abstractTestCase import AbstractTestCase, TestCaseState
from ...latex import pyTex
from ...referenceCheck import ReferenceCheck
from ...referenceCheck.referenceCheckContainer import CheckResult, CheckTolerances, CheckResultVariable, CheckResultCase, CheckResultFile, CheckResultData
from ...referenceCheck.referenceCheckImage import ReferenceCheckImage
from ...referenceCheck.referenceCheckGeneric import ReferenceCheckGeneric
from ...referenceCheck.referenceCheckMain import ReferenceCheckMain
from ...referenceCheck.referenceCheckTree import ReferenceCheckTree
from ...referenceCheck.presenter import ReferenceCheckPresenter
from ...jobManagement.jobs.jobList import JobList
from ...jobManagement.jobs.job import Job
from ...jobManagement.management.clusterInfo import ClusterInfo
from ...jobManagement import jobManagementData as jobConsts
from ...bricabrac.fileIO import Printer
from ...bricabrac.fileIO import globCopy2, md
from ...bricabrac.fileIO import removeFile
from ...bricabrac.stringFormatting import sci_notation
from ...pavayo import computeData as constants
from ...plotting.testSuitePlot import TestSuiteXYPlot
from ...latex.pyTex import PyTex
from ...cgns import cgnsFile
[docs]class TestSuiteTestCaseSub(supermod.TestSuiteTestCase, collections.Sequence, AbstractTestCase):
"""Class to represent a test suite test case. You can iterate over the job lists of a instance.
All possible parameters to the constructor of this class can be found in the XSD file used to generate
testcase_generated.py.
"""
COMPUTATION_DIR = constants.COMPUTATION_DIR
SOURCE_DIR = constants.SOURCE_DIR
FIGURES_DIR = constants.FIGURES_DIR
LOG_DIR = constants.LOG_DIR
ANALYSIS_DIR = constants.ANALYSIS_DIR
REFERENCE_DIR = constants.REFERENCE_DIR
REFERENCE_CHECK_DIR = constants.REFERENCE_CHECK_DIR
REFERENCE_CHECK_SUMMARY_FILE = "checkSummary.json"
INPUT_DIR = constants.INPUT_DIR
OUTPUT_DIR = constants.OUTPUT_DIR
DEFAULT_FLOW_SOLUTION = "FlowSolution"
[docs] @staticmethod
def prepareSourceFiles(testcase: 'TestSuiteTestCaseSub', simulation: supermod.Simulation, simGroupName: str, computationDir: str):
"""Purges the computation, log and figure folder.
Copies files needed by the test cases.
:param testcase: TestSuiteTestCaseSub instance
:param simulation: simulation
:param simGroupName: name of the simulation group
:param computationDir: name of the computation directory
"""
md(os.path.join(testcase.path, computationDir, simGroupName, simulation.name, testcase.INPUT_DIR))
md(os.path.join(testcase.path, computationDir, simGroupName, simulation.name, testcase.OUTPUT_DIR))
if simulation.sourceFiles:
for copyFile in simulation.sourceFiles.copyFile:
TestSuiteTestCaseSub._copyFromSource(testcase, copyFile, simGroupName, simulation.name)
for linkFile in simulation.sourceFiles.linkFile:
TestSuiteTestCaseSub._linkFromSource(testcase, linkFile, simGroupName, simulation.name)
@classmethod
def _setupTestCase(cls, testcase: 'TestSuiteTestCaseSub', simGroupList: typing.Iterable[supermod.SimulationGroup], keepLog: bool = False):
"""Performs a clean for the test case.
Purges the computation, log and figure folder.
Copies files needed by the test cases.
:param testcase: TestSuiteTestCaseSub instance
:param simGroupList: the simulation group to be used
:param keepLog: flag whether log folder is removed
"""
removeFile(os.path.join(testcase.path, cls.COMPUTATION_DIR))
if not keepLog:
removeFile(os.path.join(testcase.path, cls.LOG_DIR))
removeFile(os.path.join(testcase.path, cls.FIGURES_DIR))
removeFile(os.path.join(testcase.path, cls.REFERENCE_CHECK_DIR))
md(os.path.join(testcase.path, cls.LOG_DIR))
md(os.path.join(testcase.path, cls.FIGURES_DIR))
md(os.path.join(testcase.path, cls.REFERENCE_CHECK_DIR))
for simGroup in simGroupList:
for simulation in simGroup.simulation:
TestSuiteTestCaseSub.prepareSourceFiles(testcase, simulation, simGroup.name, cls.COMPUTATION_DIR)
@classmethod
def _setupTestCasePartial(cls, testcase: 'TestSuiteTestCaseSub', simGroupList: typing.Iterable[supermod.SimulationGroup],
selectiveCompute: AbstractTestCase.Subset):
"""Performs a clean for the test case.
Purges the computation, log and figure folder.
Copies files needed by the test cases.
:param testcase: TestSuiteTestCaseSub instance
:param simGroupList: the simulation group to be used
:param selectiveCompute: list of selected computations
"""
for simGroup in simGroupList:
if simGroup.name in selectiveCompute:
if selectiveCompute[simGroup.name] == "all":
removeFile(os.path.join(testcase.path, cls.COMPUTATION_DIR, simGroup.name))
for simulation in simGroup.simulation:
TestSuiteTestCaseSub.prepareSourceFiles(testcase, simulation, simGroup.name, cls.COMPUTATION_DIR)
else:
for simulation in simGroup.simulation:
if simulation.name in selectiveCompute[simGroup.name] and selectiveCompute[simGroup.name][simulation.name] == "all":
removeFile(os.path.join(testcase.path, cls.COMPUTATION_DIR, simGroup.name, simulation.name))
TestSuiteTestCaseSub.prepareSourceFiles(testcase, simulation, simGroup.name, cls.COMPUTATION_DIR)
@classmethod
def _setupPostProcessing(cls, testcase: 'TestSuiteTestCaseSub'):
"""Delete tecplot.phy files to prevent messing up the images in the test report.
:param testcase: TestSuiteTestCaseSub instance
"""
for dirPath, _, fileNames in os.walk(os.path.join(testcase.path, cls.ANALYSIS_DIR)):
for fileName in fileNames:
if fileName == cls.TECPLOT_PHY_FILE_NAME:
removeFile(os.path.join(dirPath, fileName))
@staticmethod
def _setupCheckReferences(testcase: 'TestSuiteTestCaseSub', directory: typing.Optional[str] = None):
"""Performs a clean for the checks directory
:param testcase: TestSuiteTestCaseSub instance
:param directory: directory-suffix that should be prepared for reference checks
"""
directory = directory or constants.REFERENCE_CHECK_DIR
removeFile(os.path.join(testcase.path, directory))
md(os.path.join(testcase.path, directory))
@staticmethod
def _collectData(testcase: 'TestSuiteTestCaseSub', simGroupList: typing.Iterable[supermod.SimulationGroup]):
"""Collects data from computation folder in update restart mode and copies it to the appropriate
simulation folder in the source folder.
:param testcase: TestSuiteTestCaseSub instance
:param simGroupList: the simulation group to be used
"""
for simGroup in simGroupList:
for simulation in simGroup.simulation:
TestSuiteTestCaseSub._collectDataForSingleSaveFiles(testcase, simulation.saveFiles, simGroup.name, simulation.name)
def _adaptNProcs(self, nProcs: int) -> int:
"""Returns the number of processes to use for a job of this test case if there is no number of procs specified explicitly.
The method tests if it is executed on a cluster. Then it returns either the number of procs to use on a cluster or on a workstation.
:param nProcs: number of processes explicitly specified for the job; '-1' means 'not specified'
:return: number of procs to use for a job
"""
nProcsToUse = nProcs
if nProcs == -1:
if ClusterInfo.onValidCluster():
nProcsToUse = self.numProcsCluster
else:
nProcsToUse = self.numProcsLocal
return nProcsToUse
def _getSpecificJobList(self, options: argparse.Namespace,
executableDict: ExecutableResources, resourcesDict: typing.Dict[str, int], step: str) -> JobList:
"""Collects all job objects from the jobs in the current test case instance and creates a job list of them for a given step.
:param options: options instance to extract the executables and the queue to use
:param executableDict: dictionary of executables
:param step: working step
:return: job list representing this test case or None
"""
assert step in (constants.MESHING, constants.RESTART, constants.COMPUTATION), "Invalid step!"
job_list_suffix = {constants.MESHING: "_UPDATEMESH", constants.RESTART: "_UPDATERESTART", constants.COMPUTATION: "_COMPUTATION"}[step]\
+ "_JOBLIST"
job_name = {constants.MESHING: "Mesh-setupJob", constants.RESTART: "Restart-setupJob", constants.COMPUTATION: "setupJob"}[step]
setup_job_args = {"simGroupList": {constants.MESHING: self.meshingGroup, constants.RESTART: self.restartGroup,
constants.COMPUTATION: self.simulationGroup}[step]}
collect_data = step != constants.COMPUTATION
selected_simulations = None
if options.selectiveCompute:
job_list_group_tag = {constants.COMPUTATION: "simulation", constants.RESTART: "restart", constants.POSTPROCESSING: "analysis"}[step]
job_list_tag = "analysis" if step == constants.POSTPROCESSING else "simulation"
selected_simulations = self.findSubsetInJobList(job_list_group_tag + "Group", job_list_tag, "job", options.selectiveCompute, step)
setup_job_args["selectiveCompute"] = selected_simulations
specific_job_list = JobList(name=self.name + job_list_suffix, verbosity=options.verbose,
retries=0 if step != constants.COMPUTATION else options.retriesComputation,
deactivateJobGrouping=options.deactivateClusterJobGrouping)
setup_job = Job(TestSuiteTestCaseSub._setupTestCase if selected_simulations is None else TestSuiteTestCaseSub._setupTestCasePartial,
args=[self], kwargs=setup_job_args, jobName=job_name, workingDirectory=self.path, executeOnMaster=True,
outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group=f"{self.name}:Init")
specific_job_list.addJob(setup_job)
if options.step is None or options.step != [constants.COMPUTATION_STEP_RESET]:
all_jobs = self.createJobListDictionary(options, executableDict, step=step, selectDict=selected_simulations, resourcesDict=resourcesDict)
if collect_data:
calculation_job_list = JobList(name=self.name + "_CALC_" + job_list_suffix, verbosity=options.verbose)
self.setDependenciesInJobList(calculation_job_list, all_jobs, checkFull=not selected_simulations)
specific_job_list.addJob(calculation_job_list, parents=[setup_job.id])
if selected_simulations is None:
collect_data_job = Job(TestSuiteTestCaseSub._collectData, args=[self, setup_job_args["simGroupList"]],
jobName="Restart-CollectJob", workingDirectory=self.path, executeOnMaster=True,
outputDir=os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR),
weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group=f"{self.name}:RestartCollect")
specific_job_list.addJob(collect_data_job, parents=[calculation_job_list.id])
else:
self.setDependenciesInJobList(specific_job_list, all_jobs, parents=[setup_job.id], checkFull=not selected_simulations)
return specific_job_list
[docs] def getMeshingJobList(self, options: argparse.Namespace,
executableDict: ExecutableResources,
resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> typing.Optional[JobList]:
"""Collects all job objects from the jobs in the current test case instance and creates a job list of them.
:param options: options instance to extract the executables and the queue to use
:param executableDict: dictionary of executables
:return: job list representing this test case or None
"""
meshing_job_list = None
if self.restartGroup:
meshing_job_list = self._getSpecificJobList(options, executableDict, resourcesDict, constants.MESHING)
return meshing_job_list
[docs] def getRestartJobList(self, options: argparse.Namespace,
executableDict: ExecutableResources,
resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> typing.Optional[JobList]:
"""Collects all job objects from the jobs in the current test case instance and creates a job list of them.
:param options: options instance to extract the executables and the queue to use
:param executableDict: dictionary of executables
:return: job list representing this test case or None
"""
restart_job_list = None
if self.restartGroup:
restart_job_list = self._getSpecificJobList(options, executableDict, resourcesDict, constants.RESTART)
return restart_job_list
[docs] def getComputationJobList(self, options: argparse.Namespace,
executableDict: ExecutableResources,
resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> typing.Optional[JobList]:
"""Collects all job objects from the jobs in the current test case instance and creates a job list of them.
:param options: options instance to extract the executables and the queue to use
:param executableDict: dictionary of executables
:return: job list representing this test case or None
"""
computation_job_list = None
if self.simulationGroup:
computation_job_list = self._getSpecificJobList(options, executableDict, resourcesDict, constants.COMPUTATION)
return computation_job_list
[docs] def getPostprocessingJobList(self, options: argparse.Namespace,
executableDict: ExecutableResources, resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> JobList:
"""Returns a job representing the post script.
:param options: options instance to extract the executables and the queue to use
:param executableDict: dictionary of executables
:return: a jobManagement.jobs.job instance representing the job
"""
post_processing_job_list = JobList(name=self.name + "_POSTPROCESSING_JOBLIST", verbosity=options.verbose)
log_dir = os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR)
if options.selectiveCompute:
Printer.warning("Selective compute in post-processing step ignored.")
setup_job = Job(TestSuiteTestCaseSub._setupPostProcessing, args=[self],
jobName="cleanTecplotPhy", workingDirectory=self.path, executeOnMaster=True,
outputDir=log_dir, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
group=f"{self.name}:Postprocessing")
post_processing_job_list.addJob(setup_job)
analysis_job_list = JobList(name=self.name + "_ANALYSIS_JOBLIST", verbosity=options.verbose)
all_jobs = self.createJobListDictionary(options, executableDict, step=constants.POSTPROCESSING, resourcesDict=resourcesDict)
self.setDependenciesInJobList(analysis_job_list, all_jobs)
post_processing_job_list.addJob(analysis_job_list, [setup_job.id])
if self.figures: # Add generating figures from Tecplot layouts
figure_names = [os.path.splitext(fig.name)[0] for fig in self.figures.figure]
for dir_path, _, file_names in os.walk(os.path.join(self.path, TestSuiteTestCaseSub.ANALYSIS_DIR)):
for file_name in file_names:
figure_name, figure_ext = os.path.splitext(file_name)
if figure_ext[1:] == constants.TECPLOT_LAYOUT_ENDING and figure_name in figure_names:
job = self._generateTecplotLayoutJob(dir_path, file_name, log_dir)
post_processing_job_list.addJob(job, [analysis_job_list.id])
figure_names.remove(figure_name)
if not figure_names:
break
if not figure_names:
break
# list of plots
for plot in self.figures.plot:
create_plot_job = Job(self.createPlot, args=[plot], jobName=f"{self.name}_createPlot", outputDir=log_dir,
weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group=f"{self.name}:Postprocessing")
post_processing_job_list.addJob(create_plot_job, [analysis_job_list.id])
return post_processing_job_list
[docs] def createPlot(self, plot: supermod.MatplotlibFigure, verbose: bool = True):
"""Create plot using TestSuiteXYPlot
:param plot: plot object
:param verbose: show output
"""
if plot.testSuitePlot is not None:
if plot.testSuitePlot.variables.y is None:
currentPlot = TestSuiteXYPlot.createFromPlot(plot, basePath=self.path, verbose=verbose)
currentPlot.readData()
currentPlot.plotData(paperSize=plot.paperSize, paperOrientation=plot.paperOrientation)
else:
raise NotImplementedError("Only X-Y plots are allowed (x : x, y : value).")
elif plot.generalPlot or plot.xyPlot:
self._createGeneralPlot(plot, verbose=verbose)
else:
raise NotImplementedError("Only testSuitePlot allowed.")
[docs] @classmethod
def updateAccuracyDict(cls, accuracy_dict_base: CheckResult, accuracy_dict_update: CheckResult):
"""Merges accuracy entries from 'accuracy_dict_base' and 'accuracy_dict_update' into 'accuracy_dict_base'.
:param accuracy_dict_base: dictionary with accuracies
:param accuracy_dict_update: dictionary with accuracies
"""
accuracy_dict_base.identical = accuracy_dict_base.identical and accuracy_dict_update.identical
accuracy_dict_base.within_tolerance = accuracy_dict_base.within_tolerance and accuracy_dict_update.within_tolerance
@classmethod
def _performReferenceChecksAndAppendResults(cls, presenter: ReferenceCheckPresenter, refChecks: typing.List[ReferenceCheckMain],
checkResultsFile: CheckResultFile, forcePlot: bool,
comp_label: str = "current", ref_label: str = "reference") -> None:
"""Performs checks and append results
:param presenter: Instance of Reference check presenter class
:param refChecks: List of instances of ReferenceCheck classes
:param checkResultsFile: Dictionary with the checks results
:param forcePlot: flag if to always show the plots in the report
"""
for refCheck in sorted(refChecks, key=lambda check: check.identifier):
refCheck.performCheck()
result_filename = None
result_filetype = None
if not refCheck.withinTolerance() or forcePlot:
result_filename, result_filetype = presenter.plotData(refCheck, comp_label, ref_label)
check_result = CheckResultVariable(tolerances=refCheck.tolerances,
result=CheckResult(refCheck.identical(), refCheck.withinTolerance()),
result_file_name=result_filename,
result_file_type=result_filetype,
result_description=presenter.getDescription(refCheck))
checkResultsFile.variables[refCheck.identifier] = check_result
cls.updateAccuracyDict(checkResultsFile.result, check_result.result)
@staticmethod
def parsePlotProperties(plot: supermod.plotType) -> typing.Tuple[typing.Optional[str], typing.Optional[str], bool, bool, bool, bool, bool]:
variableX = None
variableY = None
absX = False
absResult = False
logX = False
logY = False
logResult = False
if plot:
variableX = plot.x.valueOf_ if plot.x else None
variableY = plot.y.valueOf_ if plot.y else None
absX = plot.x.abs if plot.x else False
absResult = plot.result.abs if plot.result else False
logX = plot.x.log if plot.x else False
logY = plot.y.log if plot.y else False
logResult = plot.result.log if plot.result else False
return variableX, variableY, absX, absResult, logX, logY, logResult
@classmethod
def parseDataSelectionProperties(cls, dataSelection: supermod.dataSelectionType)\
-> typing.Tuple[typing.List[str], slice, slice, slice, typing.List[str], bool]:
# dataSelection defaults
slice_i = slice_j = slice_k = slice(None)
zone_name_pattern = None
flow_solution_pattern = [cls.DEFAULT_FLOW_SOLUTION]
includeRind = False
if dataSelection:
indexSlice = dataSelection.indexSlice
includeRind = dataSelection.includeRind
if indexSlice:
if indexSlice.i:
slice_i = indexSlice.i.getSlice()
if indexSlice.j:
slice_j = indexSlice.j.getSlice()
if indexSlice.k:
slice_k = indexSlice.k.getSlice()
zone_name_pattern = dataSelection.zone
flow_solution_pattern = dataSelection.solution or flow_solution_pattern
return zone_name_pattern, slice_i, slice_j, slice_k, flow_solution_pattern, includeRind
@classmethod
def _checkReferencesFile(cls, useReferenceCheckTree: bool, filename: str, compDir: str, refDir: str, prefixResult: typing.Union[str, int],
reference: typing.Union[supermod.Reference, supermod.ReferenceTree], working_dir: str, forcePlot: bool,
step: str, comp_label: str = "current", ref_label: str = "reference") -> CheckResultFile:
"""Checks the references for an input file.
:param useReferenceCheckTree: flag whether referencechecktree should be used
:param filename: name of the input file
:param compDir: folder containing data to compare
:param refDir: folder containing references
:param prefixResult: prefix for result filenames generated by reference checks (to disambiguate between results from the same file generated by different checks)
:param reference: abstract reference object
:param working_dir: path to the working dir in the testcase (result output)
:param forcePlot: flag if to always show the plots in the report
:return: dictionary with the checks results
"""
checkResultsFile = CheckResultFile(filename, xml_description=reference.description)
compFile = os.path.join(compDir, filename)
refFile = os.path.join(refDir, filename)
fileType = reference.path.type_
fileroot_results = f"{prefixResult}_" + os.path.splitext(filename.replace("/", "_"))[0]
fileroot_results = fileroot_results.replace(".", "_") # for latex includegraphics, filename must not contain '.'
filerootResultWithPath = os.path.join(working_dir, fileroot_results)
allow_different_keys = (step == constants.STATUS_FILE_EVALUATE_REFERENCES)
print(f"Checking {compFile} against {refFile}")
_, tail = os.path.splitext(compFile)
if useReferenceCheckTree:
presenter = ReferenceCheckPresenter()
if fileType.upper() == "JSON" or (fileType.upper() == "AUTO" and tail.upper() == ".JSON"):
refData = ReferenceCheckTree.readJsonFile(refFile)
compData = ReferenceCheckTree.readJsonFile(compFile)
refChecks = []
for treePath in reference.treePath:
treePathEntries = [entry.valueOf_ for entry in treePath.TreePathEntries]
localChecks = ReferenceCheckTree.fromDict(treePathEntries, treePath.ignoreKey, treePath.toleranceRelative, treePath.toleranceAbsolute,
compFile, compData, refFile, refData,
fileroot_results=filerootResultWithPath)
refChecks.extend(localChecks)
cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot, comp_label, ref_label)
elif fileType.upper() == "CGNS" or (fileType.upper() == "AUTO" and tail.upper() == ".CGNS"):
with pythoncgio.CgioFile(refFile) as refFileRoot, pythoncgio.CgioFile(compFile) as compFileRoot:
refChecks = []
for treePath in reference.treePath:
treePathEntries = [entry.valueOf_ for entry in treePath.TreePathEntries]
refChecks.extend(ReferenceCheckTree.fromCGNS(treePathEntries, treePath.ignoreKey, treePath.toleranceRelative, treePath.toleranceAbsolute,
compFile, compFileRoot, refFile, refFileRoot,
fileroot_results=filerootResultWithPath))
cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot, comp_label, ref_label)
else:
raise NotImplementedError(f"Reference checks do not (yet) support output files of type {fileType}")
elif fileType.upper() == "GENERICFILE":
refChecks = ReferenceCheckGeneric.fromFiles(compFile, refFile, filerootResultWithPath)
presenter = ReferenceCheckPresenter()
cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot, comp_label, ref_label)
elif fileType.upper() == "IMAGE":
refChecks = ReferenceCheckImage.fromFiles(compFile, refFile, filerootResultWithPath)
presenter = ReferenceCheckPresenter()
cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot, comp_label, ref_label)
# use a specialized parser to parse the file
else:
assert reference.variable, "No Variable specified for reference check"
variableX, variableY, absX, absResult, logX, logY, logResult = cls.parsePlotProperties(reference.plot)
zone_name_pattern, slice_i, slice_j, slice_k, flow_solution_pattern, includeRind = cls.parseDataSelectionProperties(reference.dataSelection)
presenter = ReferenceCheckPresenter(abs_x=absX, abs_result=absResult, log_x=logX, log_y=logY, log_result=logResult)
comp_data = ref_data = None
for variable in reference.variable:
for name in variable.name:
print(f"\tVariable: {name}, tolerance_relative: {variable.toleranceRelative}, tolerance_absolute: {variable.toleranceAbsolute}")
reference_checks = list()
if fileType.upper() == "DAT" or (fileType.upper() == "AUTO" and tail.upper() == ".DAT"):
comp_data = comp_data or TecplotDataset.fromFile(compFile)
ref_data = ref_data or TecplotDataset.fromFile(refFile)
reference_checks = ReferenceCheck.fromTecplotASCII(comp_data, ref_data, name,
CheckTolerances(variable.toleranceAbsolute, variable.toleranceRelative),
variable_x=variableX, variable_y=variableY,
zone_name_pattern=zone_name_pattern,
slice_i=slice_i, slice_j=slice_j, slice_k=slice_k,
fileroot_results=filerootResultWithPath,
allow_different_keys=allow_different_keys)
elif fileType.upper() == "CGNS" or (fileType.upper() == "AUTO" and tail.upper() == ".CGNS"):
comp_data = comp_data or cgnsFile.CGNSDataSet(compFile)
ref_data = ref_data or cgnsFile.CGNSDataSet(refFile)
reference_checks = ReferenceCheck.fromCGNS(comp_data, ref_data, name,
CheckTolerances(variable.toleranceAbsolute, variable.toleranceRelative),
variable_x=variableX, variable_y=variableY,
zone_name_pattern=zone_name_pattern,
slice_i=slice_i, slice_j=slice_j, slice_k=slice_k,
solution_names=flow_solution_pattern, include_rind=includeRind,
fileroot_results=filerootResultWithPath,
allow_different_keys=allow_different_keys)
else:
NotImplementedError(f"Reference checks do not (yet) support output files of type {fileType}")
cls._performReferenceChecksAndAppendResults(presenter, reference_checks, checkResultsFile, forcePlot, comp_label, ref_label)
return checkResultsFile
@classmethod
def _checkReferences(cls, testcase: 'TestSuiteTestCaseSub', references: typing.Iterable[supermod.Reference], forcePlot: bool, step: str,
reference_dir: str = REFERENCE_DIR, result_dir: str = COMPUTATION_DIR, working_dir: typing.Optional[str] = None,
comp_label: str = "current", ref_label: str = "reference"):
"""Method to check references as specified in instance of reference.
Secures the result of the checks as a JSON file.
:param testcase: test case object
:param references: Reference object as specified in xsd file for test case
:param forcePlot: Force generation of plots even if everything is identical
"""
compDir = os.path.join(testcase.path, result_dir)
refDir = os.path.join(testcase.path, reference_dir)
working_dir = working_dir or TestSuiteTestCaseSub.REFERENCE_CHECK_DIR
working_dir = os.path.join(testcase.path, working_dir)
checkResultsCase = CheckResultCase()
for referenceID, reference in enumerate(references):
useReferenceCheckTree = reference.original_tagname_ == "referenceTree"
# Only handle reference which do either specify variables, use the treeReferenceCheck.
# Additionaly, references which specify the type explicitly should also handled
if useReferenceCheckTree or reference.variable or reference.path.type_ != "auto":
compFilePattern = os.path.join(compDir, reference.path.valueOf_)
refFilePattern = os.path.join(refDir, reference.path.valueOf_)
compFiles = glob.glob(compFilePattern)
refFiles = glob.glob(refFilePattern)
print("\nResult files:", ", ".join(compFiles))
print("Reference files:", ", ".join(refFiles))
# strip com/ref dir path
relRefFiles = {os.path.relpath(c, refDir) for c in refFiles}
relCompFiles = {os.path.relpath(c, compDir) for c in compFiles}
def error(step, message):
if step == constants.STATUS_FILE_EVALUATE_REFERENCES:
Printer.warning(message)
elif step == constants.STATUS_FILE_CHECK_REFERENCES:
raise RuntimeError(message)
else:
raise NotImplementedError()
if not relCompFiles:
raise RuntimeError(f"No result files found matching the expression '{reference.path.valueOf_}'.")
if not relRefFiles:
error(step, f"No reference files found matching the expression '{reference.path.valueOf_}'.")
if not relRefFiles.issubset(relCompFiles):
error(step, f"Reference file set generated by expression '{reference.path.valueOf_}' "
"is not a sub set of the result file set.\nReference or result files may be missing.")
if not relCompFiles.issubset(relRefFiles) and step == constants.STATUS_FILE_EVALUATE_REFERENCES:
Printer.warning("There is a reference in the current version of the testcase that has no match in "
"the original reference of the testcase.")
for filename in sorted(relRefFiles.intersection(relCompFiles)):
checkResultsFile = cls._checkReferencesFile(useReferenceCheckTree, filename, compDir, refDir, referenceID, reference, working_dir,
forcePlot, step, comp_label, ref_label)
cls.updateAccuracyDict(checkResultsCase.result, checkResultsFile.result)
checkResultsCase.checks.append(checkResultsFile)
check_summary_file_path = os.path.join(working_dir, cls.REFERENCE_CHECK_SUMMARY_FILE)
with open(check_summary_file_path, "w") as fp:
json.dump(dataclass_utils.dataclass_as_dict(checkResultsCase), fp, indent=4)
[docs] def getCheckReferencesJobList(self, options: argparse.Namespace) -> JobList:
"""Create a joblist to check references of test case.
:param options: options instance to extract the executables and the queue to use
:return: job list representing this test case or None
"""
myJobList = JobList(name=self.name + "_CHECKREFERENCES_JOBLIST", verbosity=options.verbose)
outputDir = os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR)
jobName = "setupChecks"
setupJob = Job(TestSuiteTestCaseSub._setupCheckReferences, args=[self],
jobName=jobName, workingDirectory=self.path, executeOnMaster=True,
outputDir=outputDir, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
group="{}:ReferenceChecks".format(self.name))
myJobList.addJob(setupJob)
if self.references:
checkReferencesJob = Job(TestSuiteTestCaseSub._checkReferences, args=[self, self.references.referenceAbstract, options.forceReferencePlots,
constants.STATUS_FILE_CHECK_REFERENCES],
jobName=self.name + "_CHECK_REFERENCES", workingDirectory=self.path, executeOnMaster=True,
outputDir=outputDir, weight=jobConsts.DEFAULT_WEIGHT_METHOD,
group="{}:ReferenceChecks".format(self.name))
myJobList.addJob(checkReferencesJob, [setupJob.id])
return myJobList
[docs] def getEvaluateReferencesJobList(self, options: argparse.Namespace) -> JobList:
"""Create a joblist to check current against original references of test case.
:param options: options instance to extract the executables and the queue to use
:return: job list representing this test case or None
"""
myJobList = JobList(name=self.name + "_EVALUATE_REFERENCES_JOBLIST", verbosity=options.verbose)
outputDir = os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR)
jobName = "setupChecks"
setupJob = Job(TestSuiteTestCaseSub._setupCheckReferences, args=[self, constants.REFERENCE_EVALUATION_DIR],
jobName=jobName, workingDirectory=self.path, executeOnMaster=True,
outputDir=outputDir, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
group=f"{self.name}:EvaluateReferences")
myJobList.addJob(setupJob)
if self.references:
evaluateReferencesJob = Job(TestSuiteTestCaseSub._checkReferences,
args=[self, self.references.referenceAbstract],
kwargs={'forcePlot': True,
'step': constants.STATUS_FILE_EVALUATE_REFERENCES,
'reference_dir': constants.ORIGINAL_REFERENCES_DIR,
'result_dir': constants.REFERENCE_DIR,
'working_dir': constants.REFERENCE_EVALUATION_DIR,
'comp_label': "current reference",
'ref_label': f"original reference (r{self.originalReferenceRevision})",
},
jobName=self.name + "_EVALUATE_REFERENCES", workingDirectory=self.path,
executeOnMaster=True, outputDir=outputDir,
weight=jobConsts.DEFAULT_WEIGHT_METHOD, group=f"{self.name}:EvaluateReferences")
myJobList.addJob(evaluateReferencesJob, [setupJob.id])
return myJobList
[docs] def get_state_by_step(self, step: str) -> TestCaseState:
"""Returns the state of the step passed.
:param step: step of interest
:type step: string
:return: True if an error occurred processing the test case
:rtype: TestCaseState
"""
state = super().get_state_by_step(step)
# The super call only checks if the reference job list was successful
if step == constants.STATUS_FILE_CHECK_REFERENCES or step == constants.STATUS_FILE_EVALUATE_REFERENCES:
if not self.references:
assert state == TestCaseState.SUCCESS
state = TestCaseState.NOT_AVAILABLE
elif state == TestCaseState.SUCCESS:
state = TestCaseState.DID_NOT_RUN
if step == constants.STATUS_FILE_CHECK_REFERENCES:
working_dir = constants.REFERENCE_CHECK_DIR
elif step == constants.STATUS_FILE_EVALUATE_REFERENCES:
working_dir = constants.REFERENCE_EVALUATION_DIR
else:
raise NotImplementedError(f"Unknown step {step}")
try:
with open(os.path.join(self.path, working_dir, self.REFERENCE_CHECK_SUMMARY_FILE)) as fp:
self.checkSummary = dataclass_utils.dataclass_from_dict(CheckResultCase, json.load(fp))
except FileNotFoundError:
pass
if self.checkSummary.checks:
if not self.checkSummary.result.within_tolerance:
state = TestCaseState.FAILURE
elif not self.checkSummary.result.identical:
state = TestCaseState.WITHIN_TOLERANCE
else:
state = TestCaseState.SUCCESS
return state
@classmethod
def _toleranceColor(cls, withinTolerance: bool) -> str:
return cls.SUCCESS_COLOR if withinTolerance else cls.FAIL_COLOR
@staticmethod
def _addDescription(latexDocumentInstance: pyTex, description: str):
if description is not None:
latexDocumentInstance.addText(PyTex.escapeChars(description))
@classmethod
def _shortenTable(cls, table_entries: typing.List[dataclasses.dataclass]) -> typing.List[dataclasses.dataclass]:
new_entries = table_entries
if len(table_entries) > 15:
dataclass = table_entries[0].__class__
dummy_data = len(dataclasses.fields(table_entries[0])) * ["..."]
new_entries = table_entries[:5] + 5 * [dataclass(*dummy_data)] + table_entries[-5:]
return new_entries
[docs] def addReferenceCheckSummary(self, latexDocumentInstance: PyTex, forcePlot: bool, comp_label: str, ref_label: str):
"""Add summary of reference checks to latex document.
:param latexDocumentInstance: PyTex instance of the current report to build
:param forcePlot: Force generation of plots even if everything is identical
"""
TableTreeEntry = dataclasses.make_dataclass("TableTreeEntry", zip(("Var", comp_label, ref_label, "Rel", "Abs", "Bin"),
itertools.repeat("str")))
Table0DEntry = dataclasses.make_dataclass("Table0DEntries", zip(("Zone", "Solution", "Variable", comp_label, ref_label, "Rel", "Abs", "Bin"),
itertools.repeat("str")))
Table3DEntry = dataclasses.make_dataclass("Table3DEntries", zip(("Zone", "Solution", "Variable", "Rel", "Abs", "Bin"),
itertools.repeat("str")))
chapterTypeBase = 2
latexDocumentInstance.addSectionTitle("Failed Reference Checks" if not forcePlot else "Reference Checks", chapterType=chapterTypeBase)
table_summary_contents = [["File", "Var", "Rel", "Abs", "Bin"]]
figures = []
tables_tree = []
tables_0D = []
tables_3D = []
tables_file_diffs = []
for file_id, checkResultFile in enumerate(self.checkSummary.checks):
filepath_relative = checkResultFile.file_name
filename = os.path.split(filepath_relative)[1]
table_tree_entries = []
table_0D_entries = []
table_3D_entries = []
table_file_diffs = []
for variable_name, variable in checkResultFile.variables.items():
stringVariableResult = PyTex.escapeChars(variable_name)
stringBinary = self.string_success if variable.result.identical else self.string_fail_orange
stringToleranceRelative = sci_notation(variable.tolerances.relative)
stringToleranceAbsolute = sci_notation(variable.tolerances.absolute)
stringToleranceRelativeColored = self.string_format_color.format(color=self._toleranceColor(variable.result.within_tolerance), text=stringToleranceRelative)
stringToleranceAbsoluteColored = self.string_format_color.format(color=self._toleranceColor(variable.result.within_tolerance), text=stringToleranceAbsolute)
if not variable.result.within_tolerance or forcePlot:
if variable.result_file_type in (ReferenceCheckPresenter.TYPE_TABLE_TREE, ReferenceCheckPresenter.TYPE_TABLE_0D_FIELD,
ReferenceCheckPresenter.TYPE_TABLE_3D_FIELD, ReferenceCheckPresenter.TYPE_GENERIC_FILE_DIFF):
assert variable.result_file_name is not None
with open(variable.result_file_name, "r") as fp:
resultData = dataclass_utils.dataclass_from_dict(CheckResultData, json.load(fp))
label = f"{self.name}_{os.path.split(variable.result_file_name)[1]}"
if variable.result_file_type == ReferenceCheckPresenter.TYPE_PLOT:
stringName = f"\\hyperref[{label}]{{{PyTex.escapeChars(filename)}}}"
figures.append([filepath_relative, variable.result_file_name, label, variable.result_description, checkResultFile.xml_description])
table_summary_contents.append([stringName, stringVariableResult, stringToleranceRelativeColored, stringToleranceAbsoluteColored, stringBinary])
elif variable.result_file_type == ReferenceCheckPresenter.TYPE_TABLE_TREE:
assert len(resultData) == 1
resultData = resultData[0]
table_tree_entries.extend([TableTreeEntry(PyTex.escapeChars(variable_name),
PyTex.escapeChars(comp) if isinstance(comp, str) else self.string_format_exponential_verbatim.format(value=comp),
PyTex.escapeChars(ref) if isinstance(ref, str) else self.string_format_exponential_verbatim.format(value=ref),
stringToleranceRelativeColored,
stringToleranceAbsoluteColored,
stringBinary)
for (comp, ref) in zip(resultData.result_computation, resultData.result_reference)])
elif variable.result_file_type == ReferenceCheckPresenter.TYPE_TABLE_0D_FIELD:
table_0D_entries.extend([Table0DEntry(PyTex.escapeChars(singleResult.zone_name),
PyTex.escapeChars(singleResult.solution_name),
PyTex.escapeChars(variable_name),
self.string_format_exponential_verbatim.format(value=singleResult.result_computation),
self.string_format_exponential_verbatim.format(value=singleResult.result_reference),
self.string_format_color.format(color=self._toleranceColor(singleResult.result.within_tolerance), text=stringToleranceRelative),
self.string_format_color.format(color=self._toleranceColor(singleResult.result.within_tolerance), text=stringToleranceAbsolute),
self.string_success if singleResult.result.identical else self.string_fail_orange)
for singleResult in resultData if forcePlot or not singleResult.result.identical])
elif variable.result_file_type == ReferenceCheckPresenter.TYPE_TABLE_3D_FIELD:
table_3D_entries.extend([Table3DEntry(PyTex.escapeChars(singleResult.zone_name),
PyTex.escapeChars(singleResult.solution_name),
PyTex.escapeChars(variable_name),
self.string_format_color.format(color=self._toleranceColor(singleResult.result.within_tolerance), text=stringToleranceRelative),
self.string_format_color.format(color=self._toleranceColor(singleResult.result.within_tolerance), text=stringToleranceAbsolute),
self.string_success if singleResult.result.identical else self.string_fail_orange)
for singleResult in resultData if forcePlot or not singleResult.result.identical])
elif variable.result_file_type == ReferenceCheckPresenter.TYPE_GENERIC_FILE_DIFF:
assert len(resultData) == 1
resultData = resultData[0]
for (_, _) in zip(resultData.result_computation, resultData.result_reference):
if variable_name == "file_size":
table_file_diffs.append(["File Size", stringBinary])
elif variable_name == "file_sha256":
table_file_diffs.append(["SHA256 Hash", stringBinary])
else:
raise RuntimeError(f"Unknown variable '{variable_name}' for TYPE_GENERIC_FILE_DIFF")
elif variable.result_file_type == ReferenceCheckPresenter.TYPE_IMAGE_DIFF:
if variable_name == "image":
figures.append([filepath_relative, variable.result_file_name, label, variable.result_description, checkResultFile.xml_description])
table_entry_name = PyTex.escapeChars(checkResultFile.file_name)
table_summary_contents.append([table_entry_name, stringVariableResult, stringToleranceRelativeColored, stringToleranceAbsoluteColored, stringBinary])
else:
raise RuntimeError(f"Unknown variable '{variable_name}' for TYPE_IMAGE_DIFF")
else:
raise RuntimeError(f"Invalid type of reference check result: {variable.result_file_type}. Unable to add to report.")
if any((table_tree_entries, table_0D_entries, table_3D_entries, table_file_diffs)):
label = f"{self.name}_{filepath_relative.replace('/', '_')}_{file_id}"
# summary table
stringName = f"\\hyperref[{label}]{{{PyTex.escapeChars(filename)}}}"
stringTolerance = self.string_success if checkResultFile.result.within_tolerance else self.string_fail
stringBinary = self.string_success if checkResultFile.result.identical else self.string_fail_orange
table_summary_contents.append([stringName, "Table", stringTolerance, stringTolerance, stringBinary])
# tree table
if table_tree_entries:
table_content = [[field.name for field in dataclasses.fields(table_tree_entries[0])]]
table_tree_entries = self._shortenTable(table_tree_entries)
table_content.extend(dataclasses.astuple(entry) for entry in table_tree_entries)
tables_tree.append([filepath_relative, table_content, label, checkResultFile.xml_description])
# 0D table
if table_0D_entries:
table_content = [[field.name for field in dataclasses.fields(table_0D_entries[0])]]
table_0D_entries = self._shortenTable(table_0D_entries)
table_content.extend(dataclasses.astuple(entry) for entry in table_0D_entries)
tables_0D.append([filepath_relative, table_content, label, checkResultFile.xml_description])
# 3D table
if table_3D_entries:
table_content = [[field.name for field in dataclasses.fields(table_3D_entries[0])]]
table_3D_entries = self._shortenTable(table_3D_entries)
table_content.extend(dataclasses.astuple(entry) for entry in table_3D_entries)
tables_3D.append([filepath_relative, table_content, label, checkResultFile.xml_description])
# File Table
if table_file_diffs:
table_content = [["Variable", "Bin"]]
table_content.extend(table_file_diffs)
tables_file_diffs.append([filepath_relative, table_content, label, checkResultFile.xml_description])
latexDocumentInstance.addLongtable(table_summary_contents, nCols=5, setCols="l|l|rr|l", headerSeparator="\\hline &&&&\\\\")
for filepath_relative, figurePath, label, resultDescription, xmlDescription in figures:
caption = f"\\protect\\path{{{filepath_relative}}}"
if resultDescription is not None:
caption += f" ({PyTex.escapeChars(resultDescription)})"
if xmlDescription is not None:
caption += f": {PyTex.escapeChars(xmlDescription)}"
latexDocumentInstance.addFigure(figurePath, caption=caption, label=label)
for filepath_relative, fileTableContent, label, xmlDescription in tables_tree:
latexDocumentInstance.addSectionTitle(f"\\texttt{{tree comparison: {PyTex.escapeChars(filepath_relative)}}}", chapterType=chapterTypeBase + 1, label=label)
self._addDescription(latexDocumentInstance, xmlDescription)
latexDocumentInstance.addLongtable(fileTableContent, nCols=6, setCols=">{\\hspace{0pt}}p{0.3\\linewidth}|>{\\centering\\arraybackslash\\hspace{0pt}}p{0.17\\linewidth}>{\\centering\\arraybackslash\\hspace{0pt}}p{0.17\\linewidth}|rr|l", headerSeparator="\\hline", rowSeparator="\\hline")
for filepath_relative, fileTableContent, label, xmlDescription in tables_0D:
latexDocumentInstance.addSectionTitle("\\texttt{0D field comparison: %s}" % PyTex.escapeChars(filepath_relative), chapterType=chapterTypeBase + 1, label=label)
self._addDescription(latexDocumentInstance, xmlDescription)
latexDocumentInstance.addLongtable(fileTableContent, nCols=8, setCols="l|l|l|l|l|rr|l", headerSeparator="\\hline &&&&\\\\")
for filepath_relative, fileTableContent, label, xmlDescription in tables_3D:
latexDocumentInstance.addSectionTitle("\\texttt{3D field comparison: %s}" % PyTex.escapeChars(filepath_relative), chapterType=chapterTypeBase + 1, label=label)
self._addDescription(latexDocumentInstance, xmlDescription)
latexDocumentInstance.addLongtable(fileTableContent, nCols=6, setCols="l|l|l|rr|l", headerSeparator="\\hline &&&&\\\\")
for filepath_relative, fileTableContent, label, xmlDescription in tables_file_diffs:
latexDocumentInstance.addSectionTitle("\\texttt{File comparsion: %s}" % PyTex.escapeChars(filepath_relative), chapterType=chapterTypeBase + 1, label=label)
self._addDescription(latexDocumentInstance, xmlDescription)
latexDocumentInstance.addLongtable(fileTableContent, nCols=2, setCols="l|l", headerSeparator="\\hline &\\\\")
def _getComputationWorkingDirForJob(self, jobListGroupName: str = "", jobListName: str = "") -> str:
"""Uses the given settings to determine the working directory for the current computation job.
:param jobListGroupName: simulation group name
:param jobListName: simulation name
:return: working directory for current job
"""
return os.path.join(self.path, constants.COMPUTATION_DIR, jobListGroupName, jobListName, TestSuiteTestCaseSub.INPUT_DIR)
[docs] def createJobListDictionary(self, options: argparse.Namespace, executableDict: ExecutableResources, jobListGroupTag: typing.Optional[str] = None,
jobListTag: typing.Optional[str] = None, jobTag: str = "job", workingDirFct: typing.Optional[typing.Callable] = None,
step: str = constants.COMPUTATION, selectDict: typing.Optional[AbstractTestCase.Subset] = None,
resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> AbstractTestCase.JobListDictionary:
"""Factory method to call the method in AbstractTestcase with the correct arguments.
:param options: options instance to extract the executables and the queue to use
:param executableDict: dictionary of executables
:param jobListGroupTag: XML tag for the job list group / level
:param jobListTag: XML tag for the job list / sub-level
:param jobTag: XML tag for the job / job
:param workingDirFct: method to determine the working directory for each job
:param step: permitted steps (computation, restart and postprocessing)
:param selectDict: dictionary of all jobs
:param resourcesDict: resources required by the job
:return: job dictionary representing the structure of the selected step
"""
assert step in (constants.COMPUTATION, constants.MESHING, constants.RESTART, constants.POSTPROCESSING), "Unknown step!"
if jobListGroupTag is None:
jobListGroupTag = {constants.COMPUTATION: "simulationGroup", constants.MESHING: "meshingGroup",
constants.RESTART: "restartGroup", constants.POSTPROCESSING: "analysisGroup"}[step]
if jobListTag is None:
jobListTag = "analysis" if step == constants.POSTPROCESSING else "simulation"
if workingDirFct is None:
workingDirFct = self._getAnalysisWorkingDirForJob if step == constants.POSTPROCESSING else self._getComputationWorkingDirForJob
return super().createJobListDictionary(options, executableDict, jobListGroupTag, jobListTag, jobTag, workingDirFct,
selectDict=selectDict, resourcesDict=resourcesDict)
[docs] def updateReferences(self) -> typing.Tuple[bool, str]:
"""Updates reference data for the files stored
:return: flag whether updating the reference data has been successful, list of failed files
"""
Printer.verbosePrint(f"Test case {self.name} - remove references folder", Printer.Verbosity.DEBUG)
ref_dir = os.path.join(self.path, TestSuiteTestCaseSub.REFERENCE_DIR)
removeFile(ref_dir)
errors = list()
if self.references:
for reference in self.references.referenceAbstract:
ref_path = reference.path.valueOf_
comp_dir = os.path.join(self.path, TestSuiteTestCaseSub.COMPUTATION_DIR)
comp_files = os.path.join(comp_dir, ref_path)
try:
globCopy2(comp_files, comp_dir, ref_dir)
Printer.verbosePrint(f"Copying data for '{comp_files}'", Printer.Verbosity.DEBUG)
except IOError:
errors.append(ref_path)
Printer.warning(f"contents of reference file '{ref_path}' in the test case '{self.name}' was not copied to the reference folder!")
else:
md(ref_dir)
return not errors, ",".join(errors)
[docs] def addDescriptionAndCommandline(self, options: argparse.Namespace, latexDocumentInstance: PyTex):
"""Returns the TEX output for the test case. The output contains the description and the commandline.
The top level of the output is the section level.
:param options: options of the current PAVAYO run
:param latexDocumentInstance: PyTex instance of the current report to build
"""
latexDocumentInstance.addSectionTitle("Description", chapterType=2)
latexDocumentInstance.addText(self.descriptionText)
self.addCommandLineForTex(latexDocumentInstance, updateRestart=options.updateRestartData, chapterType=2)
[docs] def addValgrindForTex(self, latexDocumentInstance: PyTex):
"""Adds the valgrind output of a test suite test case to the report.
:param latexDocumentInstance: PyTex instance of the current report to build
:type latexDocumentInstance: PyTex
"""
def parseStack(stack_elem: etree.ElementBase, text: typing.List[str]):
"""Parse a stack XML node and store the text in a list.
:param stack_elem: XML node with stack information
:param text: List to store the text
"""
frames = stack_elem.findall("frame")
for frame in frames:
obj_elem = frame.find("obj")
fn_elem = frame.find("fn")
file_elem = frame.find("file")
line_elem = frame.find("line")
if fn_elem is not None and file_elem is not None and line_elem is not None:
text.append(f"\t{fn_elem.text} ({file_elem.text}:{line_elem.text})\n")
elif obj_elem is not None and file_elem is not None and line_elem is not None:
text.append(f"\t{obj_elem.text} ({file_elem.text}:{line_elem.text})\n")
elif obj_elem is not None and fn_elem is not None:
text.append(f"\t{obj_elem.text} ({fn_elem.text})\n")
elif obj_elem is not None:
text.append(f"\t{obj_elem.text}\n")
else:
Printer.verbosePrint("Unable to parse the current Frame. Following children found:", printLevel=Printer.Verbosity.ALWAYS)
Printer.verbosePrint(frame.getchildren(), printLevel=Printer.Verbosity.ALWAYS)
def parseError(error_elem: etree.ElementBase, what_elem: etree.ElementBase, text: typing.List[str]):
"""Parse an error XML node and store the text in a list.
:param error_elem: XML node with error information
:param what_elem: XML node with what or xwhat information
:param text: List to store the text
"""
stack_elem = error_elem.find("stack")
if what_elem is not None and stack_elem is not None:
text.append(what_elem.text + "\n")
parseStack(stack_elem, text)
next_elem = stack_elem.getnext()
while next_elem is not None:
if next_elem.tag == "stack":
parseStack(next_elem, text)
elif next_elem.tag == "auxwhat":
text.append(next_elem.text + "\n")
next_elem = next_elem.getnext()
text.append("\n")
valgrind_logs = glob.glob(os.path.join(self.path, TestSuiteTestCaseSub.COMPUTATION_DIR, "*/*",
constants.VALGRIND_LOG_PREFIX + "*." + constants.XML_FILE_ENDING))
valgrind_logs.sort(key=str.lower) # Sort output files alphabetically for repeatable order.
for valgrindSummary, filename in enumerate(valgrind_logs):
(_, tail) = os.path.split(filename)
parser = etree.XMLParser(remove_blank_text=True)
try:
with open(filename, "r") as xmlFile:
valgrind_tree = etree.parse(xmlFile, parser)
except etree.XMLSyntaxError:
latexDocumentInstance.addSectionTitle(f"Valgrind Error {valgrindSummary + 1} ({PyTex.escapeChars(tail)})", chapterType=2)
latexDocumentInstance.addText("Unable to parse valgrind output.")
else:
latexDocumentInstance.addSectionTitle(f"Valgrind Error Summary {valgrindSummary + 1} ({PyTex.escapeChars(tail)})", chapterType=2)
latexDocumentInstance.addText("\\fontsize{%f}{%f}\\selectfont" % (9, 9 * 0.8))
error_text = []
leak_text = []
for error in valgrind_tree.findall("error"):
kind_elem = error.find("kind")
if kind_elem is not None:
if fnmatch.fnmatch(kind_elem.text, "Leak_*"):
what_elem = error.find("xwhat").find("text")
parseError(error, what_elem, leak_text)
else:
what_elem = error.find("what")
parseError(error, what_elem, error_text)
if error_text:
latexDocumentInstance.addVerbatim("".join(error_text))
else:
latexDocumentInstance.addText("None")
latexDocumentInstance.addText("\\normalsize")
# write memory leak summary
latexDocumentInstance.addSectionTitle("Valgrind Leak Summary {sIndex} ({name}".format(sIndex=valgrindSummary + 1, name=PyTex.escapeChars(tail)), chapterType=2)
latexDocumentInstance.addText("\\fontsize{%f}{%f}\\selectfont" % (9, 9 * 0.8))
if leak_text:
latexDocumentInstance.addVerbatim("".join(leak_text))
else:
latexDocumentInstance.addText("None")
latexDocumentInstance.addText("\\normalsize")
[docs] def addSanitizersForTex(self, latexDocumentInstance: PyTex):
"""Adds the output of a compiler sanitizer to the test suite report.
:param latexDocumentInstance: PyTex instance of the current report to build
:type latexDocumentInstance: PyTex
"""
log_files = glob.glob(os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR, "*"))
log_files.sort()
latexDocumentInstance.addSectionTitle("Sanitizer Reports", chapterType=2)
for outputFile in log_files:
with open(outputFile, "r") as inp:
lines = inp.readlines()
runtime_errors = set()
# Parse for Runtime Errors
for line in lines:
if "runtime error" in line.lower():
if len(line.split(":")) != 5:
continue # it is not a well-formatted runtime error
runtime_errors.add(line.split(":"))
sorted_runtime_errors = sorted(runtime_errors, key=lambda x: (x[0], x[1], x[2]))
# Check for additional memory errors not reported as Runtime Errors
parsing = False
current = []
additional_errors = []
for line in lines:
if line.startswith("==") and "==ERROR:" in line:
parsing = True
if current:
additional_errors.append(current)
current = []
elif line == "=================================================================\n":
if current:
additional_errors.append(current)
current = []
parsing = False
if parsing:
current.append(line)
# Last error might not be delimited by a '===' line
if current:
additional_errors.append(current)
escape = PyTex.escapeChars
if sorted_runtime_errors or additional_errors:
latexDocumentInstance.addSectionTitle("Reports for Job: {}".format(escape(os.path.basename(outputFile))), chapterType=3)
if sorted_runtime_errors:
table = ['\\begin{longtable}{lccp{.45\\textwidth}}', '\\textbf{File} & \\textbf{L} & \\textbf{C} & \\textbf{Error Message} \\\\\\hline', '\\endhead', '\\centering']
for error in sorted_runtime_errors:
table.append("{} & {} & {} & {} \\\\".format(escape(error[0]), error[1], error[2], escape(error[4])))
table.append("\\caption{ Runtime Errors }")
table.append("\\end{longtable}")
latexDocumentInstance.content += table
if additional_errors:
latexDocumentInstance.addSectionTitle("Additional Errors found:", chapterType=4)
for error in additional_errors:
latexDocumentInstance.content.append("{\\tiny")
latexDocumentInstance.addVerbatim("\n".join(error), options="baselinestretch=0.75")
latexDocumentInstance.content.append("}")
[docs] def addCommandLineForTex(self, latexDocumentInstance: PyTex, updateRestart: bool = False, chapterType: int = 2):
"""Formats the commands to a readable TEX Format.
:param latexDocumentInstance: PyTex instance of the current report to build
:param updateRestart: add job list of restart groups if True
:type latexDocumentInstance: PyTex
:type updateRestart: bool
"""
def _addGroupForTex(latexDocumentInstance: PyTex,
jobListGroup: typing.Union[typing.List[supermod.SimulationGroup], typing.List[supermod.RestartSimulationGroup]]):
"""Add job list group to Latex report
:param latexDocumentInstance: PyTex instance of the current report to build
:type latexDocumentInstance: PyTex
"""
latexDocumentInstance.addText("\\small")
lines = list()
for simGroup in jobListGroup:
for simulation in simGroup.simulation:
for job in simulation.job:
n_procs = self._adaptNProcs(job.nProcs)
arguments = " ".join(" ".join(arg.split()) for arg in job.args)
line = f"[{job.name}] "
if n_procs > 1 and job.executable not in constants.SINGLE_PROCS_EXECUTABLES:
line += f"mpirun -np {n_procs} {job.executable} {arguments}"
else:
line += f"{job.executable} {arguments}"
dependencies = []
if job.dependency:
for dependency in job.dependency:
dependency_repr = dependency.level + "/" if dependency.level else ""
dependency_repr += dependency.subLevel + "/" if dependency.subLevel else ""
dependency_repr += dependency.job
dependencies.append(dependency_repr)
else:
dependencies.append("-")
line += " ({})".format(", ".join(dependencies))
lines.append("\n ".join(textwrap.wrap(line, 90)))
if lines:
latexDocumentInstance.addVerbatim("\n".join(lines))
else:
latexDocumentInstance.addVerbatim("-")
latexDocumentInstance.addText("\\normalsize")
latexDocumentInstance.addSectionTitle("Command Line", chapterType=chapterType)
if updateRestart:
latexDocumentInstance.addSectionTitle("Restart Jobs", chapterType=chapterType + 1)
_addGroupForTex(latexDocumentInstance, self.restartGroup)
latexDocumentInstance.addSectionTitle("Simulation Jobs", chapterType=chapterType + 1)
_addGroupForTex(latexDocumentInstance, self.simulationGroup)
[docs] def getUsedExecutables(self, compute: bool = False, restart: bool = False, meshing: bool = False, postprocessing: bool = False)\
-> typing.Set[supermod.Executable]:
"""Returns the version of a software the test case was computed with.
:param compute: add executables of compute step to set?
:param restart: add executables of restart step to set?
:param meshing: add executables of meshing step to set?
:param postprocessing: add executables of postprocessing step to set?
:return: the used executables
"""
executables = set()
if meshing:
for group in self.meshingGroup:
for simulation in group.simulation:
for job in simulation.job:
executables.add(job.executable)
if restart:
for group in self.restartGroup:
for simulation in group.simulation:
for job in simulation.job:
executables.add(job.executable)
if compute:
for group in self.simulationGroup:
for simulation in group.simulation:
for job in simulation.job:
executables.add(job.executable)
return executables
[docs] def append(self, simGroup: supermod.SimulationGroup):
"""Appends a simulation group to the test case
"""
self.simulationGroup.append(simGroup)
[docs] def insert(self, index: int, value: supermod.SimulationGroup):
"""Inserts a simulation group to this test case
:param index: place to insert in
:param value: a test case job list to append
"""
self.simulationGroup.insert(index, value)
def __len__(self):
"""Returns the number of simulation groups in the test case.
:return: number of simulation groups
:rtype: int
"""
return len(self.simulationGroup)
def __getitem__(self, index):
"""Returns a simulation group at a given index.
:return: simulation group at given index
:rtype: SimulationGroup instance
"""
return self.simulationGroup[index]