Source code for mojo.pavayo.testcase.testSuiteTestCase

"""
testSuiteTestCase.py
====================

testSuiteTestCase offers the TestSuiteTestCaseSub class. The class describes a test case in the test suite.
A TestSuiteTestCase consists of simulation groups, which consist of simulations.
Simulations can consist of several jobs.
Every simulation is executed in a unique folder.

The class implements the interface required by AbstractTestCase.
The job lists are generated by delegating to the respective methods of the aggregated classes.

`Tutorial 1 <../../../../tutorials/notebooks/newTestCaseLevel3.html>`_
"""
import argparse
import collections.abc
import dataclasses
import fnmatch
import glob
import itertools
import json
import os
import textwrap
import typing

from lxml import etree

import pythoncgio

from datfile import TecplotDataset

from ...bricabrac import dataclass_utils
from ..executables.executableResources import ExecutableResources

from . import testcase_generated as supermod
from .abstractTestCase import AbstractTestCase, TestCaseState
from ...latex import pyTex
from ...referenceCheck import ReferenceCheck
from ...referenceCheck.referenceCheckContainer import (CheckResult, CheckTolerances, CheckResultVariable,
                                                       CheckResultCase, CheckResultFile, CheckResultData)
from ...referenceCheck.referenceCheckImage import ReferenceCheckImage
from ...referenceCheck.referenceCheckGeneric import ReferenceCheckGeneric
from ...referenceCheck.referenceCheckMain import ReferenceCheckMain
from ...referenceCheck.referenceCheckTree import ReferenceCheckTree
from ...referenceCheck.presenter import ReferenceCheckPresenter

from ...jobManagement.jobs.jobList import JobList
from ...jobManagement.jobs.job import Job
from ...jobManagement.management.clusterInfo import ClusterInfo
from ...jobManagement import jobManagementData as jobConsts
from ...bricabrac.fileIO import Printer
from ...bricabrac.fileIO import globCopy2, md
from ...bricabrac.fileIO import removeFile
from ...bricabrac.stringFormatting import sci_notation
from ...pavayo import computeData as constants
from ...plotting.testSuitePlot import TestSuiteXYPlot
from ...latex.pyTex import PyTex

from ...cgns import cgnsFile


class TestSuiteTestCaseSub(supermod.TestSuiteTestCase, collections.abc.Sequence, AbstractTestCase):
    """Class to represent a test suite test case. You can iterate over the job lists of an instance.

    All possible parameters to the constructor of this class can be found in the XSD file
    used to generate testcase_generated.py.
    """

    COMPUTATION_DIR = constants.COMPUTATION_DIR
    SOURCE_DIR = constants.SOURCE_DIR
    FIGURES_DIR = constants.FIGURES_DIR
    LOG_DIR = constants.LOG_DIR
    ANALYSIS_DIR = constants.ANALYSIS_DIR
    REFERENCE_DIR = constants.REFERENCE_DIR
    REFERENCE_CHECK_DIR = constants.REFERENCE_CHECK_DIR
    REFERENCE_CHECK_SUMMARY_FILE = "checkSummary.json"
    INPUT_DIR = constants.INPUT_DIR
    OUTPUT_DIR = constants.OUTPUT_DIR
    DEFAULT_FLOW_SOLUTION = "FlowSolution"

    @staticmethod
    def prepareSourceFiles(testcase: 'TestSuiteTestCaseSub', simulation: supermod.Simulation, simGroupName: str,
                           computationDir: str):
        """Creates the input and output folders for a simulation and copies or links the files
        needed by the test case.

        :param testcase: TestSuiteTestCaseSub instance
        :param simulation: simulation
        :param simGroupName: name of the simulation group
        :param computationDir: name of the computation directory
        """
        md(os.path.join(testcase.path, computationDir, simGroupName, simulation.name, testcase.INPUT_DIR))
        md(os.path.join(testcase.path, computationDir, simGroupName, simulation.name, testcase.OUTPUT_DIR))
        if simulation.sourceFiles:
            for copyFile in simulation.sourceFiles.copyFile:
                TestSuiteTestCaseSub._copyFromSource(testcase, copyFile, simGroupName, simulation.name)
            for linkFile in simulation.sourceFiles.linkFile:
                TestSuiteTestCaseSub._linkFromSource(testcase, linkFile, simGroupName, simulation.name)

    @classmethod
    def _setupTestCase(cls, testcase: 'TestSuiteTestCaseSub', simGroupList: typing.Iterable[supermod.SimulationGroup],
                       keepLog: bool = False):
        """Performs a clean for the test case. Purges the computation, log and figure folders
        and copies the files needed by the test case.

        :param testcase: TestSuiteTestCaseSub instance
        :param simGroupList: the simulation groups to be used
        :param keepLog: if True, the log folder is not removed
        """
        removeFile(os.path.join(testcase.path, cls.COMPUTATION_DIR))
        if not keepLog:
            removeFile(os.path.join(testcase.path, cls.LOG_DIR))
        removeFile(os.path.join(testcase.path, cls.FIGURES_DIR))
        removeFile(os.path.join(testcase.path, cls.REFERENCE_CHECK_DIR))
        md(os.path.join(testcase.path, cls.LOG_DIR))
        md(os.path.join(testcase.path, cls.FIGURES_DIR))
        md(os.path.join(testcase.path, cls.REFERENCE_CHECK_DIR))
        for simGroup in simGroupList:
            for simulation in simGroup.simulation:
                TestSuiteTestCaseSub.prepareSourceFiles(testcase, simulation, simGroup.name, cls.COMPUTATION_DIR)

    @classmethod
    def _setupTestCasePartial(cls, testcase: 'TestSuiteTestCaseSub',
                              simGroupList: typing.Iterable[supermod.SimulationGroup],
                              selectiveCompute: AbstractTestCase.Subset):
        """Performs a partial clean for the test case. Purges only the computation folders of the
        selected simulations and copies the files they need.

        :param testcase: TestSuiteTestCaseSub instance
        :param simGroupList: the simulation groups to be used
        :param selectiveCompute: subset of selected computations
        """
        for simGroup in simGroupList:
            if simGroup.name in selectiveCompute:
                if selectiveCompute[simGroup.name] == "all":
                    removeFile(os.path.join(testcase.path, cls.COMPUTATION_DIR, simGroup.name))
                    for simulation in simGroup.simulation:
                        TestSuiteTestCaseSub.prepareSourceFiles(testcase, simulation, simGroup.name,
                                                                cls.COMPUTATION_DIR)
                else:
                    for simulation in simGroup.simulation:
                        if simulation.name in selectiveCompute[simGroup.name] \
                                and selectiveCompute[simGroup.name][simulation.name] == "all":
                            removeFile(os.path.join(testcase.path, cls.COMPUTATION_DIR, simGroup.name,
                                                    simulation.name))
                            TestSuiteTestCaseSub.prepareSourceFiles(testcase, simulation, simGroup.name,
                                                                    cls.COMPUTATION_DIR)

    @classmethod
    def _setupPostProcessing(cls, testcase: 'TestSuiteTestCaseSub'):
        """Deletes tecplot.phy files to prevent messing up the images in the test report.

        :param testcase: TestSuiteTestCaseSub instance
        """
        for dirPath, _, fileNames in os.walk(os.path.join(testcase.path, cls.ANALYSIS_DIR)):
            for fileName in fileNames:
                if fileName == cls.TECPLOT_PHY_FILE_NAME:
                    removeFile(os.path.join(dirPath, fileName))

    @staticmethod
    def _setupCheckReferences(testcase: 'TestSuiteTestCaseSub', directory: typing.Optional[str] = None):
        """Performs a clean for the checks directory.

        :param testcase: TestSuiteTestCaseSub instance
        :param directory: directory suffix that should be prepared for reference checks
        """
        directory = directory or constants.REFERENCE_CHECK_DIR
        removeFile(os.path.join(testcase.path, directory))
        md(os.path.join(testcase.path, directory))

    @staticmethod
    def _collectData(testcase: 'TestSuiteTestCaseSub', simGroupList: typing.Iterable[supermod.SimulationGroup]):
        """Collects data from the computation folder in update-restart mode and copies it to the
        appropriate simulation folder in the source folder.

        :param testcase: TestSuiteTestCaseSub instance
        :param simGroupList: the simulation groups to be used
        """
        for simGroup in simGroupList:
            for simulation in simGroup.simulation:
                TestSuiteTestCaseSub._collectDataForSingleSaveFiles(testcase, simulation.saveFiles, simGroup.name,
                                                                    simulation.name)

    def _adaptNProcs(self, nProcs: int) -> int:
        """Returns the number of processes to use for a job of this test case if no number of procs
        is specified explicitly. The method tests whether it is executed on a cluster and returns
        the number of procs to use on a cluster or on a workstation accordingly.

        :param nProcs: number of processes explicitly specified for the job; '-1' means 'not specified'
        :return: number of procs to use for a job
        """
        nProcsToUse = nProcs
        if nProcs == -1:
            if ClusterInfo.onValidCluster():
                nProcsToUse = self.numProcsCluster
            else:
                nProcsToUse = self.numProcsLocal
        return nProcsToUse

    def _getSpecificJobList(self, options: argparse.Namespace, executableDict: ExecutableResources,
                            resourcesDict: typing.Dict[str, int], step: str) -> JobList:
        """Collects all job objects from the jobs in the current test case instance and creates
        a job list of them for a given step.

        :param options: options instance to extract the executables and the queue to use
        :param executableDict: dictionary of executables
        :param resourcesDict: resources required by the jobs
        :param step: working step
        :return: job list representing this test case
        """
        assert step in (constants.MESHING, constants.RESTART, constants.COMPUTATION), "Invalid step!"
        job_list_suffix = {constants.MESHING: "_UPDATEMESH",
                           constants.RESTART: "_UPDATERESTART",
                           constants.COMPUTATION: "_COMPUTATION"}[step] + "_JOBLIST"
        job_name = {constants.MESHING: "Mesh-setupJob",
                    constants.RESTART: "Restart-setupJob",
                    constants.COMPUTATION: "setupJob"}[step]
        setup_job_args = {"simGroupList": {constants.MESHING: self.meshingGroup,
                                           constants.RESTART: self.restartGroup,
                                           constants.COMPUTATION: self.simulationGroup}[step]}
        collect_data = step != constants.COMPUTATION
        selected_simulations = None
        if options.selectiveCompute:
            job_list_group_tag = {constants.COMPUTATION: "simulation",
                                  constants.MESHING: "meshing",
                                  constants.RESTART: "restart",
                                  constants.POSTPROCESSING: "analysis"}[step]
            job_list_tag = "analysis" if step == constants.POSTPROCESSING else "simulation"
            selected_simulations = self.findSubsetInJobList(job_list_group_tag + "Group", job_list_tag, "job",
                                                            options.selectiveCompute, step)
            setup_job_args["selectiveCompute"] = selected_simulations
        specific_job_list = JobList(name=self.name + job_list_suffix, verbosity=options.verbose,
                                    retries=0 if step != constants.COMPUTATION else options.retriesComputation,
                                    deactivateJobGrouping=options.deactivateClusterJobGrouping)
        setup_job = Job(TestSuiteTestCaseSub._setupTestCase if selected_simulations is None
                        else TestSuiteTestCaseSub._setupTestCasePartial,
                        args=[self], kwargs=setup_job_args, jobName=job_name, workingDirectory=self.path,
                        executeOnMaster=True, outputDir=self.path, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                        group=f"{self.name}:Init")
        specific_job_list.addJob(setup_job)
        if options.step is None or options.step != [constants.COMPUTATION_STEP_RESET]:
            all_jobs = self.createJobListDictionary(options, executableDict, step=step,
                                                    selectDict=selected_simulations, resourcesDict=resourcesDict)
            if collect_data:
                calculation_job_list = JobList(name=self.name + "_CALC_" + job_list_suffix,
                                               verbosity=options.verbose)
                self.setDependenciesInJobList(calculation_job_list, all_jobs, checkFull=not selected_simulations)
                specific_job_list.addJob(calculation_job_list, parents=[setup_job.id])
                if selected_simulations is None:
                    collect_data_job = Job(TestSuiteTestCaseSub._collectData,
                                           args=[self, setup_job_args["simGroupList"]],
                                           jobName="Restart-CollectJob", workingDirectory=self.path,
                                           executeOnMaster=True,
                                           outputDir=os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR),
                                           weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                                           group=f"{self.name}:RestartCollect")
                    specific_job_list.addJob(collect_data_job, parents=[calculation_job_list.id])
            else:
                self.setDependenciesInJobList(specific_job_list, all_jobs, parents=[setup_job.id],
                                              checkFull=not selected_simulations)
        return specific_job_list

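    # Illustrative comment: for the restart step, _getSpecificJobList assembles roughly the
    # following structure (names follow the code above):
    #
    #   <name>_UPDATERESTART_JOBLIST
    #     "Restart-setupJob"                    cleans folders, copies source files
    #     <name>_CALC__UPDATERESTART_JOBLIST    the simulation jobs, run after setup
    #     "Restart-CollectJob"                  copies results back (full runs only)
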
    def getMeshingJobList(self, options: argparse.Namespace, executableDict: ExecutableResources,
                          resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> typing.Optional[JobList]:
        """Collects all job objects from the jobs in the current test case instance and creates a job list of them.

        :param options: options instance to extract the executables and the queue to use
        :param executableDict: dictionary of executables
        :param resourcesDict: resources required by the jobs
        :return: job list representing this test case or None
        """
        meshing_job_list = None
        if self.restartGroup:
            meshing_job_list = self._getSpecificJobList(options, executableDict, resourcesDict, constants.MESHING)
        return meshing_job_list

    def getRestartJobList(self, options: argparse.Namespace, executableDict: ExecutableResources,
                          resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> typing.Optional[JobList]:
        """Collects all job objects from the jobs in the current test case instance and creates a job list of them.

        :param options: options instance to extract the executables and the queue to use
        :param executableDict: dictionary of executables
        :param resourcesDict: resources required by the jobs
        :return: job list representing this test case or None
        """
        restart_job_list = None
        if self.restartGroup:
            restart_job_list = self._getSpecificJobList(options, executableDict, resourcesDict, constants.RESTART)
        return restart_job_list

    def getComputationJobList(self, options: argparse.Namespace, executableDict: ExecutableResources,
                              resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> typing.Optional[JobList]:
        """Collects all job objects from the jobs in the current test case instance and creates a job list of them.

        :param options: options instance to extract the executables and the queue to use
        :param executableDict: dictionary of executables
        :param resourcesDict: resources required by the jobs
        :return: job list representing this test case or None
        """
        computation_job_list = None
        if self.simulationGroup:
            computation_job_list = self._getSpecificJobList(options, executableDict, resourcesDict,
                                                            constants.COMPUTATION)
        return computation_job_list

    def getPostprocessingJobList(self, options: argparse.Namespace, executableDict: ExecutableResources,
                                 resourcesDict: typing.Optional[typing.Dict[str, int]] = None) -> JobList:
        """Returns a job list representing the post-processing step.

        :param options: options instance to extract the executables and the queue to use
        :param executableDict: dictionary of executables
        :param resourcesDict: resources required by the jobs
        :return: a jobManagement.jobs.jobList instance representing the post-processing jobs
        """
        post_processing_job_list = JobList(name=self.name + "_POSTPROCESSING_JOBLIST", verbosity=options.verbose)
        log_dir = os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR)
        if options.selectiveCompute:
            Printer.warning("Selective compute in post-processing step ignored.")
        setup_job = Job(TestSuiteTestCaseSub._setupPostProcessing, args=[self], jobName="cleanTecplotPhy",
                        workingDirectory=self.path, executeOnMaster=True, outputDir=log_dir,
                        weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group=f"{self.name}:Postprocessing")
        post_processing_job_list.addJob(setup_job)
        analysis_job_list = JobList(name=self.name + "_ANALYSIS_JOBLIST", verbosity=options.verbose)
        all_jobs = self.createJobListDictionary(options, executableDict, step=constants.POSTPROCESSING,
                                                resourcesDict=resourcesDict)
        self.setDependenciesInJobList(analysis_job_list, all_jobs)
        post_processing_job_list.addJob(analysis_job_list, [setup_job.id])
        if self.figures:
            # Add jobs generating figures from Tecplot layouts.
            figure_names = [os.path.splitext(fig.name)[0] for fig in self.figures.figure]
            for dir_path, _, file_names in os.walk(os.path.join(self.path, TestSuiteTestCaseSub.ANALYSIS_DIR)):
                for file_name in file_names:
                    figure_name, figure_ext = os.path.splitext(file_name)
                    if figure_ext[1:] == constants.TECPLOT_LAYOUT_ENDING and figure_name in figure_names:
                        job = self._generateTecplotLayoutJob(dir_path, file_name, log_dir)
                        post_processing_job_list.addJob(job, [analysis_job_list.id])
                        figure_names.remove(figure_name)
                        if not figure_names:
                            break
                if not figure_names:
                    break
            # Add jobs for the list of plots.
            for plot in self.figures.plot:
                create_plot_job = Job(self.createPlot, args=[plot], jobName=f"{self.name}_createPlot",
                                      outputDir=log_dir, weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                                      group=f"{self.name}:Postprocessing")
                post_processing_job_list.addJob(create_plot_job, [analysis_job_list.id])
        return post_processing_job_list

    def createPlot(self, plot: supermod.MatplotlibFigure, verbose: bool = True):
        """Creates a plot using TestSuiteXYPlot.

        :param plot: plot object
        :param verbose: show output
        """
        if plot.testSuitePlot is not None:
            if plot.testSuitePlot.variables.y is None:
                currentPlot = TestSuiteXYPlot.createFromPlot(plot, basePath=self.path, verbose=verbose)
                currentPlot.readData()
                currentPlot.plotData(paperSize=plot.paperSize, paperOrientation=plot.paperOrientation)
            else:
                raise NotImplementedError("Only X-Y plots are allowed (x : x, y : value).")
        elif plot.generalPlot or plot.xyPlot:
            self._createGeneralPlot(plot, verbose=verbose)
        else:
            raise NotImplementedError("Only testSuitePlot allowed.")

    @classmethod
    def updateAccuracyDict(cls, accuracy_dict_base: CheckResult, accuracy_dict_update: CheckResult):
        """Merges the accuracy flags of 'accuracy_dict_update' into 'accuracy_dict_base' by logical AND.

        :param accuracy_dict_base: check result with accuracies, updated in place
        :param accuracy_dict_update: check result with accuracies to merge in
        """
        accuracy_dict_base.identical = accuracy_dict_base.identical and accuracy_dict_update.identical
        accuracy_dict_base.within_tolerance = accuracy_dict_base.within_tolerance \
            and accuracy_dict_update.within_tolerance

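    # Illustrative comment: the merge is a logical AND, so one failing update is enough to
    # mark the merged result as failing.
    #
    #   base = CheckResult(True, True)
    #   TestSuiteTestCaseSub.updateAccuracyDict(base, CheckResult(False, True))
    #   # base.identical is now False, base.within_tolerance stays True
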
    @classmethod
    def _performReferenceChecksAndAppendResults(cls, presenter: ReferenceCheckPresenter,
                                                refChecks: typing.List[ReferenceCheckMain],
                                                checkResultsFile: CheckResultFile, forcePlot: bool,
                                                comp_label: str = "current", ref_label: str = "reference") -> None:
        """Performs the checks and appends their results.

        :param presenter: instance of the reference check presenter class
        :param refChecks: list of instances of ReferenceCheck classes
        :param checkResultsFile: container for the check results
        :param forcePlot: flag whether to always show the plots in the report
        :param comp_label: label for the comparison data
        :param ref_label: label for the reference data
        """
        for refCheck in sorted(refChecks, key=lambda check: check.identifier):
            refCheck.performCheck()
            result_filename = None
            result_filetype = None
            if not refCheck.withinTolerance() or forcePlot:
                result_filename, result_filetype = presenter.plotData(refCheck, comp_label, ref_label)
            check_result = CheckResultVariable(tolerances=refCheck.tolerances,
                                               result=CheckResult(refCheck.identical(), refCheck.withinTolerance()),
                                               result_file_name=result_filename,
                                               result_file_type=result_filetype,
                                               result_description=presenter.getDescription(refCheck))
            checkResultsFile.variables[refCheck.identifier] = check_result
            cls.updateAccuracyDict(checkResultsFile.result, check_result.result)

    @staticmethod
    def parsePlotProperties(plot: supermod.plotType) \
            -> typing.Tuple[typing.Optional[str], typing.Optional[str], bool, bool, bool, bool, bool]:
        variableX = None
        variableY = None
        absX = False
        absResult = False
        logX = False
        logY = False
        logResult = False
        if plot:
            variableX = plot.x.valueOf_ if plot.x else None
            variableY = plot.y.valueOf_ if plot.y else None
            absX = plot.x.abs if plot.x else False
            absResult = plot.result.abs if plot.result else False
            logX = plot.x.log if plot.x else False
            logY = plot.y.log if plot.y else False
            logResult = plot.result.log if plot.result else False
        return variableX, variableY, absX, absResult, logX, logY, logResult

    @classmethod
    def parseDataSelectionProperties(cls, dataSelection: supermod.dataSelectionType) \
            -> typing.Tuple[typing.List[str], slice, slice, slice, typing.List[str], bool]:
        # dataSelection defaults
        slice_i = slice_j = slice_k = slice(None)
        zone_name_pattern = None
        flow_solution_pattern = [cls.DEFAULT_FLOW_SOLUTION]
        includeRind = False
        if dataSelection:
            indexSlice = dataSelection.indexSlice
            includeRind = dataSelection.includeRind
            if indexSlice:
                if indexSlice.i:
                    slice_i = indexSlice.i.getSlice()
                if indexSlice.j:
                    slice_j = indexSlice.j.getSlice()
                if indexSlice.k:
                    slice_k = indexSlice.k.getSlice()
            zone_name_pattern = dataSelection.zone
            flow_solution_pattern = dataSelection.solution or flow_solution_pattern
        return zone_name_pattern, slice_i, slice_j, slice_k, flow_solution_pattern, includeRind

    @classmethod
    def _checkReferencesFile(cls, useReferenceCheckTree: bool, filename: str, compDir: str, refDir: str,
                             prefixResult: typing.Union[str, int],
                             reference: typing.Union[supermod.Reference, supermod.ReferenceTree],
                             working_dir: str, forcePlot: bool, step: str,
                             comp_label: str = "current", ref_label: str = "reference") -> CheckResultFile:
        """Checks the references for an input file.

        :param useReferenceCheckTree: flag whether the ReferenceCheckTree should be used
        :param filename: name of the input file
        :param compDir: folder containing the data to compare
        :param refDir: folder containing the references
        :param prefixResult: prefix for result filenames generated by reference checks (to disambiguate
                             between results from the same file generated by different checks)
        :param reference: abstract reference object
        :param working_dir: path to the working dir in the test case (result output)
        :param forcePlot: flag whether to always show the plots in the report
        :return: container with the check results
        """
        checkResultsFile = CheckResultFile(filename, xml_description=reference.description)
        compFile = os.path.join(compDir, filename)
        refFile = os.path.join(refDir, filename)
        fileType = reference.path.type_
        fileroot_results = f"{prefixResult}_" + os.path.splitext(filename.replace("/", "_"))[0]
        fileroot_results = fileroot_results.replace(".", "_")  # for latex includegraphics, filename must not contain '.'
        filerootResultWithPath = os.path.join(working_dir, fileroot_results)
        allow_different_keys = (step == constants.STATUS_FILE_EVALUATE_REFERENCES)
        print(f"Checking {compFile} against {refFile}")
        _, tail = os.path.splitext(compFile)
        if useReferenceCheckTree:
            presenter = ReferenceCheckPresenter()
            if fileType.upper() == "JSON" or (fileType.upper() == "AUTO" and tail.upper() == ".JSON"):
                refData = ReferenceCheckTree.readJsonFile(refFile)
                compData = ReferenceCheckTree.readJsonFile(compFile)
                refChecks = []
                for treePath in reference.treePath:
                    treePathEntries = [entry.valueOf_ for entry in treePath.TreePathEntries]
                    localChecks = ReferenceCheckTree.fromDict(treePathEntries, treePath.ignoreKey,
                                                              treePath.toleranceRelative, treePath.toleranceAbsolute,
                                                              compFile, compData, refFile, refData,
                                                              fileroot_results=filerootResultWithPath)
                    refChecks.extend(localChecks)
                cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot,
                                                            comp_label, ref_label)
            elif fileType.upper() == "CGNS" or (fileType.upper() == "AUTO" and tail.upper() == ".CGNS"):
                with pythoncgio.CgioFile(refFile) as refFileRoot, pythoncgio.CgioFile(compFile) as compFileRoot:
                    refChecks = []
                    for treePath in reference.treePath:
                        treePathEntries = [entry.valueOf_ for entry in treePath.TreePathEntries]
                        refChecks.extend(ReferenceCheckTree.fromCGNS(treePathEntries, treePath.ignoreKey,
                                                                     treePath.toleranceRelative,
                                                                     treePath.toleranceAbsolute,
                                                                     compFile, compFileRoot, refFile, refFileRoot,
                                                                     fileroot_results=filerootResultWithPath))
                    cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot,
                                                                comp_label, ref_label)
            else:
                raise NotImplementedError(f"Reference checks do not (yet) support output files of type {fileType}")
        elif fileType.upper() == "GENERICFILE":
            refChecks = ReferenceCheckGeneric.fromFiles(compFile, refFile, filerootResultWithPath)
            presenter = ReferenceCheckPresenter()
            cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot,
                                                        comp_label, ref_label)
        elif fileType.upper() == "IMAGE":
            refChecks = ReferenceCheckImage.fromFiles(compFile, refFile, filerootResultWithPath)
            presenter = ReferenceCheckPresenter()
            cls._performReferenceChecksAndAppendResults(presenter, refChecks, checkResultsFile, forcePlot,
                                                        comp_label, ref_label)
        else:
            # use a specialized parser to parse the file
            assert reference.variable, "No variable specified for reference check"
            variableX, variableY, absX, absResult, logX, logY, logResult = cls.parsePlotProperties(reference.plot)
            zone_name_pattern, slice_i, slice_j, slice_k, flow_solution_pattern, includeRind = \
                cls.parseDataSelectionProperties(reference.dataSelection)
            presenter = ReferenceCheckPresenter(abs_x=absX, abs_result=absResult, log_x=logX, log_y=logY,
                                                log_result=logResult)
            comp_data = ref_data = None
            for variable in reference.variable:
                for name in variable.name:
                    print(f"\tVariable: {name}, tolerance_relative: {variable.toleranceRelative}, "
                          f"tolerance_absolute: {variable.toleranceAbsolute}")
                    reference_checks = list()
                    if fileType.upper() == "DAT" or (fileType.upper() == "AUTO" and tail.upper() == ".DAT"):
                        comp_data = comp_data or TecplotDataset.fromFile(compFile)
                        ref_data = ref_data or TecplotDataset.fromFile(refFile)
                        reference_checks = ReferenceCheck.fromTecplotASCII(
                            comp_data, ref_data, name,
                            CheckTolerances(variable.toleranceAbsolute, variable.toleranceRelative),
                            variable_x=variableX, variable_y=variableY, zone_name_pattern=zone_name_pattern,
                            slice_i=slice_i, slice_j=slice_j, slice_k=slice_k,
                            fileroot_results=filerootResultWithPath, allow_different_keys=allow_different_keys)
                    elif fileType.upper() == "CGNS" or (fileType.upper() == "AUTO" and tail.upper() == ".CGNS"):
                        comp_data = comp_data or cgnsFile.CGNSDataSet(compFile)
                        ref_data = ref_data or cgnsFile.CGNSDataSet(refFile)
                        reference_checks = ReferenceCheck.fromCGNS(
                            comp_data, ref_data, name,
                            CheckTolerances(variable.toleranceAbsolute, variable.toleranceRelative),
                            variable_x=variableX, variable_y=variableY, zone_name_pattern=zone_name_pattern,
                            slice_i=slice_i, slice_j=slice_j, slice_k=slice_k,
                            solution_names=flow_solution_pattern, include_rind=includeRind,
                            fileroot_results=filerootResultWithPath, allow_different_keys=allow_different_keys)
                    else:
                        raise NotImplementedError(f"Reference checks do not (yet) support output files of type "
                                                  f"{fileType}")
                    cls._performReferenceChecksAndAppendResults(presenter, reference_checks, checkResultsFile,
                                                                forcePlot, comp_label, ref_label)
        return checkResultsFile

    @classmethod
    def _checkReferences(cls, testcase: 'TestSuiteTestCaseSub', references: typing.Iterable[supermod.Reference],
                         forcePlot: bool, step: str, reference_dir: str = REFERENCE_DIR,
                         result_dir: str = COMPUTATION_DIR, working_dir: typing.Optional[str] = None,
                         comp_label: str = "current", ref_label: str = "reference"):
        """Checks references as specified in the reference instances and stores the result of the
        checks as a JSON file.

        :param testcase: test case object
        :param references: Reference objects as specified in the XSD file for the test case
        :param forcePlot: force generation of plots even if everything is identical
        """
        compDir = os.path.join(testcase.path, result_dir)
        refDir = os.path.join(testcase.path, reference_dir)
        working_dir = working_dir or TestSuiteTestCaseSub.REFERENCE_CHECK_DIR
        working_dir = os.path.join(testcase.path, working_dir)
        checkResultsCase = CheckResultCase()
        for referenceID, reference in enumerate(references):
            useReferenceCheckTree = reference.original_tagname_ == "referenceTree"
            # Only handle references which either specify variables or use the tree reference check.
            # Additionally, references which specify the type explicitly should also be handled.
            if useReferenceCheckTree or reference.variable or reference.path.type_ != "auto":
                compFilePattern = os.path.join(compDir, reference.path.valueOf_)
                refFilePattern = os.path.join(refDir, reference.path.valueOf_)
                compFiles = glob.glob(compFilePattern)
                refFiles = glob.glob(refFilePattern)
                print("\nResult files:", ", ".join(compFiles))
                print("Reference files:", ", ".join(refFiles))
                # strip comp/ref dir path
                relRefFiles = {os.path.relpath(c, refDir) for c in refFiles}
                relCompFiles = {os.path.relpath(c, compDir) for c in compFiles}

                def error(step, message):
                    if step == constants.STATUS_FILE_EVALUATE_REFERENCES:
                        Printer.warning(message)
                    elif step == constants.STATUS_FILE_CHECK_REFERENCES:
                        raise RuntimeError(message)
                    else:
                        raise NotImplementedError()

                if not relCompFiles:
                    raise RuntimeError(f"No result files found matching the expression '{reference.path.valueOf_}'.")
                if not relRefFiles:
                    error(step, f"No reference files found matching the expression '{reference.path.valueOf_}'.")
                if not relRefFiles.issubset(relCompFiles):
                    error(step, f"Reference file set generated by expression '{reference.path.valueOf_}' "
                                "is not a subset of the result file set.\nReference or result files may be missing.")
                if not relCompFiles.issubset(relRefFiles) and step == constants.STATUS_FILE_EVALUATE_REFERENCES:
                    Printer.warning("There is a reference in the current version of the testcase that has no match in "
                                    "the original reference of the testcase.")
                for filename in sorted(relRefFiles.intersection(relCompFiles)):
                    checkResultsFile = cls._checkReferencesFile(useReferenceCheckTree, filename, compDir, refDir,
                                                                referenceID, reference, working_dir, forcePlot, step,
                                                                comp_label, ref_label)
                    cls.updateAccuracyDict(checkResultsCase.result, checkResultsFile.result)
                    checkResultsCase.checks.append(checkResultsFile)
        check_summary_file_path = os.path.join(working_dir, cls.REFERENCE_CHECK_SUMMARY_FILE)
        with open(check_summary_file_path, "w") as fp:
            json.dump(dataclass_utils.dataclass_as_dict(checkResultsCase), fp, indent=4)

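    # Illustrative comment: the summary written above is the JSON rendering of a CheckResultCase.
    # Assuming dataclass_as_dict maps fields one-to-one to keys (file and variable names below are
    # made up), an abridged checkSummary.json could look like:
    #
    #   {
    #       "result": {"identical": false, "within_tolerance": true},
    #       "checks": [
    #           {"file_name": "<some result file>",
    #            "variables": {"<variable>": {"tolerances": {...}, "result": {...},
    #                                         "result_file_name": null, ...}}}
    #       ]
    #   }
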
    def getCheckReferencesJobList(self, options: argparse.Namespace) -> JobList:
        """Creates a job list to check the references of the test case.

        :param options: options instance to extract the executables and the queue to use
        :return: job list representing this test case or None
        """
        myJobList = JobList(name=self.name + "_CHECKREFERENCES_JOBLIST", verbosity=options.verbose)
        outputDir = os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR)
        jobName = "setupChecks"
        setupJob = Job(TestSuiteTestCaseSub._setupCheckReferences, args=[self], jobName=jobName,
                       workingDirectory=self.path, executeOnMaster=True, outputDir=outputDir,
                       weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD,
                       group="{}:ReferenceChecks".format(self.name))
        myJobList.addJob(setupJob)
        if self.references:
            checkReferencesJob = Job(TestSuiteTestCaseSub._checkReferences,
                                     args=[self, self.references.referenceAbstract, options.forceReferencePlots,
                                           constants.STATUS_FILE_CHECK_REFERENCES],
                                     jobName=self.name + "_CHECK_REFERENCES", workingDirectory=self.path,
                                     executeOnMaster=True, outputDir=outputDir,
                                     weight=jobConsts.DEFAULT_WEIGHT_METHOD,
                                     group="{}:ReferenceChecks".format(self.name))
            myJobList.addJob(checkReferencesJob, [setupJob.id])
        return myJobList

    def getEvaluateReferencesJobList(self, options: argparse.Namespace) -> JobList:
        """Creates a job list to check the current against the original references of the test case.

        :param options: options instance to extract the executables and the queue to use
        :return: job list representing this test case or None
        """
        myJobList = JobList(name=self.name + "_EVALUATE_REFERENCES_JOBLIST", verbosity=options.verbose)
        outputDir = os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR)
        jobName = "setupChecks"
        setupJob = Job(TestSuiteTestCaseSub._setupCheckReferences, args=[self, constants.REFERENCE_EVALUATION_DIR],
                       jobName=jobName, workingDirectory=self.path, executeOnMaster=True, outputDir=outputDir,
                       weight=jobConsts.DEFAULT_WEIGHT_SMALL_METHOD, group=f"{self.name}:EvaluateReferences")
        myJobList.addJob(setupJob)
        if self.references:
            evaluateReferencesJob = Job(TestSuiteTestCaseSub._checkReferences,
                                        args=[self, self.references.referenceAbstract],
                                        kwargs={'forcePlot': True,
                                                'step': constants.STATUS_FILE_EVALUATE_REFERENCES,
                                                'reference_dir': constants.ORIGINAL_REFERENCES_DIR,
                                                'result_dir': constants.REFERENCE_DIR,
                                                'working_dir': constants.REFERENCE_EVALUATION_DIR,
                                                'comp_label': "current reference",
                                                'ref_label': f"original reference (r{self.originalReferenceRevision})"},
                                        jobName=self.name + "_EVALUATE_REFERENCES", workingDirectory=self.path,
                                        executeOnMaster=True, outputDir=outputDir,
                                        weight=jobConsts.DEFAULT_WEIGHT_METHOD,
                                        group=f"{self.name}:EvaluateReferences")
            myJobList.addJob(evaluateReferencesJob, [setupJob.id])
        return myJobList

    def get_state_by_step(self, step: str) -> TestCaseState:
        """Returns the state of the step passed.

        :param step: step of interest
        :type step: string
        :return: state of the test case for this step
        :rtype: TestCaseState
        """
        state = super().get_state_by_step(step)
        # The super call only checks if the reference job list was successful.
        if step == constants.STATUS_FILE_CHECK_REFERENCES or step == constants.STATUS_FILE_EVALUATE_REFERENCES:
            if not self.references:
                assert state == TestCaseState.SUCCESS
                state = TestCaseState.NOT_AVAILABLE
            elif state == TestCaseState.SUCCESS:
                state = TestCaseState.DID_NOT_RUN
                if step == constants.STATUS_FILE_CHECK_REFERENCES:
                    working_dir = constants.REFERENCE_CHECK_DIR
                elif step == constants.STATUS_FILE_EVALUATE_REFERENCES:
                    working_dir = constants.REFERENCE_EVALUATION_DIR
                else:
                    raise NotImplementedError(f"Unknown step {step}")
                try:
                    with open(os.path.join(self.path, working_dir, self.REFERENCE_CHECK_SUMMARY_FILE)) as fp:
                        self.checkSummary = dataclass_utils.dataclass_from_dict(CheckResultCase, json.load(fp))
                except FileNotFoundError:
                    pass
                if self.checkSummary.checks:
                    if not self.checkSummary.result.within_tolerance:
                        state = TestCaseState.FAILURE
                    elif not self.checkSummary.result.identical:
                        state = TestCaseState.WITHIN_TOLERANCE
                    else:
                        state = TestCaseState.SUCCESS
        return state

    @classmethod
    def _toleranceColor(cls, withinTolerance: bool) -> str:
        return cls.SUCCESS_COLOR if withinTolerance else cls.FAIL_COLOR

    @staticmethod
    def _addDescription(latexDocumentInstance: PyTex, description: str):
        if description is not None:
            latexDocumentInstance.addText(PyTex.escapeChars(description))

    @classmethod
    def _shortenTable(cls, table_entries: typing.List[dataclasses.dataclass]) -> typing.List[dataclasses.dataclass]:
        new_entries = table_entries
        if len(table_entries) > 15:
            dataclass = table_entries[0].__class__
            dummy_data = len(dataclasses.fields(table_entries[0])) * ["..."]
            new_entries = table_entries[:5] + 5 * [dataclass(*dummy_data)] + table_entries[-5:]
        return new_entries

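    # Illustrative comment: given 20 entries, _shortenTable returns the first five rows, five
    # placeholder rows whose fields are all "...", and the last five rows, keeping long
    # comparison tables in the report readable.
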
    def addReferenceCheckSummary(self, latexDocumentInstance: PyTex, forcePlot: bool, comp_label: str,
                                 ref_label: str):
        """Adds a summary of the reference checks to the latex document.

        :param latexDocumentInstance: PyTex instance of the current report to build
        :param forcePlot: force generation of plots even if everything is identical
        :param comp_label: label for the comparison data
        :param ref_label: label for the reference data
        """
        TableTreeEntry = dataclasses.make_dataclass(
            "TableTreeEntry", zip(("Var", comp_label, ref_label, "Rel", "Abs", "Bin"), itertools.repeat("str")))
        Table0DEntry = dataclasses.make_dataclass(
            "Table0DEntries", zip(("Zone", "Solution", "Variable", comp_label, ref_label, "Rel", "Abs", "Bin"),
                                  itertools.repeat("str")))
        Table3DEntry = dataclasses.make_dataclass(
            "Table3DEntries", zip(("Zone", "Solution", "Variable", "Rel", "Abs", "Bin"), itertools.repeat("str")))
        chapterTypeBase = 2
        latexDocumentInstance.addSectionTitle("Failed Reference Checks" if not forcePlot else "Reference Checks",
                                              chapterType=chapterTypeBase)
        table_summary_contents = [["File", "Var", "Rel", "Abs", "Bin"]]
        figures = []
        tables_tree = []
        tables_0D = []
        tables_3D = []
        tables_file_diffs = []
        for file_id, checkResultFile in enumerate(self.checkSummary.checks):
            filepath_relative = checkResultFile.file_name
            filename = os.path.split(filepath_relative)[1]
            table_tree_entries = []
            table_0D_entries = []
            table_3D_entries = []
            table_file_diffs = []
            for variable_name, variable in checkResultFile.variables.items():
                stringVariableResult = PyTex.escapeChars(variable_name)
                stringBinary = self.string_success if variable.result.identical else self.string_fail_orange
                stringToleranceRelative = sci_notation(variable.tolerances.relative)
                stringToleranceAbsolute = sci_notation(variable.tolerances.absolute)
                stringToleranceRelativeColored = self.string_format_color.format(
                    color=self._toleranceColor(variable.result.within_tolerance), text=stringToleranceRelative)
                stringToleranceAbsoluteColored = self.string_format_color.format(
                    color=self._toleranceColor(variable.result.within_tolerance), text=stringToleranceAbsolute)
                if not variable.result.within_tolerance or forcePlot:
                    if variable.result_file_type in (ReferenceCheckPresenter.TYPE_TABLE_TREE,
                                                     ReferenceCheckPresenter.TYPE_TABLE_0D_FIELD,
                                                     ReferenceCheckPresenter.TYPE_TABLE_3D_FIELD,
                                                     ReferenceCheckPresenter.TYPE_GENERIC_FILE_DIFF):
                        assert variable.result_file_name is not None
                        with open(variable.result_file_name, "r") as fp:
                            resultData = dataclass_utils.dataclass_from_dict(CheckResultData, json.load(fp))
                    label = f"{self.name}_{os.path.split(variable.result_file_name)[1]}"
                    if variable.result_file_type == ReferenceCheckPresenter.TYPE_PLOT:
                        stringName = f"\\hyperref[{label}]{{{PyTex.escapeChars(filename)}}}"
                        figures.append([filepath_relative, variable.result_file_name, label,
                                        variable.result_description, checkResultFile.xml_description])
                        table_summary_contents.append([stringName, stringVariableResult,
                                                       stringToleranceRelativeColored,
                                                       stringToleranceAbsoluteColored, stringBinary])
                    elif variable.result_file_type == ReferenceCheckPresenter.TYPE_TABLE_TREE:
                        assert len(resultData) == 1
                        resultData = resultData[0]
                        table_tree_entries.extend(
                            [TableTreeEntry(PyTex.escapeChars(variable_name),
                                            PyTex.escapeChars(comp) if isinstance(comp, str)
                                            else self.string_format_exponential_verbatim.format(value=comp),
                                            PyTex.escapeChars(ref) if isinstance(ref, str)
                                            else self.string_format_exponential_verbatim.format(value=ref),
                                            stringToleranceRelativeColored, stringToleranceAbsoluteColored,
                                            stringBinary)
                             for (comp, ref) in zip(resultData.result_computation, resultData.result_reference)])
                    elif variable.result_file_type == ReferenceCheckPresenter.TYPE_TABLE_0D_FIELD:
                        table_0D_entries.extend(
                            [Table0DEntry(PyTex.escapeChars(singleResult.zone_name),
                                          PyTex.escapeChars(singleResult.solution_name),
                                          PyTex.escapeChars(variable_name),
                                          self.string_format_exponential_verbatim.format(
                                              value=singleResult.result_computation),
                                          self.string_format_exponential_verbatim.format(
                                              value=singleResult.result_reference),
                                          self.string_format_color.format(
                                              color=self._toleranceColor(singleResult.result.within_tolerance),
                                              text=stringToleranceRelative),
                                          self.string_format_color.format(
                                              color=self._toleranceColor(singleResult.result.within_tolerance),
                                              text=stringToleranceAbsolute),
                                          self.string_success if singleResult.result.identical
                                          else self.string_fail_orange)
                             for singleResult in resultData if forcePlot or not singleResult.result.identical])
                    elif variable.result_file_type == ReferenceCheckPresenter.TYPE_TABLE_3D_FIELD:
                        table_3D_entries.extend(
                            [Table3DEntry(PyTex.escapeChars(singleResult.zone_name),
                                          PyTex.escapeChars(singleResult.solution_name),
                                          PyTex.escapeChars(variable_name),
                                          self.string_format_color.format(
                                              color=self._toleranceColor(singleResult.result.within_tolerance),
                                              text=stringToleranceRelative),
                                          self.string_format_color.format(
                                              color=self._toleranceColor(singleResult.result.within_tolerance),
                                              text=stringToleranceAbsolute),
                                          self.string_success if singleResult.result.identical
                                          else self.string_fail_orange)
                             for singleResult in resultData if forcePlot or not singleResult.result.identical])
                    elif variable.result_file_type == ReferenceCheckPresenter.TYPE_GENERIC_FILE_DIFF:
                        assert len(resultData) == 1
                        resultData = resultData[0]
                        for (_, _) in zip(resultData.result_computation, resultData.result_reference):
                            if variable_name == "file_size":
                                table_file_diffs.append(["File Size", stringBinary])
                            elif variable_name == "file_sha256":
                                table_file_diffs.append(["SHA256 Hash", stringBinary])
                            else:
                                raise RuntimeError(f"Unknown variable '{variable_name}' for TYPE_GENERIC_FILE_DIFF")
                    elif variable.result_file_type == ReferenceCheckPresenter.TYPE_IMAGE_DIFF:
                        if variable_name == "image":
                            figures.append([filepath_relative, variable.result_file_name, label,
                                            variable.result_description, checkResultFile.xml_description])
                            table_entry_name = PyTex.escapeChars(checkResultFile.file_name)
                            table_summary_contents.append([table_entry_name, stringVariableResult,
                                                           stringToleranceRelativeColored,
                                                           stringToleranceAbsoluteColored, stringBinary])
                        else:
                            raise RuntimeError(f"Unknown variable '{variable_name}' for TYPE_IMAGE_DIFF")
                    else:
                        raise RuntimeError(f"Invalid type of reference check result: {variable.result_file_type}. "
                                           "Unable to add to report.")
            if any((table_tree_entries, table_0D_entries, table_3D_entries, table_file_diffs)):
                label = f"{self.name}_{filepath_relative.replace('/', '_')}_{file_id}"
                # summary table
                stringName = f"\\hyperref[{label}]{{{PyTex.escapeChars(filename)}}}"
                stringTolerance = self.string_success if checkResultFile.result.within_tolerance else self.string_fail
                stringBinary = self.string_success if checkResultFile.result.identical else self.string_fail_orange
                table_summary_contents.append([stringName, "Table", stringTolerance, stringTolerance, stringBinary])
                # tree table
                if table_tree_entries:
                    table_content = [[field.name for field in dataclasses.fields(table_tree_entries[0])]]
                    table_tree_entries = self._shortenTable(table_tree_entries)
                    table_content.extend(dataclasses.astuple(entry) for entry in table_tree_entries)
                    tables_tree.append([filepath_relative, table_content, label, checkResultFile.xml_description])
                # 0D table
                if table_0D_entries:
                    table_content = [[field.name for field in dataclasses.fields(table_0D_entries[0])]]
                    table_0D_entries = self._shortenTable(table_0D_entries)
                    table_content.extend(dataclasses.astuple(entry) for entry in table_0D_entries)
                    tables_0D.append([filepath_relative, table_content, label, checkResultFile.xml_description])
                # 3D table
                if table_3D_entries:
                    table_content = [[field.name for field in dataclasses.fields(table_3D_entries[0])]]
                    table_3D_entries = self._shortenTable(table_3D_entries)
                    table_content.extend(dataclasses.astuple(entry) for entry in table_3D_entries)
                    tables_3D.append([filepath_relative, table_content, label, checkResultFile.xml_description])
                # file table
                if table_file_diffs:
                    table_content = [["Variable", "Bin"]]
                    table_content.extend(table_file_diffs)
                    tables_file_diffs.append([filepath_relative, table_content, label,
                                              checkResultFile.xml_description])
        latexDocumentInstance.addLongtable(table_summary_contents, nCols=5, setCols="l|l|rr|l",
                                           headerSeparator="\\hline &&&&\\\\")
        for filepath_relative, figurePath, label, resultDescription, xmlDescription in figures:
            caption = f"\\protect\\path{{{filepath_relative}}}"
            if resultDescription is not None:
                caption += f" ({PyTex.escapeChars(resultDescription)})"
            if xmlDescription is not None:
                caption += f": {PyTex.escapeChars(xmlDescription)}"
            latexDocumentInstance.addFigure(figurePath, caption=caption, label=label)
        for filepath_relative, fileTableContent, label, xmlDescription in tables_tree:
            latexDocumentInstance.addSectionTitle(
                f"\\texttt{{tree comparison: {PyTex.escapeChars(filepath_relative)}}}",
                chapterType=chapterTypeBase + 1, label=label)
            self._addDescription(latexDocumentInstance, xmlDescription)
            latexDocumentInstance.addLongtable(
                fileTableContent, nCols=6,
                setCols=">{\\hspace{0pt}}p{0.3\\linewidth}|>{\\centering\\arraybackslash\\hspace{0pt}}p{0.17\\linewidth}>{\\centering\\arraybackslash\\hspace{0pt}}p{0.17\\linewidth}|rr|l",
                headerSeparator="\\hline", rowSeparator="\\hline")
        for filepath_relative, fileTableContent, label, xmlDescription in tables_0D:
            latexDocumentInstance.addSectionTitle(
                "\\texttt{0D field comparison: %s}" % PyTex.escapeChars(filepath_relative),
                chapterType=chapterTypeBase + 1, label=label)
            self._addDescription(latexDocumentInstance, xmlDescription)
            latexDocumentInstance.addLongtable(fileTableContent, nCols=8, setCols="l|l|l|l|l|rr|l",
                                               headerSeparator="\\hline &&&&\\\\")
        for filepath_relative, fileTableContent, label, xmlDescription in tables_3D:
            latexDocumentInstance.addSectionTitle(
                "\\texttt{3D field comparison: %s}" % PyTex.escapeChars(filepath_relative),
                chapterType=chapterTypeBase + 1, label=label)
            self._addDescription(latexDocumentInstance, xmlDescription)
            latexDocumentInstance.addLongtable(fileTableContent, nCols=6, setCols="l|l|l|rr|l",
                                               headerSeparator="\\hline &&&&\\\\")
        for filepath_relative, fileTableContent, label, xmlDescription in tables_file_diffs:
            latexDocumentInstance.addSectionTitle(
                "\\texttt{File comparison: %s}" % PyTex.escapeChars(filepath_relative),
                chapterType=chapterTypeBase + 1, label=label)
            self._addDescription(latexDocumentInstance, xmlDescription)
            latexDocumentInstance.addLongtable(fileTableContent, nCols=2, setCols="l|l",
                                               headerSeparator="\\hline &\\\\")

    def _getComputationWorkingDirForJob(self, jobListGroupName: str = "", jobListName: str = "") -> str:
        """Uses the given settings to determine the working directory for the current computation job.

        :param jobListGroupName: simulation group name
        :param jobListName: simulation name
        :return: working directory for current job
        """
        return os.path.join(self.path, constants.COMPUTATION_DIR, jobListGroupName, jobListName,
                            TestSuiteTestCaseSub.INPUT_DIR)

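    # Illustrative comment (made-up paths): for a test case located at /cases/channel with
    # simulation group "grid1" and simulation "steady", this returns
    # /cases/channel/<COMPUTATION_DIR>/grid1/steady/<INPUT_DIR>.
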
    def createJobListDictionary(self, options: argparse.Namespace, executableDict: ExecutableResources,
                                jobListGroupTag: typing.Optional[str] = None,
                                jobListTag: typing.Optional[str] = None, jobTag: str = "job",
                                workingDirFct: typing.Optional[typing.Callable] = None,
                                step: str = constants.COMPUTATION,
                                selectDict: typing.Optional[AbstractTestCase.Subset] = None,
                                resourcesDict: typing.Optional[typing.Dict[str, int]] = None) \
            -> AbstractTestCase.JobListDictionary:
        """Factory method to call the method in AbstractTestCase with the correct arguments.

        :param options: options instance to extract the executables and the queue to use
        :param executableDict: dictionary of executables
        :param jobListGroupTag: XML tag for the job list group / level
        :param jobListTag: XML tag for the job list / sub-level
        :param jobTag: XML tag for the job / job
        :param workingDirFct: method to determine the working directory for each job
        :param step: permitted steps (computation, meshing, restart and postprocessing)
        :param selectDict: subset of selected jobs
        :param resourcesDict: resources required by the jobs
        :return: job dictionary representing the structure of the selected step
        """
        assert step in (constants.COMPUTATION, constants.MESHING, constants.RESTART, constants.POSTPROCESSING), \
            "Unknown step!"
        if jobListGroupTag is None:
            jobListGroupTag = {constants.COMPUTATION: "simulationGroup",
                               constants.MESHING: "meshingGroup",
                               constants.RESTART: "restartGroup",
                               constants.POSTPROCESSING: "analysisGroup"}[step]
        if jobListTag is None:
            jobListTag = "analysis" if step == constants.POSTPROCESSING else "simulation"
        if workingDirFct is None:
            workingDirFct = self._getAnalysisWorkingDirForJob if step == constants.POSTPROCESSING \
                else self._getComputationWorkingDirForJob
        return super().createJobListDictionary(options, executableDict, jobListGroupTag, jobListTag, jobTag,
                                               workingDirFct, selectDict=selectDict, resourcesDict=resourcesDict)

    def updateReferences(self) -> typing.Tuple[bool, str]:
        """Updates the reference data for the stored files.

        :return: flag whether updating the reference data was successful, comma-separated list of failed files
        """
        Printer.verbosePrint(f"Test case {self.name} - remove references folder", Printer.Verbosity.DEBUG)
        ref_dir = os.path.join(self.path, TestSuiteTestCaseSub.REFERENCE_DIR)
        removeFile(ref_dir)
        errors = list()
        if self.references:
            for reference in self.references.referenceAbstract:
                ref_path = reference.path.valueOf_
                comp_dir = os.path.join(self.path, TestSuiteTestCaseSub.COMPUTATION_DIR)
                comp_files = os.path.join(comp_dir, ref_path)
                try:
                    globCopy2(comp_files, comp_dir, ref_dir)
                    Printer.verbosePrint(f"Copying data for '{comp_files}'", Printer.Verbosity.DEBUG)
                except IOError:
                    errors.append(ref_path)
                    Printer.warning(f"Contents of reference file '{ref_path}' in the test case '{self.name}' "
                                    "were not copied to the reference folder!")
        else:
            md(ref_dir)
        return not errors, ",".join(errors)

    def addFigures(self, latexDocumentInstance: PyTex):
        """Adds the TEX output for the figures of the test case. The top level of the output is the section level.

        :param latexDocumentInstance: PyTex instance of the current report to build
        """
        latexDocumentInstance.addSectionTitle("Figures", chapterType=2)
        if self.figures:
            for figure in self.figures.figure:
                figure_path = os.path.join(self.path, TestSuiteTestCaseSub.FIGURES_DIR, figure.name)
                if figure.name.endswith(".tex"):
                    latexDocumentInstance.addFile(figure_path, newPage=False, exists=os.path.exists(figure_path))
                else:
                    latexDocumentInstance.addFigure(figure_path, caption=figure.description,
                                                    exists=os.path.exists(figure_path))
            for plot in self.figures.plot:
                figure_path = os.path.join(self.path, TestSuiteTestCaseSub.FIGURES_DIR, plot.fileName)
                latexDocumentInstance.addFigure(figure_path, caption=plot.description,
                                                exists=os.path.exists(figure_path))
        else:
            latexDocumentInstance.addText("None.")

    def addDescriptionAndCommandline(self, options: argparse.Namespace, latexDocumentInstance: PyTex):
        """Adds the TEX output for the description and the command line of the test case.
        The top level of the output is the section level.

        :param options: options of the current PAVAYO run
        :param latexDocumentInstance: PyTex instance of the current report to build
        """
        latexDocumentInstance.addSectionTitle("Description", chapterType=2)
        latexDocumentInstance.addText(self.descriptionText)
        self.addCommandLineForTex(latexDocumentInstance, updateRestart=options.updateRestartData, chapterType=2)

    def addValgrindForTex(self, latexDocumentInstance: PyTex):
        """Adds the valgrind output of a test suite test case to the report.

        :param latexDocumentInstance: PyTex instance of the current report to build
        :type latexDocumentInstance: PyTex
        """
        def parseStack(stack_elem: etree.ElementBase, text: typing.List[str]):
            """Parses a stack XML node and stores the text in a list.

            :param stack_elem: XML node with stack information
            :param text: list to store the text
            """
            frames = stack_elem.findall("frame")
            for frame in frames:
                obj_elem = frame.find("obj")
                fn_elem = frame.find("fn")
                file_elem = frame.find("file")
                line_elem = frame.find("line")
                if fn_elem is not None and file_elem is not None and line_elem is not None:
                    text.append(f"\t{fn_elem.text} ({file_elem.text}:{line_elem.text})\n")
                elif obj_elem is not None and file_elem is not None and line_elem is not None:
                    text.append(f"\t{obj_elem.text} ({file_elem.text}:{line_elem.text})\n")
                elif obj_elem is not None and fn_elem is not None:
                    text.append(f"\t{obj_elem.text} ({fn_elem.text})\n")
                elif obj_elem is not None:
                    text.append(f"\t{obj_elem.text}\n")
                else:
                    Printer.verbosePrint("Unable to parse the current frame. Found the following children:",
                                         printLevel=Printer.Verbosity.ALWAYS)
                    Printer.verbosePrint(frame.getchildren(), printLevel=Printer.Verbosity.ALWAYS)

        def parseError(error_elem: etree.ElementBase, what_elem: etree.ElementBase, text: typing.List[str]):
            """Parses an error XML node and stores the text in a list.

            :param error_elem: XML node with error information
            :param what_elem: XML node with what or xwhat information
            :param text: list to store the text
            """
            stack_elem = error_elem.find("stack")
            if what_elem is not None and stack_elem is not None:
                text.append(what_elem.text + "\n")
                parseStack(stack_elem, text)
                next_elem = stack_elem.getnext()
                while next_elem is not None:
                    if next_elem.tag == "stack":
                        parseStack(next_elem, text)
                    elif next_elem.tag == "auxwhat":
                        text.append(next_elem.text + "\n")
                    next_elem = next_elem.getnext()
                text.append("\n")

        valgrind_logs = glob.glob(os.path.join(self.path, TestSuiteTestCaseSub.COMPUTATION_DIR, "*/*",
                                               constants.VALGRIND_LOG_PREFIX + "*." + constants.XML_FILE_ENDING))
        valgrind_logs.sort(key=str.lower)  # Sort output files alphabetically for a repeatable order.
        for valgrindSummary, filename in enumerate(valgrind_logs):
            (_, tail) = os.path.split(filename)
            parser = etree.XMLParser(remove_blank_text=True)
            try:
                with open(filename, "r") as xmlFile:
                    valgrind_tree = etree.parse(xmlFile, parser)
            except etree.XMLSyntaxError:
                latexDocumentInstance.addSectionTitle(
                    f"Valgrind Error {valgrindSummary + 1} ({PyTex.escapeChars(tail)})", chapterType=2)
                latexDocumentInstance.addText("Unable to parse valgrind output.")
            else:
                latexDocumentInstance.addSectionTitle(
                    f"Valgrind Error Summary {valgrindSummary + 1} ({PyTex.escapeChars(tail)})", chapterType=2)
                latexDocumentInstance.addText("\\fontsize{%f}{%f}\\selectfont" % (9, 9 * 0.8))
                error_text = []
                leak_text = []
                for error in valgrind_tree.findall("error"):
                    kind_elem = error.find("kind")
                    if kind_elem is not None:
                        if fnmatch.fnmatch(kind_elem.text, "Leak_*"):
                            what_elem = error.find("xwhat").find("text")
                            parseError(error, what_elem, leak_text)
                        else:
                            what_elem = error.find("what")
                            parseError(error, what_elem, error_text)
                if error_text:
                    latexDocumentInstance.addVerbatim("".join(error_text))
                else:
                    latexDocumentInstance.addText("None")
                latexDocumentInstance.addText("\\normalsize")
                # write memory leak summary
                latexDocumentInstance.addSectionTitle("Valgrind Leak Summary {sIndex} ({name})".format(
                    sIndex=valgrindSummary + 1, name=PyTex.escapeChars(tail)), chapterType=2)
                latexDocumentInstance.addText("\\fontsize{%f}{%f}\\selectfont" % (9, 9 * 0.8))
                if leak_text:
                    latexDocumentInstance.addVerbatim("".join(leak_text))
                else:
                    latexDocumentInstance.addText("None")
                latexDocumentInstance.addText("\\normalsize")

    def addSanitizersForTex(self, latexDocumentInstance: PyTex):
        """Adds the output of a compiler sanitizer to the test suite report.

        :param latexDocumentInstance: PyTex instance of the current report to build
        :type latexDocumentInstance: PyTex
        """
        log_files = glob.glob(os.path.join(self.path, TestSuiteTestCaseSub.LOG_DIR, "*"))
        log_files.sort()
        latexDocumentInstance.addSectionTitle("Sanitizer Reports", chapterType=2)
        for outputFile in log_files:
            with open(outputFile, "r") as inp:
                lines = inp.readlines()
            runtime_errors = set()
            # Parse for runtime errors.
            for line in lines:
                if "runtime error" in line.lower():
                    if len(line.split(":")) != 5:
                        continue  # it is not a well-formatted runtime error
                    runtime_errors.add(tuple(line.split(":")))  # tuple: set members must be hashable
            sorted_runtime_errors = sorted(runtime_errors, key=lambda x: (x[0], x[1], x[2]))
            # Check for additional memory errors not reported as runtime errors.
            parsing = False
            current = []
            additional_errors = []
            for line in lines:
                if line.startswith("==") and "==ERROR:" in line:
                    parsing = True
                    if current:
                        additional_errors.append(current)
                        current = []
                elif line == "=================================================================\n":
                    if current:
                        additional_errors.append(current)
                        current = []
                    parsing = False
                if parsing:
                    current.append(line)
            # The last error might not be delimited by a '===' line.
            if current:
                additional_errors.append(current)
            escape = PyTex.escapeChars
            if sorted_runtime_errors or additional_errors:
                latexDocumentInstance.addSectionTitle(
                    "Reports for Job: {}".format(escape(os.path.basename(outputFile))), chapterType=3)
                if sorted_runtime_errors:
                    table = ['\\begin{longtable}{lccp{.45\\textwidth}}',
                             '\\textbf{File} & \\textbf{L} & \\textbf{C} & \\textbf{Error Message} \\\\\\hline',
                             '\\endhead',
                             '\\centering']
                    for error in sorted_runtime_errors:
                        table.append("{} & {} & {} & {} \\\\".format(escape(error[0]), error[1], error[2],
                                                                     escape(error[4])))
                    table.append("\\caption{ Runtime Errors }")
                    table.append("\\end{longtable}")
                    latexDocumentInstance.content += table
                if additional_errors:
                    latexDocumentInstance.addSectionTitle("Additional Errors found:", chapterType=4)
                    for error in additional_errors:
                        latexDocumentInstance.content.append("{\\tiny")
                        latexDocumentInstance.addVerbatim("\n".join(error), options="baselinestretch=0.75")
                        latexDocumentInstance.content.append("}")

    def addCommandLineForTex(self, latexDocumentInstance: PyTex, updateRestart: bool = False, chapterType: int = 2):
        """Formats the commands to a readable TEX format.

        :param latexDocumentInstance: PyTex instance of the current report to build
        :param updateRestart: add job lists of the restart groups if True
        :type latexDocumentInstance: PyTex
        :type updateRestart: bool
        """
        def _addGroupForTex(latexDocumentInstance: PyTex,
                            jobListGroup: typing.Union[typing.List[supermod.SimulationGroup],
                                                       typing.List[supermod.RestartSimulationGroup]]):
            """Adds a job list group to the Latex report.

            :param latexDocumentInstance: PyTex instance of the current report to build
            :type latexDocumentInstance: PyTex
            """
            latexDocumentInstance.addText("\\small")
            lines = list()
            for simGroup in jobListGroup:
                for simulation in simGroup.simulation:
                    for job in simulation.job:
                        n_procs = self._adaptNProcs(job.nProcs)
                        arguments = " ".join(" ".join(arg.split()) for arg in job.args)
                        line = f"[{job.name}] "
                        if n_procs > 1 and job.executable not in constants.SINGLE_PROCS_EXECUTABLES:
                            line += f"mpirun -np {n_procs} {job.executable} {arguments}"
                        else:
                            line += f"{job.executable} {arguments}"
                        dependencies = []
                        if job.dependency:
                            for dependency in job.dependency:
                                dependency_repr = dependency.level + "/" if dependency.level else ""
                                dependency_repr += dependency.subLevel + "/" if dependency.subLevel else ""
                                dependency_repr += dependency.job
                                dependencies.append(dependency_repr)
                        else:
                            dependencies.append("-")
                        line += " ({})".format(", ".join(dependencies))
                        lines.append("\n ".join(textwrap.wrap(line, 90)))
            if lines:
                latexDocumentInstance.addVerbatim("\n".join(lines))
            else:
                latexDocumentInstance.addVerbatim("-")
            latexDocumentInstance.addText("\\normalsize")

        latexDocumentInstance.addSectionTitle("Command Line", chapterType=chapterType)
        if updateRestart:
            latexDocumentInstance.addSectionTitle("Restart Jobs", chapterType=chapterType + 1)
            _addGroupForTex(latexDocumentInstance, self.restartGroup)
            latexDocumentInstance.addSectionTitle("Simulation Jobs", chapterType=chapterType + 1)
        _addGroupForTex(latexDocumentInstance, self.simulationGroup)

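    # Illustrative comment (made-up job data): a job "flow" on 4 procs depending on
    # meshGroup/meshSim/meshJob is rendered as
    #
    #   [flow] mpirun -np 4 <executable> <arguments> (meshGroup/meshSim/meshJob)
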
    def getUsedExecutables(self, compute: bool = False, restart: bool = False, meshing: bool = False,
                           postprocessing: bool = False) -> typing.Set[supermod.Executable]:
        """Returns the executables the test case was computed with.

        :param compute: add executables of the compute step to the set?
        :param restart: add executables of the restart step to the set?
        :param meshing: add executables of the meshing step to the set?
        :param postprocessing: add executables of the postprocessing step to the set?
        :return: the used executables
        """
        executables = set()
        if meshing:
            for group in self.meshingGroup:
                for simulation in group.simulation:
                    for job in simulation.job:
                        executables.add(job.executable)
        if restart:
            for group in self.restartGroup:
                for simulation in group.simulation:
                    for job in simulation.job:
                        executables.add(job.executable)
        if compute:
            for group in self.simulationGroup:
                for simulation in group.simulation:
                    for job in simulation.job:
                        executables.add(job.executable)
        return executables

    def append(self, simGroup: supermod.SimulationGroup):
        """Appends a simulation group to the test case.

        :param simGroup: the simulation group to append
        """
        self.simulationGroup.append(simGroup)

    def insert(self, index: int, value: supermod.SimulationGroup):
        """Inserts a simulation group into this test case.

        :param index: position to insert at
        :param value: the simulation group to insert
        """
        self.simulationGroup.insert(index, value)

    def __len__(self):
        """Returns the number of simulation groups in the test case.

        :return: number of simulation groups
        :rtype: int
        """
        return len(self.simulationGroup)

    def __getitem__(self, index):
        """Returns the simulation group at a given index.

        :return: simulation group at the given index
        :rtype: SimulationGroup instance
        """
        return self.simulationGroup[index]
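
# Illustrative comment: __len__ and __getitem__ make instances usable with the standard
# sequence protocol, e.g.
#
#   for sim_group in testcase:      # iterates over the simulation groups
#       print(sim_group.name)
#   n_groups = len(testcase)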