# Source code for mojo.pavayo.compute (recovered from generated documentation)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
compute
=======
compute.py controls the execution of PAVAYO. It contains all functions to download test cases,
to build a job list from these test cases and to generate reports.
In addition it contains methods to update the reference data for given test cases and upload test cases into the
test case repository.

.. autofunction:: showTestcases
.. autofunction:: showKeywords
.. autofunction:: showSelected
.. autofunction:: prepareTestcases
.. autofunction:: downloadTestcases
.. autofunction:: lookForDownloadErrors
.. autofunction:: createJoblist
.. autofunction:: lookForErrorsInAJobList
.. autofunction:: scanTexFile
.. autofunction:: lookingForPictures
.. autofunction:: cleanErrorFile
.. autofunction:: updateReferences
.. autofunction:: uploadTestcases
.. autofunction:: createAndRunJobListForComputationPostprocessing
.. autofunction:: manageTestcases
.. autofunction:: createDependenciesDictionary
.. autofunction:: readErrorFiles
.. autofunction:: createReports
"""
import argparse
import collections
import configparser
import enum
import fileinput
import fnmatch
from functools import reduce
import glob
import itertools
import json
import os
import re
import socket
import subprocess
import time
import typing

from git import Git, Repo

from lxml import etree

from .. import getStrictMojoVersion

from ..bricabrac.fileIO import Printer, removeFile, md
from ..bricabrac.textIO import Color
from ..bricabrac.stringFormatting import getHoursMinutesSeconds, getColonDelimitedTimeString

try:
    from ..bricabrac import pysvntools
except ImportError:
    pysvntools = None

from ..jobManagement import jobManagementData as jobConsts
from ..jobManagement.jobs.abstractJob import Status
from ..jobManagement.jobs import jobList as joblist

from ..latex.pyTex import PyTex

from . import computeData as constants

from .computeMethods import createGeneralTitlePageContent, createValidationTitlePageContent
from .computeMethods import getSoftwaresVersionNumbers

from .executables.version import is_version_str_equal

from .testcase.abstractTestCase import AbstractTestCase, TestCaseState
from .testcase.generalTestCase import GeneralTestCaseSub
from .testcase.parallelTestCase import ParallelTestCaseSub
from .testcase.speedlineTestCase import SpeedlineTestCaseSub
from .testcase.suiteComponents import ReportSub
from .testcase.testSuiteTestCase import TestSuiteTestCaseSub
from .testcase.validationTestCase import ValidationTestCaseSub
from .testcase.validationTestCaseSpeedline import ValidationTestCaseSpeedlineSub
from .testcase.validationSuiteParser import writeXmlFile

from .statusOutput import FailedJobsJsonOutput, JUnitXMLOutput, OutputOnError

from ..jobManagement.jobObserver import JobEvent

from .knownErrorsParser import TestCaseExpectedToFail


# Lightweight record describing a single report to generate: where it is
# written (path), whether debug output is included (showDebug), and which
# test cases it covers (testCaseList).
Report = collections.namedtuple("Report", ["path", "showDebug", "testCaseList"])


class SuccessState(enum.IntEnum):
    """Overall outcome of a run, ordered by severity (higher value = worse).

    As an IntEnum the members compare and sort numerically, so the worst
    observed state can be kept with ``max()``.
    """
    NO_FAILURE = 0              # everything passed
    ONLY_KNOWN_ERRORS = 1       # failures occurred, but all were already known/expected
    EXPECTED_ERROR_MISSING = 2  # an error that was expected to occur did not
    NEW_FAILURE = 3             # at least one previously unknown failure

def showTestcases(options, testCaseList):
    """Prints all test cases available in the XML file.

    Either writes the plain test case names to ``options.outputFileShow[0]``
    or prints them to the console, color coded as active (green, i.e. selected
    by the current filters), inactive (red) or ignored (grey).

    :param options: options instance parsed from the command line
    :param testCaseList: test cases
    :type options: instance
    :type testCaseList: list
    """
    # sort in place so console and file output are alphabetical
    testCaseList.sort(key=lambda tc: tc.name.lower())
    selectedTestCases = _filterTestCases(options, testCaseList)
    Printer.verbosePrint("\nLegend: {0}, {1}, {2}\n".format(Color.colorString("active", Color.GREEN_FG),
                                                           Color.colorString("inactive", Color.RED_FG),
                                                           Color.colorString("ignored", Color.GREY_FG)),
                         printLevel=Printer.Verbosity.ALWAYS)
    if options.outputFileShow:
        Printer.verbosePrint("Writing test case list to: '" + options.outputFileShow[0] + "'\n",
                             printLevel=Printer.Verbosity.ALWAYS)
        try:
            # the 'with' statement closes the file; the explicit close() the
            # original code performed inside the block was redundant
            with open(options.outputFileShow[0], 'w') as outfile:
                for tc in testCaseList:
                    outfile.write(tc.name + "\n")
        except IOError:
            Printer.verbosePrint(f"\nWriting list to {options.outputFileShow[0]} failed",
                                 printLevel=Printer.Verbosity.ALWAYS)
    else:
        Printer.verbosePrint("Test cases:\n" + "=" * 11, printLevel=Printer.Verbosity.ALWAYS)
        for tc in testCaseList:
            if tc.inactive:
                color = Color.RED_FG
            elif tc in selectedTestCases:
                color = Color.GREEN_FG
            else:
                color = Color.GREY_FG
            Printer.verbosePrint(Color.colorString(tc.name, color), printLevel=Printer.Verbosity.ALWAYS)
def showKeywords(options, testCaseList):
    """Prints all keywords available in the XML file.

    Every keyword is listed together with the test cases carrying it, color
    coded as active (green), inactive (red) or ignored (grey).

    :param options: options instance parsed from the command line
    :param testCaseList: test cases
    :type options: instance
    :type testCaseList: list
    """
    testCaseList.sort(key=lambda tc: tc.name.lower())
    selectedTestCases = _filterTestCases(options, testCaseList)
    keywordDict = dict()
    Printer.verbosePrint("\nLegend: {0}, {1}, {2}\n".format(Color.colorString("active", Color.GREEN_FG),
                                                           Color.colorString("inactive", Color.RED_FG),
                                                           Color.colorString("ignored", Color.GREY_FG)),
                         printLevel=Printer.Verbosity.ALWAYS)
    for testcase in testCaseList:
        # color reflects the test case state, not the keyword
        if testcase.inactive:
            stateColor = Color.RED_FG
        elif testcase in selectedTestCases:
            stateColor = Color.GREEN_FG
        else:
            stateColor = Color.GREY_FG
        coloredName = Color.colorString(testcase.name, stateColor)
        for keyword in testcase.keywords.keyword:
            keywordDict.setdefault(keyword, []).append(coloredName)
    Printer.verbosePrint("\nList of keywords:", printLevel=Printer.Verbosity.ALWAYS)
    for key in sorted(keywordDict.keys(), key=str.lower):
        Printer.verbosePrint("{key:30s} : {value}".format(key=Color.colorString(key, Color.BOLD),
                                                          value=", ".join(keywordDict[key])),
                             printLevel=Printer.Verbosity.ALWAYS)
def printTestcases(headerLine, colorArgs, tcList):
    """Print a titled list of test cases; prints nothing for an empty list.

    :param headerLine: title for this section
    :param colorArgs: list of color arguments
    :param tcList: list of testcases
    :type headerLine: str
    :type colorArgs: list()
    :type tcList: list()
    """
    if not tcList:
        return
    Printer.verbosePrint(headerLine, printLevel=Printer.Verbosity.ALWAYS)
    for testcase in tcList:
        Printer.verbosePrint(Color.colorString(" - " + testcase.name, *colorArgs),
                             printLevel=Printer.Verbosity.ALWAYS)
def showSelected(options, executableDict, readTcList, suiteObject):
    """Prints the selected test cases only either by name or keywords.

    Depending on ``options.showSelected`` this lists the selected test case
    names, their keywords, or their SVN status relative to the revision in
    the XML file.

    :param options: options instance parsed from the command line
    :param executableDict: dictionary of executables
    :param readTcList: test cases read from XML
    :param suiteObject: xml file suite object
    :type options: instance
    :type executableDict: ExecutableResources
    :type readTcList: list
    :type suiteObject: object
    """
    svnclient, testCaseList = prepareTestcases(options, executableDict, suiteObject, readTcList)
    # remove inactive test cases
    testCaseList = [tc for tc in testCaseList if not tc.inactive]
    if options.showSelected == constants.SHOW_TESTCASES:
        if options.outputFileShow:
            Printer.verbosePrint("Writing test case list to: '" + options.outputFileShow[0] + "'\n",
                                 printLevel=Printer.Verbosity.ALWAYS)
            try:
                # 'with' closes the file; the explicit close() formerly inside
                # the block was redundant and has been removed
                with open(options.outputFileShow[0], 'w') as outfile:
                    for tc in testCaseList:
                        outfile.write(tc.name + "\n")
            except IOError:
                Printer.verbosePrint("\nWriting list to {0} failed".format(options.outputFileShow[0]),
                                     printLevel=Printer.Verbosity.ALWAYS)
        else:
            Printer.verbosePrint("Test cases:\n" + "=" * 11, printLevel=Printer.Verbosity.ALWAYS)
            for tc in testCaseList:
                Printer.verbosePrint(Color.colorString(tc.name, Color.GREEN_FG), printLevel=Printer.Verbosity.ALWAYS)
    elif options.showSelected == constants.SHOW_KEYWORDS:
        keywordDict = dict()
        for tc in testCaseList:
            for keyword in tc.keywords.keyword:
                if keyword in keywordDict:
                    keywordDict[keyword].append(tc.name)
                else:
                    keywordDict[keyword] = [tc.name]
        Printer.verbosePrint("\nList of keywords in selected test cases (keyword : list of test cases):",
                             printLevel=Printer.Verbosity.ALWAYS)
        for key in sorted(keywordDict.keys(), key=str.lower):
            Printer.verbosePrint("{key:30s} : {value}".format(key=Color.colorString(key, Color.BOLD),
                                                              value=", ".join([Color.colorString(tc, Color.GREEN_FG)
                                                                               for tc in keywordDict[key]])),
                                 printLevel=Printer.Verbosity.ALWAYS)
    elif options.showSelected == constants.SHOW_SVN_STATUS:
        svnInfoDict = svnclient.getDictOfSVNInfoForRevisionFromXML()
        # partition the selected test cases into four disjoint groups
        svnTestCases = [tc for tc in testCaseList if tc.name in svnInfoDict]
        testcasesNotInSvn = [tc for tc in testCaseList if tc not in svnTestCases]
        testcasesWithWrongSvn = [tc for tc in svnTestCases
                                 if not svnclient.matchingLocalAndRemoteRev(tc.path, svnInfoDict[tc.name])
                                 and os.path.exists(tc.path)]
        testcasesMissing = [tc for tc in svnTestCases if not os.path.exists(tc.path)]
        testcasesWithCorrectSVN = [tc for tc in svnTestCases
                                   if tc not in testcasesWithWrongSvn and tc not in testcasesMissing]
        Printer.verbosePrint("\nSVN status based on revision {0} in XML file '{1}'.".format(
            svnclient.revisionOfPYSVNClient(), options.inputFile), printLevel=Printer.Verbosity.ALWAYS)
        # check that every testcase belongs to one list only
        assert len(testCaseList) == len(testcasesNotInSvn) + len(testcasesWithWrongSvn) + \
            len(testcasesMissing) + len(testcasesWithCorrectSVN)
        if testcasesWithCorrectSVN:
            tcModified = [tc for tc in testcasesWithCorrectSVN
                          if os.path.exists(tc.path) and svnclient.modified(tc.path)]
            tcCorrect = [tc for tc in testcasesWithCorrectSVN if tc not in tcModified]
            Printer.verbosePrint("\ntestcases in correct SVN revision:", printLevel=Printer.Verbosity.ALWAYS)
            printTestcases(" unchanged:", (Color.GREEN_FG,), tcCorrect)
            printTestcases(" modified:", (Color.RED_FG,), tcModified)
        if testcasesWithWrongSvn:
            tcModified = [tc for tc in testcasesWithWrongSvn
                          if os.path.exists(tc.path) and svnclient.modified(tc.path)]
            tcCorrect = [tc for tc in testcasesWithWrongSvn if tc not in tcModified]
            Printer.verbosePrint("\ntestcases with different SVN revision:", printLevel=Printer.Verbosity.ALWAYS)
            printTestcases(" unchanged:", (Color.BROWN_FG,), tcCorrect)
            printTestcases(" modified:", (Color.RED_FG, Color.BLUE_BG), tcModified)
        if testcasesMissing:
            printTestcases("\nmissing testcases that have to be downloaded:",
                           (Color.GREY_FG, Color.BOLD, Color.RED_BG), testcasesMissing)
        if testcasesNotInSvn:
            tcLocal = [tc for tc in testcasesNotInSvn if os.path.exists(tc.path)]
            tcNotFound = [tc for tc in testcasesNotInSvn if tc not in tcLocal]
            printTestcases("\ntestcases not availabe in the testcase repository:",
                           (Color.BROWN_FG, Color.BOLD, Color.BLACK_BG), tcLocal)
            printTestcases("\nunknown testcases (neither in repository nor locally accessible):",
                           (Color.BLACK_FG, Color.CYAN_BG), tcNotFound)
        Printer.verbosePrint("", printLevel=Printer.Verbosity.ALWAYS)
def _filterTestCases(options, tcList):
    """Filters the test case list for test cases specified via command line argument.

    Filtering order: explicit name patterns (or test level), then
    confidentiality, then priority, then keywords (inclusive or exclusive).

    :param options: option parser instance of PAVAYO
    :param tcList: all test cases in the input file
    :type options: Argparse intance
    :type tcList: list(AbstractTestCase instance)
    :return: filtered test case list
    :rtype: list(AbstractTestCase instance)
    """
    if options.testcaseList:
        # filter for specified test cases (glob-style patterns)
        tcReturnList = [tc for tc in tcList
                        if any(True for pat in options.testcaseList if fnmatch.fnmatch(tc.name, pat))]
    else:
        # filter for test level
        tcReturnList = [tc for tc in tcList if set(tc.testLevel).intersection(set(options.testLevel))]
    # filter for confidentiality
    if options.confidentiality:
        tcReturnList = [tc for tc in tcReturnList
                        if tc.confidentiality and set(tc.confidentiality.allow).intersection(set(options.confidentiality))]
    # filter for priority
    tcReturnList = [tc for tc in tcReturnList if tc.priority <= options.specPriority]
    # filter for keywords
    if options.keywords:
        if options.exclude:
            tcReturnList = [tc for tc in tcReturnList
                            if not set(tc.keywords.keyword).intersection(set(options.keywords))]
        else:
            tcReturnList = [tc for tc in tcReturnList
                            if set(tc.keywords.keyword).intersection(set(options.keywords))]
    return tcReturnList


def getUsedExecutables(options, selectedTestCases):
    """Return executables that are about to be executed.

    Only queries the test cases when at least one execution stage (compute,
    restart update, mesh update or postprocessing) is requested.

    :param options: values from the option parser
    :param selectedTestCases: the selected testcases
    :type options: options instance
    :type selectedTestCases: list(abstractTestCase)
    :return: a set of used executables
    :rtype: set(str)
    """
    usedExecutables = set()
    if options.compute or options.updateRestartData or options.updateMeshData or options.post:
        for testcase in selectedTestCases:
            # union of executables over all requested stages
            usedExecutables |= testcase.getUsedExecutables(compute=options.compute,
                                                           restart=options.updateRestartData,
                                                           meshing=options.updateMeshData,
                                                           postprocessing=options.post)
    return usedExecutables


def checkGmcPlay(options, suite, executableDict):
    """Perform the checks for gmcPlay.

    :param options: values from the option parser
    :param suite: main handle of meta data
    :param executableDict: dictionary of executables
    :type options: options instance
    :type suite: Suite
    :type executableDict: ExecutableResources
    :raises RuntimeError: if gmcPlay is missing or has the wrong version
    """
    if not os.path.exists(options.gmcPlayPath):
        versionTag = f"version '{suite.gmcplayversion}' " if not options.noFixedGmcPlayVersion else ""
        raise RuntimeError(f"gmcPlay {versionTag}is required. Please specify the path via command line.")
    if suite.gmcplayversion and options.noFixedGmcPlayVersion is False:
        currentGmcPlayVersion = str(executableDict[constants.GMCPLAY_TEMPLATE].versionSettings.version)
        if currentGmcPlayVersion != suite.gmcplayversion:
            # fixed: the template previously lacked the '{filename}' placeholder
            # although the format() call below supplies it
            errorText = "You tried to use a wrong version of gmcPlay. In the XML file '{filename}' gmcPlay version '{reqVersion}' is stated. " + \
                        "Your version is '{curVersion}'."
            raise RuntimeError(errorText.format(curVersion=currentGmcPlayVersion,
                                                reqVersion=suite.gmcplayversion,
                                                filename=options.inputFile))


def checkPyMesh(options, suite, executableDict):
    """Perform the checks for PyMesh.

    :param options: values from the option parser
    :param suite: main handle of meta data
    :param executableDict: dictionary of executables
    :type options: options instance
    :type suite: Suite
    :type executableDict: ExecutableResources
    :raises RuntimeError: if PyMesh is missing, misplaced or has the wrong version
    """
    if not options.updateMeshData:
        raise RuntimeError("PyMesh seems to be used outside the meshing stage")
    if not os.path.isfile(options.pyMeshPath):
        versionTag = f"version '{suite.pymeshversion}' " if not options.noFixedPyMeshVersion else ""
        raise RuntimeError(f"PyMesh {versionTag}is required. Please specify the path via command line.")
    if not options.pyMeshPath.endswith("/" + constants.PYMESH_PATH_BIN):
        raise RuntimeError("PyMesh-Path must be located in PyMesh directory.\n" +
                           "Expected path ending with ..../%s, but path is %s"
                           % (constants.PYMESH_PATH_BIN, options.pyMeshPath))
    pyMeshHome = os.path.abspath(os.path.join(options.pyMeshPath, constants.PYMESH_PATH_BIN_TO_HOME))
    if not os.path.isfile(os.path.join(pyMeshHome, constants.PYMESH_PATH_CHECK_TEMPLATES)):
        # diagnostic output kept on purpose: shows which template path was probed
        print(pyMeshHome, constants.PYMESH_PATH_CHECK_TEMPLATES)
        print(os.path.join(pyMeshHome, constants.PYMESH_PATH_CHECK_TEMPLATES))
        raise RuntimeError("PyMesh-Path must be located in PyMesh directory.\n" +
                           "The PyMesh directory parsed from executable is %s, \n" % pyMeshHome +
                           "but does not look like a PyMesh directory (no file %s)."
                           % (os.path.join(pyMeshHome, constants.PYMESH_PATH_CHECK_TEMPLATES)))
    if suite.pymeshversion and options.noFixedPyMeshVersion is False:
        currentPyMeshVersion = str(executableDict[constants.PYMESH_TEMPLATE].versionSettings.version)
        if not is_version_str_equal(currentPyMeshVersion, suite.pymeshversion):
            # fixed: the template previously lacked the '{filename}' placeholder
            # although the format() call below supplies it
            errorText = "You tried to use a wrong version of PyMesh. In the XML file '{filename}' PyMesh version '{reqVersion}' is stated. " + \
                        "Your version is '{curVersion}'."
            raise RuntimeError(errorText.format(curVersion=currentPyMeshVersion,
                                                reqVersion=suite.pymeshversion,
                                                filename=options.inputFile))


def checkProgramVersions(options, selectedTestCases, suite, executableDict):
    """Check MOJO/gmcPlay/PyMesh versions against the suite requirements.

    Unused executables are removed from ``executableDict`` as a side effect.

    :param options: values from the option parser
    :param selectedTestCases: the selected testcases
    :param suite: main handle of meta data
    :param executableDict: dictionary of executables
    :type options: options instance
    :type selectedTestCases: list(AbstractTestCase)
    :type suite: Suite
    :type executableDict: ExecutableResources
    :raises RuntimeError: if a required version is not satisfied
    """
    usedExecutables = getUsedExecutables(options, selectedTestCases)
    Printer.verbosePrint(f"\nused Executables: {usedExecutables}", Printer.Verbosity.DEBUG)
    if suite.mojoversion:
        currentMojoVersion = getStrictMojoVersion()
        if currentMojoVersion < suite.mojoversion:
            raise RuntimeError(f"Your MOJO version {currentMojoVersion} is too old. "
                               f"Xml-file '{options.inputFile}' requires version {suite.mojoversion}.")
    checkGmc = not (options.gmcPlayPath == "" and options.noFixedGmcPlayVersion is True)
    if checkGmc and constants.GMCPLAY_TEMPLATE in usedExecutables:
        checkGmcPlay(options, suite, executableDict)
    elif constants.GMCPLAY_TEMPLATE in executableDict:
        Printer.verbosePrint("--> gmcPlay not in usedExecutables, removing it from executableDict",
                             Printer.Verbosity.DEBUG)
        del executableDict[constants.GMCPLAY_TEMPLATE]
    if constants.PYMESH_TEMPLATE in usedExecutables:
        checkPyMesh(options, suite, executableDict)
    elif constants.PYMESH_TEMPLATE in executableDict:
        Printer.verbosePrint("--> PyMesh not in usedExecutables, removing it from executableDict",
                             Printer.Verbosity.DEBUG)
        del executableDict[constants.PYMESH_TEMPLATE]
def prepareTestcases(options, executableDict, suite, readtclist):
    """Creates a test case list from a XML file.

    Builds the optional SVN client, filters the test cases by the command
    line options, checks program versions and validates speed-line
    dependencies and upload restrictions.

    :param options: values from the option parser
    :param executableDict: dictionary of executables
    :param suite: main handle of meta data
    :param readtclist: list of test cases
    :type options: options instance
    :type executableDict: ExecutableResources
    :type suite: Suite
    :type readtclist: list(AbstractTestCase)
    :return: the svnclient (None when pysvn is unavailable) and the sorted
        list of selected test cases
    :rtype: tuple[Optional[pysvntools.PYSVNClient], list[AbstractTestCase]]
    :raises RuntimeError: when requested test cases are missing, speed-line
        dependencies are unfulfilled, or upload preconditions are violated
    """
    svnclient = None
    if pysvntools is not None:
        host = suite.repository.hostAdress
        if options.sshuser and host.startswith('svn+ssh://'):
            # a user embedded in the host would conflict with --sshuser
            assert '@' not in host, "You tried to specify a custom SSH user but the hostname appears to " \
                                    "explicitly declare a user (@ in hostname)"
            host = host.replace('svn+ssh://', 'svn+ssh://' + options.sshuser + '@')
        svnclient = pysvntools.PYSVNClient(os.path.join(host, suite.repository.folder),
                                           suite.suitename, suite.repository.revision,
                                           options.uploadTestcases)
    selectedTestCases = _filterTestCases(options, readtclist)
    checkProgramVersions(options, selectedTestCases, suite, executableDict)
    options.suiteName = suite.suitename
    if options.testcaseList:
        if selectedTestCases:
            # returns the pattern when it matched nothing, else None
            def filterFunction(tcList, pattern):
                return None if [tc for tc in tcList if fnmatch.fnmatch(tc.name, pattern)] else pattern
            namesNotFound = [filterFunction(selectedTestCases, pattern) for pattern in options.testcaseList]
            if any(namesNotFound):
                errorString = "Test case(s): '{0}' specified, but not found in the XML file '{1}'!"
                raise RuntimeError(errorString.format("', '".join([_f for _f in namesNotFound if _f]),
                                                      options.inputFile))
        else:
            # none of the requested patterns matched anything at all
            errorString = "Test case(s): '{0}' specified, but not found in the XML file '{1}'!"
            raise RuntimeError(errorString.format("', '".join(options.testcaseList), options.inputFile))
    selectedTestCases.sort(key=lambda tc: tc.name.lower())
    selectedActiveTestCases = [tc for tc in selectedTestCases if not tc.inactive]
    # every dependency of a speed-line test case must itself be selected and active
    for testcase in (tc for tc in selectedActiveTestCases if isinstance(tc, SpeedlineTestCaseSub)):
        missingTestCases = [elt for elt in testcase.dependency
                            if elt not in [tc.name for tc in selectedActiveTestCases]]
        if missingTestCases:
            raise RuntimeError(f"Dependencies for test case '{testcase.name}' not fulfilled. "
                               f"Missing test cases: {missingTestCases}.")
    if options.uploadTestcases:
        # old-design test cases cannot be uploaded
        testSuiteTC = [tc.name for tc in selectedActiveTestCases
                       if isinstance(tc, (GeneralTestCaseSub, ParallelTestCaseSub, SpeedlineTestCaseSub))]
        if testSuiteTC:
            raise RuntimeError('Upload only available for test cases in new design! '
                               'Check test case: {}'.format(' '.join(testSuiteTC)))
    if options.uploadTestcases or options.download:
        svnclient.remoteRevisionAvailable()  # check for available Revision number
    return svnclient, selectedTestCases
def downloadTestcases(options, svnclient, testcaseList):
    """Download the test cases from a server.

    Performs an empty top-level checkout/update, then decides per test case
    whether it must be downloaded: unless ``options.forceUpdate`` is set,
    test cases whose local revision already matches the remote one are
    skipped, and overwriting locally modified test cases requires interactive
    confirmation.

    :param options: values from the option parser
    :param svnclient: pysvn connections
    :param testcaseList: list of test cases to download
    :type options: options instance
    :type svnclient: class
    :type testcaseList: list
    :raises RuntimeError: when the user refuses to overwrite local modifications
    """
    Printer.verbosePrint("\n%-18s '%s'" % ("Repository URL", svnclient.url()), Printer.Verbosity.JENKINS)
    Printer.verbosePrint("%-18s %s" % ("Revision", svnclient.revisionOfPYSVNClient()), Printer.Verbosity.JENKINS)
    Printer.verbosePrint("%-18s '%s'" % ("Local folder", os.path.abspath(svnclient.localDir)),
                         Printer.Verbosity.JENKINS)
    svnclient.cleanUp()
    # -1 means the top level folder is not a working copy yet
    if svnclient.getLocalRevisionNumberOfPath(svnclient.localDir) == -1:
        Printer.verbosePrint("Empty check out of top level folder \"%s\"" % os.path.abspath(svnclient.localDir),
                             Printer.Verbosity.DEBUG)
        svnclient.checkOut(depth=pysvntools.PYSVNClient.EMPTY)
    else:
        svnclient.update(svnclient.localDir, depth=pysvntools.PYSVNClient.EMPTY)
        Printer.verbosePrint("Using local top level folder \"%s\"" % os.path.abspath(svnclient.localDir),
                             Printer.Verbosity.DEBUG)
    svnInfoDict = svnclient.getDictOfSVNInfoForRevisionFromXML()
    svnTestCases = [tc for tc in testcaseList if tc.name in svnInfoDict]
    onlyLocalTestCases = [tc for tc in testcaseList if tc not in svnTestCases]
    if onlyLocalTestCases:
        Printer.verbosePrint("\nTest cases are not availabe in the testcase repository.",
                             Printer.Verbosity.JENKINS)
        for tc in onlyLocalTestCases:
            Printer.verbosePrint(" " * 4 + tc.name, Printer.Verbosity.JENKINS)
    if not options.forceUpdate:
        # download only what is missing locally or in the wrong revision
        testcasesToDownload = [tc for tc in svnTestCases
                               if not (svnclient.matchingLocalAndRemoteRev(tc.path, svnInfoDict[tc.name])
                                       and os.path.exists(tc.path))]
        # Locally modified test cases which are not in correct revision will be removed.
        testCasesModified = [tc for tc in testcasesToDownload
                             if os.path.exists(tc.path) and svnclient.modified(tc.path)]
        if testCasesModified:
            Printer.verbosePrint("\nFollowing test cases with local modifications and old revisions will be removed:",
                                 Printer.Verbosity.JENKINS)
            for tc in testCasesModified:
                Printer.verbosePrint(" " * 4 + tc.name, Printer.Verbosity.JENKINS)
            choice = input("Do you really want to overwrite local modifications? [yes/no] ").lower()
            if choice != 'yes':
                raise RuntimeError("Download was cancelled!")
        # Locally available test case folder will be only overwritten if local test case is under
        # version control AND the remote version differs from the local version.
        testcasesAvailableLocally = [tc for tc in svnTestCases if tc not in testcasesToDownload]
        if testcasesAvailableLocally:
            Printer.verbosePrint("\nTest cases in correct versions available locally and will not be downloaded. "
                                 "Use --forceUpdate to download these as well.",
                                 Printer.Verbosity.JENKINS)
            for tc in testcasesAvailableLocally:
                modifiedString = " ({0})".format(svnclient.modified(tc.path)) \
                    if (svnclient.modified(tc.path) and '3' in tc.testLevel) else ""
                Printer.verbosePrint(" " * 4 + tc.name + modifiedString, Printer.Verbosity.JENKINS)
    else:
        testcasesToDownload = svnTestCases
    if testcasesToDownload:
        Printer.verbosePrint("\nStarting download of necessary files from repository.")
        for testcase in testcasesToDownload:
            if testcase.inactive:
                continue
            svnclient.svnCleanTestCases(testcase.path)
            svnclient.update(testcase.path, depth=pysvntools.PYSVNClient.INFINITY)
            Printer.verbosePrint(" " * 4 + testcase.name + ' downloaded', Printer.Verbosity.ALWAYS)
def download_original_references(suite_name: str, testcase: AbstractTestCase,
                                 svnclient: pysvntools.PYSVNClient):
    """Check out a test case's original reference data at its recorded revision.

    Warns and does nothing when the test case carries no original reference
    revision.

    :param suite_name: name of the suite (used to build the local target dir)
    :param testcase: test case whose references are fetched
    :param svnclient: SVN client used for the checkout
    """
    if not testcase.originalReferenceRevision:
        Printer.warning(f"Testcase '{testcase.name}' does not have an original reference revision!")
        return
    svnclient.checkOut(pysvntools.PYSVNClient.INFINITY,
                       revision=testcase.originalReferenceRevision,
                       local_dir=f"{suite_name}/{testcase.name}/{constants.ORIGINAL_REFERENCES_DIR}/",
                       path_suffix=f"{testcase.name}/{constants.REFERENCE_DIR}")
def lookForDownloadErrors(testcaseList):
    """Looks for missing files and saves the errors in the test cases.

    A test case folder that cannot be listed, or that contains at most one
    entry, is flagged with a download error; a missing folder is (re)created.

    :param testcaseList: list of test cases
    :type testcaseList: list
    """
    for testcase in testcaseList:
        try:
            # the error file is always in the folder, so <= 1 entries means nothing was downloaded
            downloadIncomplete = len(os.listdir(testcase.path)) <= 1
        except OSError:
            # folder inaccessible/missing: record the error and create the folder
            testcase.errorContainer.set(constants.STATUS_FILE_DOWNLOAD,
                                        constants.STATUS_FILE_DOWNLOAD_ERROR_KEY,
                                        constants.ERROR_STRING_DOWNLOAD)
            md(testcase.path)
        else:
            if downloadIncomplete:
                testcase.errorContainer.set(constants.STATUS_FILE_DOWNLOAD,
                                            constants.STATUS_FILE_DOWNLOAD_ERROR_KEY,
                                            constants.ERROR_STRING_DOWNLOAD)
        testcase.writeStatusFile()
def createJoblist(options, executableDict, testCaseList, resourcesDict=None, postOnly=False):
    """Creates the main job list; containing the computation jobs and post processing jobs.

    :param options: options instance from command line parser
    :param executableDict: dictionary of executables
    :param testCaseList: list of test cases
    :param resourcesDict: resources available
    :param postOnly: puts only post jobs in the jobList
    :type options: options instance
    :type executableDict: ExecutableResources
    :type testCaseList: list
    :type resourcesDict: dict
    :type postOnly: bool
    :return: main job list
    :rtype: JobList
    """
    activeTestCaseList = [tc for tc in testCaseList if not tc.inactive]
    if options.getVersionsFromExecutable and not postOnly:
        # record the executable versions in every active test case's status file
        for software, version in getSoftwaresVersionNumbers(options, executableDict,
                                                            readVersionsFromExecutables=True).items():
            for testcase in activeTestCaseList:
                testcase.errorContainer.set(constants.STATUS_FILE_INFO, f"{software}version", str(version))
                testcase.writeStatusFile()
    jobList = joblist.JobList(name="computeJoblist",
                              verbosity=options.verbose,
                              nProcsAvailable=options.nprocs,
                              nTecplotLicenses=options.tecplotLicense,
                              resourcesDict=resourcesDict,
                              additionalProcessStatus=options.additionalProcessStatus,
                              deactivateJobGrouping=options.deactivateClusterJobGrouping)
    testcaseErrorList = readErrorFiles(activeTestCaseList, constants.STATUS_FILE_DOWNLOAD)
    if not options.compute:
        # look for errors when the computation was in a prior run
        testcaseErrorList += readErrorFiles(activeTestCaseList, constants.STATUS_FILE_COMPUTE)
    dependentTestCases = createDependenciesDictionary(
        [tc for tc in activeTestCaseList if isinstance(tc, SpeedlineTestCaseSub)])
    for testCase in activeTestCaseList:
        _resetJobIds(testCase)
    for testCase in activeTestCaseList:
        if testCase in testcaseErrorList:
            continue
        if isinstance(testCase, SpeedlineTestCaseSub) and testCase.dependency:
            # FIX: the previous code joined bytes (path.encode) with the str
            # ".XML", which raises TypeError on Python 3 — join plain strings
            fileName = os.path.join(testCase.path, testCase.name + ".XML")
            writeXmlFile(TClist=dependentTestCases[testCase.name],
                         mojoversion=str(getStrictMojoVersion()),
                         exit_file=fileName)
        if options.updateMeshData and not postOnly:
            _addMeshingJob(options, executableDict, testCase, jobList, resourcesDict=resourcesDict)
        if options.updateRestartData and not postOnly:
            _addRestartJob(options, executableDict, testCase, jobList, resourcesDict=resourcesDict)
        if options.compute and not postOnly:
            _addComputationJob(options, executableDict, testCase, jobList, resourcesDict=resourcesDict)
    for testCase in activeTestCaseList:
        if testCase in testcaseErrorList:
            continue
        if options.post:
            _addPostprocessingJob(options, executableDict, testCase, jobList, dependentTestCases,
                                  postOnly, resourcesDict=resourcesDict)
        if not options.skipReferenceChecks:
            _addCheckReferencesJob(options, testCase, jobList)
        elif options.evaluateReferences:
            evaluate_references_job_list = testCase.getEvaluateReferencesJobList(options)
            if evaluate_references_job_list:
                jobList.addJob(evaluate_references_job_list)
                testCase.jobIdsEvaluateReferences.append(evaluate_references_job_list.id)
    return jobList
def lookForErrorsInAJobList(testcaseList, jobList, step):
    """Looks into the job list to get errors which happened during the computation.

    Matches each test case's job ids for the given step against the failed
    jobs of ``jobList`` and records error entries in the test case status
    files. For the postprocessing step it additionally checks that all
    pictures expected by the TEX file (or declared figures) were produced.

    :param testcaseList: list of test cases
    :param jobList: job list which was computed
    :param step: step whose errors are looked for
    :type testcaseList: list
    :type jobList: JobList instance
    :type step: string
    """
    valid_steps = (constants.STATUS_FILE_COMPUTE, constants.STATUS_FILE_POST,
                   constants.STATUS_FILE_CHECK_REFERENCES, constants.STATUS_FILE_EVALUATE_REFERENCES)
    assert step in valid_steps, f"Argument 'step' must be one of '{valid_steps}'. You specified '{step}'."
    failedJobsInJobList = _getJobsRecursive(jobList, "failedJobs")
    for testcase in testcaseList:
        # pick the job ids belonging to the requested step
        if step == constants.STATUS_FILE_COMPUTE:
            idsToLookAt = testcase.jobIdsComputation
        elif step == constants.STATUS_FILE_POST:
            idsToLookAt = testcase.jobIdsPostprocessing
        elif step == constants.STATUS_FILE_CHECK_REFERENCES:
            idsToLookAt = testcase.jobIdsCheckReferences
        elif step == constants.STATUS_FILE_EVALUATE_REFERENCES:
            idsToLookAt = testcase.jobIdsEvaluateReferences
        idsToLookAt = _getJobIds(jobList, idsToLookAt)
        failed_jobs = (failedJobsInJobList[job_id] for job_id in idsToLookAt
                       if job_id in failedJobsInJobList)
        for job in failed_jobs:
            errorString = f"'{job.name}', id: {job.id:03} failed with the message: {job.status.message}. " \
                          f"({os.path.relpath(job.outFile)})"
            # '=' is stripped from the key (it is the config-file delimiter)
            testcase.errorContainer.set(step, str(job.outFile).replace('=', ''), errorString)
            testcase.writeStatusFile()
    if step == constants.STATUS_FILE_POST:
        # todo: rework !!!
        # We cannot assume, that every speed-line or parallel test case has a post-processing,
        # therefore a TEX file, so we need to find out if we have to look for pictures
        def testcasesToCheck(tc):
            return not tc.errorContainer.options(constants.STATUS_FILE_POST) \
                and (isinstance(tc, (GeneralTestCaseSub, TestSuiteTestCaseSub,
                                     ValidationTestCaseSub, ValidationTestCaseSpeedlineSub))
                     or tc.postScript)
        list_postpath = [tc for tc in testcaseList if testcasesToCheck(tc)]
        for testcase in list_postpath:
            if not isinstance(testcase, TestSuiteTestCaseSub):
                # expected pictures come from the TEX file; extensions are guessed later
                addEndings = True
                try:
                    listOfExpectedPictures = scanTexFile(testcase)
                except RuntimeError:
                    listOfExpectedPictures = list()
                    testcase.errorContainer.set(constants.STATUS_FILE_DOWNLOAD, "missing_tex_file",
                                                "Test cases TEX file not found!")
            else:
                # test-suite test cases declare their figures explicitly
                addEndings = False
                if testcase.figures:
                    listOfExpectedPictures = [os.path.join(testcase.path, testcase.FIGURES_DIR, figure.name)
                                              for figure in testcase.figures.figure]
                else:
                    listOfExpectedPictures = list()
            # Chasing missing picture(s)
            missingPictures = lookingForPictures(listOfExpectedPictures, addEndings=addEndings)
            if missingPictures:
                testcase.errorContainer.set(constants.STATUS_FILE_POST, "missing_pictures",
                                            f"Missing Picture(s): {missingPictures}")
                testcase.writeStatusFile()
def _getJobsRecursive(jobList, jobResult): """Returns recursive the jobs from a job list. :param jobList: root job list :param jobResult: type of outcome, "failed", "notStarted" or "successful" :type jobList: JobList instance :type jobResult: string :return: list of jobs with the outcome asked for :rtype: list """ results = dict() for aJob in getattr(jobList, jobResult): if isinstance(aJob, joblist.JobList): results.update(_getJobsRecursive(aJob, jobResult)) else: results[aJob.id] = aJob return results def _getJobIds(jobList, ids): """Returns recursive the job ids of a jobList. :param jobList: job list instance with the interesting jobs :param ids: ids of the interesting jobs :type jobList: jobList instance :type ids: list :return: returns the ids of all failed or not started jobs from a joblist in the given joblist if its id is in ids. :rtype: list """ # flattens a given list, for example: flattenList([[1],[2,3],[4,5,6]]) => [1, 2, 3, 4, 5, 6] def flattenList(inputList): return [el for curList in inputList for el in curList] def getIdsRecursive(currentJob): """Returns the ids of all failed jobs or not started jobs in a job list or the id of the given job. """ if isinstance(currentJob, joblist.JobList): return flattenList([getIdsRecursive(aJob) for aJob in itertools.chain(currentJob.failedJobs, currentJob.notStartedJobs)]) else: return [currentJob.id] return flattenList([getIdsRecursive(job) for job in itertools.chain(jobList.failedJobs, jobList.notStartedJobs) if job.id in ids]) + ids
[docs]def scanTexFile(testcase): """Scans the TEX file of the test case in order to know which pictures are required to generate the pdf. :param testcase: Testcase instance which will be analyzed :type testcase: Testcase instance :return: list of expected pictures :rtype: list """ expectedPictures = list() for elt in testcase.descriptionText.split("\n"): match = re.match(constants.REG_EXP_PICTURE, elt.strip()) if match: pictureName = match.groups()[0] expectedPictures.append(os.path.join(os.path.split(testcase.path)[0], os.path.splitext(pictureName)[0])) return expectedPictures
[docs]def lookingForPictures(expectedPictures, addEndings=True): """Looks if All the pictures have been produced. If it is not the case, a list of strings containing the name of the missing picture(s) will be written. :param expectedPictures: a list of picture expected :param addEndings: add different endings to the image file :type expectedPictures: list :type addEndings: bool :return: a list containing strings with the missing pictures in latex format :rtype: list """ missingPictures = list() for picture in expectedPictures: if addEndings: if not reduce(lambda a, b: a or b, (bool(glob.glob(picture + os.extsep + ending)) for ending in constants.PICTURE_ENDINGS)): missingPictures.append(picture) else: if not glob.glob(picture): missingPictures.append(picture) return missingPictures
[docs]def cleanErrorFile(testcaseList, step): """Cleans the given section in the test cases error files. :param testcaseList: list of test cases :param step: step for which to clean the error file :type testcaseList: list :type step: string """ try: assert step in (constants.STATUS_FILE_DOWNLOAD, constants.STATUS_FILE_COMPUTE, constants.STATUS_FILE_POST, constants.STATUS_FILE_CHECK_REFERENCES, constants.STATUS_FILE_EVALUATE_REFERENCES) except AssertionError: Printer.warning("You tried to clean a nonexistent section in the error files!\n Continuing without cleaning anything!") return for testcase in testcaseList: for option in testcase.errorContainer.options(step): testcase.errorContainer.remove_option(step, option)
def synchronizeReferenceVersionInTestcases(testcaseList):
    """Synchronize each test case's reference version with its status file.

    Test cases whose status file does not yet contain the reference version
    (missing option or section) get it written instead.

    :param testcaseList: list of test cases
    :type testcaseList: list
    """
    for testcase in testcaseList:
        try:
            testcase.updateReferenceVersionFromStatusFile()
        except (configparser.NoOptionError, configparser.NoSectionError):
            # Status file has no reference version yet: seed it from the test case.
            testcase.writeReferenceVersionToStatusFile()


def prepareTraceSuiteXMLforUpload(options, svnclient, testcaseList):
    """Update data in XML files for the test cases

    Currently the reference Trace Suite versions are modified only.

    :param options: specified test suite levels
    :param svnclient: SVN client
    :param testcaseList: list of test cases
    :type options: list(int)
    :type svnclient: pysvntools.PYSVNClient
    :type testcaseList: list(AbstractTestCase)
    """
    Printer.verbosePrint("Prepare XML file(s) for SVN commit", printLevel=Printer.Verbosity.DEBUG)
    xmlMainTree = etree.parse(options.inputFile)
    repositoryNode = xmlMainTree.find(constants.XML_SVN_REPOSITORY_TAG_NAME)
    revisionInXmlNode = repositoryNode.find(constants.XML_SVN_REVISION_TAG_NAME)
    revisionXml = int(revisionInXmlNode.text)
    revisionSvn = svnclient.getRemoteHeadRevisionNumber()
    if revisionXml == revisionSvn:
        updateXMLfile = False
    else:
        if revisionXml > revisionSvn:
            # NOTE(review): revision in the XML being AHEAD of SVN head is unexpected;
            # it is only reported, the file is still rewritten below.
            messageText = f"SVN revision ({revisionXml}) in XML file '{options.inputFile}' is larger than the head revision ({revisionSvn})."
            Printer.verbosePrint(messageText, printLevel=Printer.Verbosity.DEFAULT)
        updateXMLfile = True
    # check whether test cases are in a single file by monitoring the number of internal entities
    if xmlMainTree.docinfo.internalDTD is None:
        # Single-file layout: all test case nodes live in the main XML tree.
        root = xmlMainTree.find(constants.XML_SVN_TCLIST_TAG_NAME)
        tcNameAndRefDict = {tc.name: tc.referenceTraceSuite for tc in testcaseList}
        for tc in root:
            testCaseNameNode = tc.find(constants.XML_TESTCASE_TAG_NAME)
            tcName = testCaseNameNode.text
            if tcName in tcNameAndRefDict.keys():
                testCaseNewVersion = tcNameAndRefDict[tcName]
                try:
                    testCaseVersionNode = tc.find(constants.XML_TESTCASE_TAG_REFERENCE_VERSION)
                except (AttributeError, KeyError):
                    pass
                else:
                    if testCaseVersionNode is not None and testCaseVersionNode.text != testCaseNewVersion:
                        testCaseVersionNode.text = testCaseNewVersion
                        updateXMLfile = True
                        Printer.verbosePrint(f"'{tcName}' is updated due to changes in the test case.",
                                             printLevel=Printer.Verbosity.DEBUG)
        if updateXMLfile:
            xmlMainTree.write(options.inputFile, pretty_print=True, xml_declaration=True, encoding="UTF-8")
    else:
        # Split layout: one XML file per test case underneath level folders.
        xmlBaseFolder = os.path.dirname(options.inputFile)
        for testLevel in options.testLevel:
            tcNameAndRefDict = {tc.name: tc.referenceTraceSuite for tc in testcaseList if testLevel in tc.testLevel}
            for testCaseName, testCaseNewVersion in tcNameAndRefDict.items():
                xmlFilePath = os.path.join(xmlBaseFolder,
                                           constants.DEFAULT_LEVEL_FOLDER_NAME.format(level=testLevel),
                                           testCaseName + os.extsep + constants.XML_FILE_ENDING)
                try:
                    xmlTree = etree.parse(xmlFilePath)
                except IOError as io_error:
                    raise RuntimeError(f"XML file for test case {testCaseName} cannot be found at the expected path {xmlFilePath}") from io_error
                root = xmlTree.getroot()
                try:
                    testCaseVersionNode = root.find(constants.XML_TESTCASE_TAG_REFERENCE_VERSION)
                except (AttributeError, KeyError):
                    pass
                else:
                    if testCaseVersionNode is not None and testCaseVersionNode.text != testCaseNewVersion:
                        testCaseVersionNode.text = testCaseNewVersion
                        updateXMLfile = True
                        Printer.verbosePrint("Test level {level}: '{xmlFile}' is updated due to changes in the test cases."
                                             .format(level=testLevel, xmlFile=xmlFilePath),
                                             printLevel=Printer.Verbosity.DEBUG)
                        xmlTree.write(xmlFilePath, pretty_print=True, xml_declaration=True, encoding="UTF-8")
        #
        # Use fileinput here since the write option show unwanted side effects for split files.
        if updateXMLfile:
            for line in fileinput.input(options.inputFile, inplace=True):
                # update SVN revision number
                line = re.sub(r"<{revTag}>\s*{oldSvnRev}\s*<".format(revTag=constants.XML_SVN_REVISION_TAG_NAME,
                                                                    oldSvnRev=revisionXml),
                              "<{revTag}>{newSvnRev}<".format(revTag=constants.XML_SVN_REVISION_TAG_NAME,
                                                              newSvnRev=revisionSvn),
                              line.rstrip())
                # write lines to file again
                print(line)
[docs]def updateReferences(testcaseList): """Updates reference data for the test suite. :param testcaseList: list of test cases to update :type testcaseList: list :return: status whether updating has been successful :rtype: Status """ Printer.verbosePrint("Copying reference data...\n", Printer.Verbosity.JENKINS) successStatus = jobConsts.DONE tcErrorList = list() for testcase in testcaseList: if testcase.inactive: continue refSuccess, refErrorText = testcase.updateReferences() if refSuccess: testcase.referenceTraceSuite = testcase.getSoftwareVersion('trace') testcase.errorContainer.set(constants.STATUS_FILE_INFO, constants.STATUS_FILE_INFO_REFERENCE_VERSION, str(testcase.referenceTraceSuite)) testcase.writeStatusFile() else: successStatus = jobConsts.ERROR testcase.errorContainer.set(constants.STATUS_FILE_UPDATE_REFERENCE_DATA, "missing_files", refErrorText) testcase.writeStatusFile() tcErrorList.append(testcase.name) Printer.verbosePrint("Copying reference data done.\n", Printer.Verbosity.JENKINS) return Status(code=successStatus, message=", ".join(tcErrorList))
def uploadTestcases(svnclient, testcaseList):
    """Upload the modified test cases to the SVN repository.

    Each complete, active test case is classified as either an update of an
    existing repository entry or a brand-new test case, SVN ignore properties
    are set, and the user is asked interactively before the commit happens.

    :param svnclient: pysvn connection
    :param testcaseList: list of test cases to upload
    :type svnclient: class
    :type testcaseList: list
    :return: success statement (True when the user confirmed and the commit ran)
    :rtype: bool
    :raises RuntimeError: if a test case is incomplete or not at HEAD revision
    """
    svnInfoDict = svnclient.getDictOfSVNInfoForHeadRevision()
    Printer.verbosePrint("Uploading test case(s)...\n", Printer.Verbosity.JENKINS)
    testCasePathsToAdd = []
    newTestCases = []
    for testcase in testcaseList:
        if not testcase.inactive:
            if isinstance(testcase, TestSuiteTestCaseSub):
                # Test-suite test cases must ship both a source and a reference folder.
                if os.path.isdir(os.path.join(testcase.path, TestSuiteTestCaseSub.SOURCE_DIR)) and os.path.isdir(os.path.join(testcase.path, TestSuiteTestCaseSub.REFERENCE_DIR)):
                    if testcase.name in svnInfoDict:
                        # Known test case: refuse the upload unless the working copy is at HEAD.
                        if not svnclient.getRemoteRevisionNumber(svnInfoDict[testcase.name]) == svnclient.getLocalRevisionNumberOfPath(testcase.path):
                            Printer.verbosePrint('Head ' + str(svnclient.getRemoteRevisionNumber(svnInfoDict[testcase.name]))
                                                 + ' != Local ' + str(svnclient.getLocalRevisionNumberOfPath(testcase.path)),
                                                 Printer.Verbosity.DEBUG)
                            raise RuntimeError('Testcase ' + testcase.name + ' is not in head version. Update testcases before uploading them.')
                        else:
                            testCasePathsToAdd.append(testcase.path)
                    else:
                        Printer.verbosePrint('New Testcase ' + testcase.name + ' will be uploaded!', Printer.Verbosity.DEBUG)
                        newTestCases.append(testcase.path)
                else:
                    raise RuntimeError("At least one testcase is incomplete (triggered by '%s'). One of the folders '%s' or '%s' is missing. You are only able to upload complete testcases!"
                                       % (testcase.name, TestSuiteTestCaseSub.SOURCE_DIR, TestSuiteTestCaseSub.REFERENCE_DIR))
            elif isinstance(testcase, (ValidationTestCaseSub, ValidationTestCaseSpeedlineSub)):
                # Validation test cases only require the source folder.
                if os.path.isdir(os.path.join(testcase.path, constants.SOURCE_DIR)):
                    if testcase.name in svnInfoDict:
                        if not svnclient.getRemoteRevisionNumber(svnInfoDict[testcase.name]) == svnclient.getLocalRevisionNumberOfPath(testcase.path):
                            Printer.verbosePrint('Head ' + str(svnclient.getRemoteRevisionNumber(svnInfoDict[testcase.name]))
                                                 + ' != Local ' + str(svnclient.getLocalRevisionNumberOfPath(testcase.path)),
                                                 Printer.Verbosity.DEBUG)
                            raise RuntimeError('Testcase ' + testcase.name + ' is not in head version. Update testcases before uploading them.')
                        else:
                            testCasePathsToAdd.append(testcase.path)
                    else:
                        Printer.verbosePrint('New Testcase ' + testcase.name + ' will be uploaded!', Printer.Verbosity.DEBUG)
                        newTestCases.append(testcase.path)
                else:
                    raise RuntimeError("At least one testcase is incomplete (triggered by '{name}'). The folder '{srcDir}' is missing. You are only able to upload complete testcases!"
                                       .format(name=testcase.name, srcDir=constants.SOURCE_DIR))
    if newTestCases:
        svnclient.add(newTestCases, False)
    if testCasePathsToAdd:
        # Files deleted locally must also be removed from version control.
        svnclient.removeMissingFiles(testCasePathsToAdd)
    mergedTestCaseList = list(set(testCasePathsToAdd + newTestCases))
    # Set correct ignore properties
    for testcase in mergedTestCaseList:
        svnclient.propSet(testcase, False, pysvntools.PYSVNClient.IGNORE_TESTCASELEVEL)
        for topDir in [listDir for listDir in os.listdir(testcase) if listDir in pysvntools.PYSVNClient.IGNORE_DICT]:
            dirPath = os.path.join(testcase, topDir)
            for subPath, subDirs, _ in os.walk(dirPath):
                # Prune ignored sub-directories in place so os.walk skips them.
                subDirs[:] = [subList for subList in subDirs if subList not in pysvntools.PYSVNClient.IGNORE_DICT[topDir]]
                svnclient.add(subPath, False)
                svnclient.propSet(subPath, True, pysvntools.PYSVNClient.IGNORE_DICT[topDir])
    svnclient.add(mergedTestCaseList, True)
    svnclient.statusOutput(mergedTestCaseList)
    Printer.verbosePrint("\nCommit message:\n" + svnclient.getLogInMessage().replace("\n", "\n" + " " * 4),
                         Printer.Verbosity.ALWAYS)
    # Interactive confirmation: only an exact (case-insensitive) "yes" commits.
    choice = input("\nCheck files and commit message listed above. Upload? [yes/no] ").lower()
    isChoicePositive = (choice == 'yes')
    if isChoicePositive:
        newCommitRevisionNumber = svnclient.checkIn(mergedTestCaseList)
        Printer.verbosePrint("\nSuccessfully committed test cases in " + svnclient.url() + " with revision "
                             + str(newCommitRevisionNumber) + ". Don't forget to update the revision number in the xml file",
                             Printer.Verbosity.JENKINS)
        svnclient.update(svnclient.localDir, depth=pysvntools.PYSVNClient.EMPTY)
        for testcase in testcaseList:
            svnclient.update(testcase.path, depth=pysvntools.PYSVNClient.INFINITY)
    else:
        Printer.verbosePrint("Upload was cancelled!", Printer.Verbosity.JENKINS)
    return isChoicePositive
def printJobListRuntimes(job, data=None, indentationLevel=0): data = data if data is not None else {} prefix = " " * indentationLevel if isinstance(job, joblist.JobList): prefix += ":: " print("{:<120s}".format(prefix + job.name)) childData = {} for child in job.successfulJobs + job.failedJobs: data[job.name] = printJobListRuntimes(child, data=childData, indentationLevel=indentationLevel + 1) else: prefix += " - " runtime = job.runtime * job.nProcs print("{:<120s} {:>10}".format(prefix + job.name, getColonDelimitedTimeString(runtime))) data[job.name] = runtime return data
def createAndRunJobListForComputationPostprocessing(options, executableDict, testcaseList, resourcesDict=None, junitWriter=None):
    """Executes and manages the computation and/or post processing part.

    :param options: options instance filled with the arguments from the command line
    :param executableDict: dictionary of executables
    :param testcaseList: list of test cases
    :param resourcesDict: resources available
    :param junitWriter: JUnit Writer if junit output is desired
    :type options: options instance
    :type executableDict: ExecutableResources
    :type testcaseList: list
    :type resourcesDict: dict
    :type junitWriter: JUnitXMLOutput
    :return: a JobList instance containing all jobs for the steps chosen
    :rtype: JobList instance
    """
    failedJobsJSON = FailedJobsJsonOutput(options.writeFailedJobs) if options.writeFailedJobs else None

    def registerObservers(currentJobList):
        """Attach the failed-jobs JSON writer and the JUnit writer (if configured).

        Extracted because the main and the re-executed post-processing job list
        need exactly the same observer wiring.
        """
        if failedJobsJSON is not None:
            currentJobList.notifier.registerObserver(failedJobsJSON, JobEvent.JOB_FAILED_EVENT)
        # 'is not None' instead of '!= None' (PEP 8; also used consistently below).
        if options.writeJUnitXML and junitWriter is not None:
            currentJobList.notifier.registerObserver(junitWriter, JobEvent.JOB_FAILED_EVENT)
            currentJobList.notifier.registerObserver(junitWriter, JobEvent.JOB_STARTED_EVENT)
            currentJobList.notifier.registerObserver(junitWriter, JobEvent.JOB_SUCCEEDED_EVENT)

    jobList = createJoblist(options, executableDict, testcaseList, resourcesDict=resourcesDict, postOnly=False)
    registerObservers(jobList)
    if options.outputOnError:
        outputOnError = OutputOnError()
        jobList.notifier.registerObserver(outputOnError, JobEvent.JOB_FAILED_EVENT)
    if options.verbose >= Printer.Verbosity.DEBUG:
        jobList.viewJobs()
    computationStartTime = time.time()
    jobList.runJoblist()
    duration = time.time() - computationStartTime
    # Persist the total runtime so the report generation can pick it up later.
    options.configFile.set("computeOptions", "runtime", str(duration))
    jobList.cleanup()
    if options.testCaseRunTimes:
        print("Test Case Runtimes")
        print("==================")
        print("{:<120s} {:>10}".format("Testcase / Joblist", "HH:MM:SS"))
        runtimeData = printJobListRuntimes(jobList)
        with open("testCaseTimings.json", "w") as testCaseTimingOutput:
            print("Writing job runtimes to testCaseTimings.json")
            json.dump(runtimeData, testCaseTimingOutput, indent=4)
    # update reference data
    if options.updateReferenceData and jobList.status.code == jobConsts.DONE:
        jobList.status = updateReferences(testcaseList)
        # re-execute post-processing job list if reference data has been updated
        if jobList.status.code == jobConsts.DONE:
            postJobList = createJoblist(options, executableDict, testcaseList, resourcesDict=resourcesDict, postOnly=True)
            registerObservers(postJobList)
            if options.verbose >= Printer.Verbosity.DEBUG:
                postJobList.viewJobs()
            postJobList.runJoblist()
            postJobList.cleanup()
            jobList.status = postJobList.status
    return jobList
def manageTestcases(options, svnclient, testcaseList, executableDict, known_error_test_cases, resourcesDict=None,
                    reportSettings=None, suiteName="TEST"):
    """Executes and manages the computation and/or post processing part.

    :param options: options instance filled with the arguments from the command line
    :param svnclient: pysvn connection
    :param testcaseList: list of test cases
    :param executableDict: dictionary of executables
    :param known_error_test_cases: list of test cases with attributes known to fail
    :param resourcesDict: resources available
    :param reportSettings: settings for the report
    :param suiteName: suite name
    :type options: argparse.Namespace
    :type svnclient: Optional[pysvntools.PYSVNClient]
    :type testcaseList: list
    :type executableDict: ExecutableResources
    :type known_error_test_cases: dict[str: mojo.pavayo.knownErrorsParser.FailingTestCase]
    :type resourcesDict: dict
    :type reportSettings: ReportSub
    :type suiteName: str
    :return: error code
    :rtype: int
    """
    Printer.verbosePrint("\n{0:=^30}".format(" Start Processing "), printLevel=Printer.Verbosity.JENKINS)
    active_test_cases = [tc for tc in testcaseList if not tc.inactive]
    if options.download:
        cleanErrorFile(testcaseList, constants.STATUS_FILE_DOWNLOAD)
        downloadTestcases(options, svnclient, testcaseList)
        for testcase in testcaseList:
            testcase.writeReferenceVersionToStatusFile()
    else:
        # Without a download step the test case folders must already exist locally.
        for testcase in active_test_cases:
            if not os.path.exists(testcase.path):
                raise RuntimeError(f"Testcase folder '{testcase.path}' does not exist. Please recheck your setup!")
    if options.evaluateReferences:
        for testcase in testcaseList:
            download_original_references(options.suiteName, testcase, svnclient)
    synchronizeReferenceVersionInTestcases(active_test_cases)
    if options.getVersionsFromExecutable:
        executableDict.writeToJSON(fileName=os.path.join(suiteName, constants.PATH_CONFIG_JSON_FILE))
    # Clear the error-file sections of every step that is about to run again.
    if options.compute or options.updateRestartData or options.updateMeshData:
        cleanErrorFile(testcaseList, constants.STATUS_FILE_COMPUTE)
        cleanErrorFile(testcaseList, constants.STATUS_FILE_POST)
        cleanErrorFile(testcaseList, constants.STATUS_FILE_CHECK_REFERENCES)
    elif options.post:
        cleanErrorFile(testcaseList, constants.STATUS_FILE_POST)
        cleanErrorFile(testcaseList, constants.STATUS_FILE_CHECK_REFERENCES)
    elif options.evaluateReferences:
        cleanErrorFile(testcaseList, constants.STATUS_FILE_EVALUATE_REFERENCES)
    for testcase in active_test_cases:
        testcase.writeStatusFile()
    jobList = None
    jUnitWriter = None
    if options.compute or options.post or options.updateRestartData or options.updateMeshData or options.evaluateReferences:
        if options.writeJUnitXML:
            jUnitWriter = JUnitXMLOutput(options.writeJUnitXML)
        jobList = createAndRunJobListForComputationPostprocessing(options, executableDict, testcaseList,
                                                                  resourcesDict=resourcesDict, junitWriter=jUnitWriter)
        # look for errors in the job list
        if options.compute:
            lookForErrorsInAJobList(active_test_cases, jobList, constants.STATUS_FILE_COMPUTE)
        if options.post:
            lookForErrorsInAJobList(active_test_cases, jobList, constants.STATUS_FILE_POST)
            lookForErrorsInAJobList(active_test_cases, jobList, constants.STATUS_FILE_CHECK_REFERENCES)
        if options.evaluateReferences:
            lookForErrorsInAJobList(active_test_cases, jobList, constants.STATUS_FILE_EVALUATE_REFERENCES)
    # check whether any errors are reported in the status files
    success_state_pavayo = _outputErroneousTestCases(active_test_cases, known_error_test_cases, options, jUnitWriter)
    if jUnitWriter is not None:
        Printer.verbosePrint("", printLevel=Printer.Verbosity.ALWAYS)
        jUnitWriter.save()
    if options.uploadTestcases:
        # Only upload when no new (unexpected) failures were detected.
        if success_state_pavayo != SuccessState.NEW_FAILURE:
            return_code_upload = uploadTestcases(svnclient, testcaseList)
            if not return_code_upload:
                success_state_pavayo = SuccessState.NEW_FAILURE
        else:
            Printer.warning("There were errors in the execution of the test cases, therefore the test case(s) will not be uploaded.",
                            printLevel=Printer.Verbosity.ALWAYS)
    if options.prepareTraceSuiteXMLforUpload:
        if success_state_pavayo < SuccessState.EXPECTED_ERROR_MISSING:
            prepareTraceSuiteXMLforUpload(options, svnclient, testcaseList)
        else:
            Printer.warning("There were errors in the execution of pavayo, therefore the test suite XML will not be uploaded.",
                            printLevel=Printer.Verbosity.ALWAYS)
    if options.latex:
        if pysvntools is not None:
            # Attach the SVN revision / modification state so the report can show it.
            for tc in testcaseList:
                tc.setRevision(svnclient.getLocalRevisionNumberOfPath(tc.path))
                try:
                    tc.isModified(svnclient.modified(tc.path))
                except pysvntools.pysvn.ClientError:
                    tc.isModified("No SVN information available.")
        try:
            runtime = int(round(float(options.configFile.get("computeOptions", "runtime"))))
        except (configparser.NoSectionError, configparser.NoOptionError):
            # No computation ran in this invocation: report a zero runtime.
            runtime = 0
        reportNames, latexStatus = createReports(options, testcaseList, known_error_test_cases, runtime, svnclient,
                                                 executableDict, reportSettings=reportSettings)
        if not latexStatus:
            Printer.verbosePrint("\nReport(s):\n {0}\nwas/were created.".format("\n ".join(reportNames)),
                                 Printer.Verbosity.JENKINS)
        else:
            success_state_pavayo = SuccessState.NEW_FAILURE
            Printer.verbosePrint("", printLevel=Printer.Verbosity.ALWAYS)
            Printer.warning("Errors occurred while creating the report(s):\n {0}\n(return code = {1})".format("\n ".join(reportNames), latexStatus),
                            printLevel=Printer.Verbosity.ALWAYS)
    Printer.verbosePrint("\n{0:=^30}".format(" End Processing "), printLevel=Printer.Verbosity.JENKINS)
    return 1 if success_state_pavayo >= SuccessState.EXPECTED_ERROR_MISSING else 0
def _resetJobIds(testCase):
    """Every time a new job list is created the ids for the dependencies have to be reseted.

    :param testCase: test case whose job ids are reset
    :type testCase: AbstractTestCaseSub
    """
    # NOTE(review): jobIdsCheckReferences / jobIdsEvaluateReferences are not reset
    # here — confirm they are reset elsewhere before a new job list is built.
    testCase.jobIdsMeshing = list()
    testCase.jobIdsRestart = list()
    testCase.jobIdsComputation = list()
    testCase.jobIdsPostprocessing = list()


def _addMeshingJob(options, executableDict, testCase, jobList, resourcesDict=None):
    """Adds the meshing job of a test case to the given job list.

    :param options: options instance parsed from the command line
    :param executableDict: dictionary of executables
    :param testCase: test case to compute
    :param jobList: joblist instance
    :param resourcesDict: resources available
    :type options: Argparse
    :type executableDict: ExecutableResources
    :type testCase: AbstractTestCaseSub
    :type jobList: JobList
    :type resourcesDict: dict
    """
    updateMeshJob = testCase.getMeshingJobList(options, executableDict, resourcesDict=resourcesDict)
    if updateMeshJob:
        jobList.addJob(updateMeshJob)
        testCase.jobIdsMeshing.append(updateMeshJob.id)


def _addRestartJob(options, executableDict, testCase, jobList, resourcesDict=None):
    """Adds the restart job of a test case to the given job list.

    The restart job depends on the meshing job(s) of the same test case.

    :param options: options instance parsed from the command line
    :param executableDict: dictionary of executables
    :param testCase: test case to compute
    :param jobList: joblist instance
    :param resourcesDict: resources available
    :type options: Argparse
    :type executableDict: ExecutableResources
    :type testCase: AbstractTestCaseSub
    :type jobList: JobList
    :type resourcesDict: dict
    """
    updateRestartJob = testCase.getRestartJobList(options, executableDict, resourcesDict=resourcesDict)
    if updateRestartJob:
        jobList.addJob(updateRestartJob, parents=testCase.jobIdsMeshing)
        testCase.jobIdsRestart.append(updateRestartJob.id)


def _addComputationJob(options, executableDict, testCase, jobList, resourcesDict=None):
    """Adds the compute job of a test case to the given job list.

    The computation depends on the restart job(s) and is weighted by the test
    case priority.

    :param options: options instance parsed from the command line
    :param executableDict: dictionary of executables
    :param testCase: test case to compute
    :param jobList: joblist instance
    :param resourcesDict: resources available
    :type options: Argparse
    :type executableDict: ExecutableResources
    :type testCase: AbstractTestCaseSub
    :type jobList: JobList
    :type resourcesDict: dict
    """
    computationJob = testCase.getComputationJobList(options, executableDict, resourcesDict=resourcesDict)
    if computationJob:
        jobList.addJob(computationJob, parents=testCase.jobIdsRestart, weight=testCase.priority)
        testCase.jobIdsComputation.append(computationJob.id)


def _addPostprocessingJob(options, executableDict, testCase, jobList, dependentTestCases, postOnly, resourcesDict=None):
    """Adds the postprocessing job of a test case to the given job list.

    Speed-line test cases additionally depend on the computation (or restart)
    jobs of the test cases they are built from.

    :param options: options instance parsed from the command line
    :param executableDict: dictionary of executables
    :param testCase: test case to compute
    :param jobList: joblist instance
    :param dependentTestCases: dictionary of test case dependencies
    :param postOnly: if True the post-processing has no computation parents
    :param resourcesDict: resources available
    :type options: Argparse
    :type executableDict: ExecutableResources
    :type testCase: AbstractTestCaseSub
    :type jobList: JobList
    :type dependentTestCases: dict
    :type postOnly: bool
    :type resourcesDict: dict
    """
    postProcessingJob = testCase.getPostprocessingJobList(options, executableDict, resourcesDict=resourcesDict)
    if postProcessingJob:
        # Copy the id list: 'parents' is extended below for speed-line test cases,
        # and extending the original would corrupt testCase.jobIdsComputation
        # (which is later used to look up computation errors).
        parents = list(testCase.jobIdsComputation) if not postOnly else []
        if isinstance(testCase, SpeedlineTestCaseSub):
            for tc in dependentTestCases[testCase.name]:
                parents.extend(tc.jobIdsComputation or tc.jobIdsRestart)
        jobList.addJob(postProcessingJob, parents)
        testCase.jobIdsPostprocessing.append(postProcessingJob.id)


def _addCheckReferencesJob(options, testCase, jobList):
    """Adds the reference check job of a test case to the given job list.

    The reference check depends on the post-processing job(s) of the test case.

    :param options: options instance parsed from the command line
    :param testCase: test case to compute
    :param jobList: joblist instance
    :type options: Argparse
    :type testCase: AbstractTestCaseSub
    :type jobList: JobList
    """
    checkReferencesJob = testCase.getCheckReferencesJobList(options)
    if checkReferencesJob:
        parents = testCase.jobIdsPostprocessing
        jobList.addJob(checkReferencesJob, parents)
        testCase.jobIdsCheckReferences.append(checkReferencesJob.id)
[docs]def createDependenciesDictionary(tclist): """Creates a dictionary containing the test case names as key and a tuple of the test cases the test case is dependent of as values. :param tclist: list containing test cases :type tclist: list :return: dictionary the test cases mapped to the dependencies :rtype: dictionary """ return {tc.name: [dep for dep in tclist if dep.name in tc.dependency] for tc in tclist}
[docs]def readErrorFiles(testcaseList, errorType): """Reads the error files from the given test cases, updates the error containers and returns a list of test cases which failed in the given step. :param testcaseList: list of test cases to analyze :param errorType: error type to search for :type testcaseList: list :type errorType: string :return: list of erroneous test cases :rtype: list """ erroneousTestcases = list() for testcase in testcaseList: testcase.errorContainer.read(os.path.join(testcase.path, constants.STATUS_FILE_NAME)) if testcase.errorContainer.options(errorType): erroneousTestcases.append(testcase) return erroneousTestcases
def _get_known_error_steps_for_test_case(test_case_name: str, known_error_test_cases: typing.Mapping[str, TestCaseExpectedToFail]) -> set:
    """Return the set of steps in which the named test case is known to fail (empty if unknown)."""
    known_error_steps = set()
    if test_case_name in known_error_test_cases:
        known_error_steps = set(known_error_test_cases[test_case_name].steps)
    return known_error_steps


def _all_errors_known(erroneous_steps: typing.Iterable[str], known_error_steps: typing.Iterable[str]) -> bool:
    """Return True if every erroneous step is a known-error step (vacuously True for no errors)."""
    return all(step in known_error_steps for step in erroneous_steps)


def _is_error_missing(erroneous_steps: typing.Iterable[str], known_error_steps: typing.Iterable[str]) -> bool:
    """Return True if at least one step expected to fail did not actually fail."""
    return any(step not in erroneous_steps for step in known_error_steps)


def _get_test_case_success_state(erroneous_steps: typing.Iterable[str], known_error_steps: typing.Iterable[str]) -> SuccessState:
    """Classify a test case's outcome from its erroneous steps and its known-error steps.

    Priority order matters: a new (unexpected) failure dominates, then a missing
    expected error, then failures that are all known; otherwise NO_FAILURE.
    """
    success_state = SuccessState.NO_FAILURE
    if erroneous_steps and not _all_errors_known(erroneous_steps, known_error_steps):
        success_state = SuccessState.NEW_FAILURE
    elif _is_error_missing(erroneous_steps, known_error_steps):
        success_state = SuccessState.EXPECTED_ERROR_MISSING
    elif erroneous_steps and _all_errors_known(erroneous_steps, known_error_steps):
        success_state = SuccessState.ONLY_KNOWN_ERRORS
    return success_state


def _get_active_steps(options: argparse.Namespace) -> set:
    """Return the set of status-file step names that the current PAVAYO run executes."""
    active_steps = set()
    if options.download:
        active_steps.add(constants.STATUS_FILE_DOWNLOAD)
    if options.compute:
        active_steps.add(constants.STATUS_FILE_COMPUTE)
    if options.post:
        active_steps.add(constants.STATUS_FILE_POST)
    # reference checks also run when only the LaTeX report is (re)built
    if (options.post or options.latex) and not options.skipReferenceChecks:
        active_steps.add(constants.STATUS_FILE_CHECK_REFERENCES)
    if options.updateReferenceData:
        active_steps.add(constants.STATUS_FILE_UPDATE_REFERENCE_DATA)
    if options.evaluateReferences:
        active_steps.add(constants.STATUS_FILE_EVALUATE_REFERENCES)
    return active_steps


def _outputErroneousTestCases(test_case_list: typing.Iterable[AbstractTestCase],
                              known_error_test_cases: typing.Mapping[str, TestCaseExpectedToFail],
                              options: argparse.Namespace,
                              junit_writer: typing.Optional[JUnitXMLOutput]) -> SuccessState:
    """Prints all erroneous test cases and summarizes the reference check results.

    :param test_case_list: list of test cases to analyze
    :param known_error_test_cases: mapping of test case names to their known-failure records
    :param options: argparse instance of the current PAVAYO run
    :param junit_writer: inject a jUnit writer to pass reference check results
    :return: overall success state of the run (worst state over all test cases)
    """
    return_code = SuccessState.NO_FAILURE
    reference_check_state = TestCaseState.DID_NOT_RUN
    failed_but_ignored_test_cases = []
    failed_test_cases = []
    error_missing_test_cases = []
    active_steps = _get_active_steps(options)
    for test_case in test_case_list:
        known_error_steps = _get_known_error_steps_for_test_case(test_case.name, known_error_test_cases)
        # only consider known errors in steps that actually ran in this session
        known_error_steps.intersection_update(active_steps)
        erroneous_steps = set(test_case.get_erroneous_steps(active_steps))
        success_state = _get_test_case_success_state(erroneous_steps, known_error_steps)
        if success_state == SuccessState.NEW_FAILURE:
            failed_test_cases.append(test_case)
        elif success_state == SuccessState.EXPECTED_ERROR_MISSING:
            error_missing_test_cases.append(test_case.name)
        elif success_state == SuccessState.ONLY_KNOWN_ERRORS:
            failed_but_ignored_test_cases.append(test_case.name)
        if constants.STATUS_FILE_CHECK_REFERENCES in active_steps:
            test_case_reference_check_state = test_case.get_state_by_step(constants.STATUS_FILE_CHECK_REFERENCES)
            if test_case_reference_check_state != TestCaseState.DID_NOT_RUN:
                if junit_writer is not None and test_case.references:
                    junit_writer.submit_check_summary(test_case.name, test_case.checkSummary)
                if test_case_reference_check_state == TestCaseState.NOT_AVAILABLE:
                    Printer.warning(f"{test_case.name}: No reference check implemented")
                elif test_case_reference_check_state == TestCaseState.FAILURE:
                    Printer.error(f"{test_case.name}: Reference check failed.")
                # keep the worst state seen so far (min over the TestCaseState ordering)
                reference_check_state = min(reference_check_state, test_case_reference_check_state)
        if constants.STATUS_FILE_EVALUATE_REFERENCES in active_steps:
            test_case_reference_check_state = test_case.get_state_by_step(constants.STATUS_FILE_EVALUATE_REFERENCES)
            if test_case_reference_check_state == TestCaseState.NOT_AVAILABLE:
                Printer.warning(f"{test_case.name}: No reference check implemented")
            elif test_case_reference_check_state == TestCaseState.FAILURE:
                Printer.error(f"{test_case.name}: Reference evaluation failed.")
            # NOTE(review): unlike the check branch above there is no DID_NOT_RUN guard
            # here in the extracted source — confirm against the original file.
            reference_check_state = min(reference_check_state, test_case_reference_check_state)
    if reference_check_state == TestCaseState.DID_NOT_RUN:
        pass
    elif reference_check_state == TestCaseState.FAILURE:
        Printer.error("at least one reference check or evaluation failed.", printLevel=Printer.Verbosity.JENKINS)
    elif reference_check_state == TestCaseState.WITHIN_TOLERANCE:
        Printer.verbosePrint(Color.colorString("DONE: ", Color.SUCCESS) + "All reference checks and evaluations passed within tolerance.",
                             printLevel=Printer.Verbosity.JENKINS)
    elif reference_check_state == TestCaseState.SUCCESS:
        Printer.verbosePrint(Color.colorString("DONE: ", Color.SUCCESS) + "All reference checks and evaluations passed on binary level.",
                             printLevel=Printer.Verbosity.JENKINS)
    else:
        assert False, "Unreachable"
    if failed_but_ignored_test_cases:
        return_code = SuccessState.ONLY_KNOWN_ERRORS
        Printer.verbosePrint("\nTest cases with known and therefore ignored failures:", Printer.Verbosity.ALWAYS)
        for test_case_name in failed_but_ignored_test_cases:
            Printer.verbosePrint(f"\t{test_case_name} ({known_error_test_cases[test_case_name].issue_url})", Printer.Verbosity.ALWAYS)
    if error_missing_test_cases:
        Printer.verbosePrint("\nTest cases that succeeded in at least one step that was expected to fail:", Printer.Verbosity.ALWAYS)
        for test_case_name in error_missing_test_cases:
            Printer.verbosePrint(
                f"\t{test_case_name}\t{known_error_test_cases[test_case_name].steps}\t{known_error_test_cases[test_case_name].issue_url})",
                Printer.Verbosity.JENKINS)
        return_code = SuccessState.EXPECTED_ERROR_MISSING
    if failed_test_cases:
        Printer.verbosePrint("\nFailed test cases:", Printer.Verbosity.JENKINS)
        for test_case in failed_test_cases:
            Printer.verbosePrint(f"\t{test_case.name} (failure in '{test_case.get_erroneous_steps()[0]}' step)", Printer.Verbosity.JENKINS)
        Printer.verbosePrint("\n\t{}".format(",".join(tc.name for tc in failed_test_cases)), Printer.Verbosity.JENKINS)
        # new failures dominate every other state
        return_code = SuccessState.NEW_FAILURE
    return return_code


def _prepareLiteratureDatabase(options, latexFolder):
    """Tests if a path to a literature database was passed through the option parser or if it has to be checked out.
    Checks out the literature database if necessary.

    :param options: argparse instance of the current PAVAYO run
    :param latexFolder: path to the folder in which the TEX files will be generated
    :type options: argparse.Namespace
    :type latexFolder: str
    :return: local path of the literature database
    """
    if options.literatureDatabase and os.path.exists(options.literatureDatabase):
        literatureDatabaseLocalPath = options.literatureDatabase
    else:
        literatureDatabaseLocalPath = os.path.join(latexFolder, constants.LITERATURE_DATABASE_NAME)
        # probe ssh access to the GitLab server before attempting to clone
        p = subprocess.Popen(['ssh', '-o BatchMode=yes', '-o ConnectTimeout=5', constants.LITERATURE_DATABASE_REPO_URL],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, _ = p.communicate()
        # NOTE(review): communicate() returns bytes here (no text mode), so
        # GITLAB_WELCOME_STRING must be a bytes constant for this test — confirm.
        if constants.GITLAB_WELCOME_STRING in output:
            with Git().custom_environment(GIT_SSH_COMMAND='ssh'):
                Repo.clone_from(constants.LITERATURE_DATABASE_REPO_URL + constants.LITERATURE_DATABASE_REPO_PATH,
                                literatureDatabaseLocalPath, branch=constants.LITERATURE_DATABASE_BRANCH)
            Printer.verbosePrint("Literature data base has been successfully cloned.", Printer.Verbosity.JENKINS)
        else:
            Printer.verbosePrint("Cannot log into git@gitlab.dlr.de via ssh to clone the literature data base. Either register your ssh key or specify a local copy via options.\nBuilding latex without literature data base.", Printer.Verbosity.JENKINS)
    return literatureDatabaseLocalPath


def _getReportPathsAndContents(allowList, suiteName, testCaseList, latexFolder, traceVersion):
    """Generate report tuples for the reports. Every tuple contains the path to the report
    and a sorted list of test cases which belong to this report.

    :param allowList: list of additional report targets which should be created
    :param suiteName: name of the test suite
    :param testCaseList: list of test cases available
    :param latexFolder: path to the latex workspace
    :param traceVersion: version of the TRACE executable used by PAVAYO
    :type allowList: list(str)
    :type suiteName: str
    :type testCaseList: list(AbstractTestCaseSub)
    :type latexFolder: str
    :type traceVersion: str
    :return: a tuple of a list of reports and a flag if the report type is a general report
    :rtype: list(report), bool
    """
    openTcKey = 'all'
    # the report is a validation report only if every test case has test level 5
    isGeneralReport = len([tc for tc in testCaseList if "5" in tc.testLevel]) != len(testCaseList)
    texFileNameBase = (constants.PAVAYO_REPORT_FILE_BASE if isGeneralReport else constants.VALIDATION_REPORT_FILE_BASE)
    if traceVersion is not None:
        texFileNameBase += "_" + traceVersion
    texFileNameBase += "_" + suiteName
    texFileName = texFileNameBase + os.extsep + constants.TEX_FILE_ENDING
    texFilePath = os.path.join(latexFolder, texFileName)
    # the first report is the full (internal) report containing every test case
    reports = [Report(texFilePath, True, sorted(testCaseList, key=lambda tc: tc.public_name.lower()))]
    testCaseAllowanceDict = {allow: [] for allow in allowList}
    testCaseAllowanceDict[openTcKey] = []
    for testCase in testCaseList:
        if testCase.confidentiality:
            for allowance in testCase.confidentiality.allow:
                if allowance in allowList or allowance == openTcKey:
                    if testCase not in testCaseAllowanceDict[allowance]:
                        testCaseAllowanceDict[allowance].append(testCase)
    # add test cases which are free for all to every report
    for testCaseSet in testCaseAllowanceDict.values():
        testCaseSet.extend(tc for tc in testCaseAllowanceDict.get(openTcKey, []) if tc not in testCaseSet)
    if openTcKey not in allowList:
        del testCaseAllowanceDict[openTcKey]
    for allowance, testCases in testCaseAllowanceDict.items():
        reportNameBase = texFileNameBase + "_" + allowance
        reportName = reportNameBase + os.extsep + constants.TEX_FILE_ENDING
        reportPath = os.path.join(latexFolder, reportName)
        reports.append(Report(reportPath, False, sorted(testCases, key=lambda tc: tc.public_name.lower())))
    return reports, isGeneralReport


def _addErroneousTestCasesToReport(testCaseList: typing.Iterable[AbstractTestCase], known_error_test_cases, texFileInstance):
    """Adds a section with erroneous test cases to a report.

    :param testCaseList: list of all test cases for the current report
    :param known_error_test_cases: list of test cases with attributes known to fail
    :param texFileInstance: PyTex instance aka the current report
    :type testCaseList: Iterable[AbstractTestCase]
    :type known_error_test_cases: dict[str: mojo.pavayo.knownErrorsParser.FailingTestCase]
    :type texFileInstance: PyTex
    """
    Printer.verbosePrint("Add section with erroneous test cases.", printLevel=Printer.Verbosity.DEBUG)
    error_list = []
    for test_case in testCaseList:
        erroneous_steps = test_case.get_erroneous_steps()
        known_error_steps = _get_known_error_steps_for_test_case(test_case.name, known_error_test_cases)
        success_state = _get_test_case_success_state(erroneous_steps, known_error_steps)
        if success_state == SuccessState.NEW_FAILURE:
            error_list.append(f"\\texttt{{ \\hyperref[{test_case.name}]{{ {PyTex.escapeChars(test_case.public_name)} }} }}")
        elif success_state == SuccessState.EXPECTED_ERROR_MISSING:
            issue_url = PyTex.escapeChars(known_error_test_cases[test_case.name].issue_url)
            suffix = f" (Successful in a step that was expected to fail: \\url{{{issue_url}}})"
            error_list.append(f"\\texttt{{ \\hyperref[{test_case.name}]{{ {PyTex.escapeChars(test_case.public_name)} }} {suffix} }}")
        elif success_state == SuccessState.ONLY_KNOWN_ERRORS:
            suffix = f" (Known failure: \\url{{{PyTex.escapeChars(known_error_test_cases[test_case.name].issue_url)}}})"
            error_list.append(f"\\texttt{{ \\hyperref[{test_case.name}]{{ {PyTex.escapeChars(test_case.public_name)} }} {suffix} }}")
    texFileInstance.addSectionTitle("Test Cases which failed to terminate normally")
    if error_list:
        texFileInstance.addItemize(error_list)
    else:
        texFileInstance.addText("None")


def _addTestCaseSummary(testCaseList: typing.Iterable[AbstractTestCase],
                        known_error_test_cases: typing.Mapping[str, TestCaseExpectedToFail],
                        texFileInstance: PyTex):
    """Adds a section with test case summary.

    :param testCaseList: list of all test cases for the current report
    :param known_error_test_cases: list of test cases with attributes known to fail
    :param texFileInstance: PyTex instance aka the current report
    """
    Printer.verbosePrint("Add test case summary.", printLevel=Printer.Verbosity.DEBUG)
    table_contents = [["Test case", "Status", "Tolerance", "Binary"]]
    for testCase in testCaseList:
        name = f"\\texttt{{\\hyperref[{testCase.name}]{{{PyTex.escapeChars(testCase.name)}}}}}"
        tolerance = "N/A"
        binary = "N/A"
        if testCase.inactive:
            status = AbstractTestCase.string_inactive
        else:
            erroneous_steps = testCase.get_erroneous_steps()
            known_error_steps = _get_known_error_steps_for_test_case(testCase.name, known_error_test_cases)
            success_state = _get_test_case_success_state(erroneous_steps, known_error_steps)
            if success_state == SuccessState.NO_FAILURE:
                status = AbstractTestCase.string_success
            elif success_state == SuccessState.NEW_FAILURE:
                status = AbstractTestCase.string_fail
            elif success_state == SuccessState.EXPECTED_ERROR_MISSING:
                issue_url = PyTex.escapeChars(known_error_test_cases[testCase.name].issue_url)
                status = f"\\href{{{issue_url}}}{{{AbstractTestCase.string_known_errors_missing}}}"
            elif success_state == SuccessState.ONLY_KNOWN_ERRORS:
                issue_url = PyTex.escapeChars(known_error_test_cases[testCase.name].issue_url)
                status = f"\\href{{{issue_url}}}{{{AbstractTestCase.string_only_known_errors}}}"
            else:
                raise RuntimeError("Unreachable!")
            # reference-check columns only apply when checks were actually run
            if testCase.checkSummary.checks:
                binary = AbstractTestCase.string_success if testCase.checkSummary.result.identical else AbstractTestCase.string_fail_orange
                tolerance = AbstractTestCase.string_success if testCase.checkSummary.result.within_tolerance else AbstractTestCase.string_fail
        table_contents.append([name, status, tolerance, binary])
    texFileInstance.addSectionTitle("Test case summary")
    texFileInstance.addLongtable(table_contents, nCols=4, setCols="l|lll", headerSeparator="\\hline \\\\")


def _addDifferentVersionTestCasesToReport(testCaseList, texFileInstance, executableDict):
    """Adds a section with test cases computed with a different version to a report.

    :param testCaseList: list of all active test cases for the current report
    :param texFileInstance: PyTex instance aka the current report
    :param executableDict: dictionary of executables
    :type testCaseList: list(AbstractTestCaseSub)
    :type texFileInstance: PyTex
    :type executableDict: ExecutableResources
    """
    if executableDict.mainExecutable:
        Printer.verbosePrint("Add section with test cases computed with a different version.", printLevel=Printer.Verbosity.DEBUG)
        main_executable = executableDict.mainExecutable.lower()
        main_executable_version = str(executableDict[executableDict.mainExecutable].version)
        tc_name_to_version = {tc.name: tc.getSoftwareVersion(main_executable) for tc in testCaseList}
        # the test case was not computed at all
        tc_valid_version = {name: version for name, version in tc_name_to_version.items() if version is not None}
        tc_diff_version = {name: version for name, version in tc_valid_version.items()
                           if not is_version_str_equal(version, main_executable_version)}
        texFileInstance.addSectionTitle("Test Cases Computed With A Different TRACE Suite Version")
        if tc_diff_version:
            texFileInstance.addItemize(f"\\hyperref[{name}]{{ {PyTex.escapeChars(name):<30} }} Computed with Version {version}"
                                       for name, version in tc_diff_version.items())
        else:
            texFileInstance.addText("None")


def _addInactiveTestCasesToReport(inactive_test_cases, texFileInstance):
    """Adds a section with inactive test cases to a report.

    :param inactive_test_cases: list of all inactive test cases for the current report
    :param texFileInstance: PyTex instance aka the current report
    :type inactive_test_cases: list[AbstractTestCaseSub]
    :type texFileInstance: PyTex
    """
    Printer.verbosePrint("Add section with inactive test cases.", printLevel=Printer.Verbosity.DEBUG)
    texFileInstance.addSectionTitle("Inactive Test Cases")
    if inactive_test_cases:
        for testCase in inactive_test_cases:
            public_name = PyTex.escapeChars(testCase.public_name)
            # `inactive` holds the id of the issue explaining why the test case is disabled
            url = constants.URL_ISSUE.format(id=testCase.inactive)
            texFileInstance.addText(f"\\label{{{testCase.name}}}\n{public_name}\\\n{url}\n")
    else:
        texFileInstance.addText("None")


def _add_test_case_error_sections(test_case: AbstractTestCase, known_error_test_cases, tex_file_instance):
    """Adds 'Known Errors', 'Error Messages' and 'Error Messages Missing' subsections for one test case."""
    erroneous_steps = test_case.get_erroneous_steps()
    known_error_steps = _get_known_error_steps_for_test_case(test_case.name, known_error_test_cases)
    known_errors = []
    new_errors = []
    for step in constants.STATUS_FILE_ERROR_SECTIONS:
        if test_case.errorContainer.items(step):
            # items() yields (key, message) pairs; keep only the messages
            entries_for_step = list(zip(*test_case.errorContainer.items(step)))[1]
            if step in known_error_steps:
                known_errors.extend(PyTex.escapeChars(entry) for entry in entries_for_step)
            else:
                new_errors.extend(PyTex.escapeChars(entry) for entry in entries_for_step)
    if known_errors:
        tex_file_instance.addSectionTitle("Known Errors", chapterType=2)
        tex_file_instance.addText(f"See issue \\url{{{PyTex.escapeChars(known_error_test_cases[test_case.name].issue_url)}}}:")
        tex_file_instance.addItemize(known_errors)
    if new_errors:
        tex_file_instance.addSectionTitle("Error Messages", chapterType=2)
        tex_file_instance.addItemize(new_errors)
    if _is_error_missing(erroneous_steps, known_error_steps):
        tex_file_instance.addSectionTitle("Error Messages Missing", chapterType=2)
        tex_file_instance.addText(f"Errors in steps '{tuple(known_error_steps.difference(erroneous_steps))}' were expected but are missing.")
        tex_file_instance.addText(f"See issue \\url{{{PyTex.escapeChars(known_error_test_cases[test_case.name].issue_url)}}}.")


def _addTestCaseChapterToValidationReport(options, testcaseCaseList, known_error_test_cases, texFileInstance, chapterName):
    """Adds a chapter of test cases to the validation report. Usual chapters are 'Turbine',
    'Compressor', 'Combustor' or 'Numerical'.

    :param options: options from the PAVAYO argument parser
    :param testcaseCaseList: list of all test cases for the current chapter
    :param known_error_test_cases: list of test cases with attributes known to fail
    :param texFileInstance: PyTex instance aka the current report
    :param chapterName: name of the chapter to create
    :type options: Argparse
    :type testcaseCaseList: list(AbstractTestCaseSub)
    :type known_error_test_cases: dict[str: mojo.pavayo.knownErrorsParser.FailingTestCase]
    :type texFileInstance: PyTex
    :type chapterName: str
    """
    texFileInstance.addSectionTitle(chapterName)
    for testCase in testcaseCaseList:
        texFileInstance.addSectionTitle(PyTex.escapeChars(testCase.public_name), chapterType=1, label=testCase.name)
        texFileInstance.addText("Author: {0}\n".format(PyTex.escapeChars(testCase.author)))
        texFileInstance.addText("Mail: {0}\n".format(PyTex.escapeChars(testCase.email)))
        if testCase.referenceTraceSuite:
            texFileInstance.addText("Reference TRACE suite: {0}\n".format(PyTex.escapeChars(testCase.referenceTraceSuite)))
        if not testCase.isErroneous():
            testCase.addDescriptionAndCommandline(options, texFileInstance)
            testCase.addFigures(texFileInstance)
        _add_test_case_error_sections(testCase, known_error_test_cases, texFileInstance)
        texFileInstance.addText("\\clearpage")


def _initTestSuiteTestCaseParagraph(testCase, texFileInstance, chapterType=0, resetCounters=True, setLabel=True):
    """Writes the common header (title, counters, author, revisions) for a test case section."""
    texFileInstance.addSectionTitle("{0}\n".format(PyTex.escapeChars(testCase.public_name)), chapterType=chapterType,
                                    label=testCase.name if setLabel else "")
    if resetCounters:
        # restart page/figure/table numbering for each test case chapter
        texFileInstance.addText(r"\setcounter{page}{1}")
        texFileInstance.addText(r"\setcounter{figure}{0}")
        texFileInstance.addText(r"\setcounter{table}{0}")
    texFileInstance.addText("Author: {0}\n".format(PyTex.escapeChars(testCase.author)))
    texFileInstance.addText("Mail: {0}\n".format(PyTex.escapeChars(testCase.email)))
    texFileInstance.addText("Test case revision: {0}".format(PyTex.escapeChars(str(testCase.revision))))
    if testCase.modified:
        texFileInstance.addText("({0})".format(PyTex.escapeChars(str(testCase.modified))))
    texFileInstance.addText("\n")
    if testCase.originalReferenceRevision:
        texFileInstance.addText(f"Original reference revision: {PyTex.escapeChars(testCase.originalReferenceRevision)}\n")
    if testCase.referenceTraceSuite:
        texFileInstance.addText("Reference TRACE suite: {0}\n".format(PyTex.escapeChars(testCase.referenceTraceSuite)))


def _addTestCaseParagraph(options, testCaseList, known_error_test_cases, texFileInstance, isGeneralReport):
    """Adds the paragraph containing the test cases with description and pictures to the report.
    Distinguishes between a general and a validation report.

    :param options: options from the PAVAYO argument parser
    :param testCaseList: list of all test cases for the report
    :param known_error_test_cases: list of test cases with attributes known to fail
    :param texFileInstance: current report instance
    :param isGeneralReport: flag if the current report is a general report
    :type options: Argparse
    :type testCaseList: list(AbstractTestCaseSub)
    :type known_error_test_cases: dict[str: TestCaseExpectedToFail]
    :type texFileInstance: PyTex
    :type isGeneralReport: bool
    """
    Printer.verbosePrint("Add test cases.", printLevel=Printer.Verbosity.DEBUG)
    if isGeneralReport:
        texFileInstance.addSectionTitle("Test cases")
        for testCase in testCaseList:
            if testCase.inactive:
                continue
            _initTestSuiteTestCaseParagraph(testCase, texFileInstance, chapterType=1)
            testCase.addDescriptionAndCommandline(options, texFileInstance)
            _add_test_case_error_sections(testCase, known_error_test_cases, texFileInstance)
            # valgrind/sanitizer runs replace the regular result figures
            if options.valgrind or options.includeSanitizers:
                if options.valgrind:
                    testCase.addValgrindForTex(texFileInstance)
                if options.includeSanitizers:
                    testCase.addSanitizersForTex(texFileInstance)
            else:
                testCase.addFigures(texFileInstance)
            texFileInstance.addText("\\clearpage\\normalsize")
    else:
        # validation report: group the test cases into domain chapters by keyword
        turbineTestcases = [tc for tc in testCaseList if "Turbine" in tc.keywords.keyword and not tc.inactive]
        compressorTestcases = [tc for tc in testCaseList if "Compressor" in tc.keywords.keyword and not tc.inactive]
        combustorTestcases = [tc for tc in testCaseList if "Combustor" in tc.keywords.keyword and not tc.inactive]
        numericalTestcases = [tc for tc in testCaseList
                              if tc not in turbineTestcases + compressorTestcases + combustorTestcases and not tc.inactive]
        if turbineTestcases:
            _addTestCaseChapterToValidationReport(options, turbineTestcases, known_error_test_cases, texFileInstance, "Turbine test cases")
        if compressorTestcases:
            _addTestCaseChapterToValidationReport(options, compressorTestcases, known_error_test_cases, texFileInstance, "Compressor test cases")
        if combustorTestcases:
            _addTestCaseChapterToValidationReport(options, combustorTestcases, known_error_test_cases, texFileInstance, "Combustor test cases")
        if numericalTestcases:
            _addTestCaseChapterToValidationReport(options, numericalTestcases, known_error_test_cases, texFileInstance, "Numerical test cases")


def _addDetailsReferenceChecks(testCaseList, texFileInstance, step, forcePlot):
    """Adds a detail section for the reference check or reference evaluation step.

    :param testCaseList: list of all test cases for the current report
    :param texFileInstance: PyTex instance aka the current report
    :param step: status-file step to report on (check or evaluate references)
    :param forcePlot: also add details/plots for test cases that did not fail
    :type testCaseList: list[AbstractTestCaseSub]
    :type texFileInstance: PyTex
    :raises NotImplementedError: if *step* is not a reference check/evaluation step
    """
    if step == constants.STATUS_FILE_EVALUATE_REFERENCES:
        section_title = "Evaluation of References"
        comp_label = "Current"
        ref_label = "Original"
    elif step == constants.STATUS_FILE_CHECK_REFERENCES:
        section_title = "Reference check details"
        comp_label = "Current"
        ref_label = "Reference"
    else:
        raise NotImplementedError(f"unknown step '{step}'")
    Printer.verbosePrint(f"Add section of for step '{step}'", printLevel=Printer.Verbosity.DEBUG)
    texFileInstance.addSectionTitle(section_title, chapterType=0)
    for testCase in testCaseList:
        if testCase.references and not testCase.inactive:
            # evaluations are always reported; checks only on failure unless forced
            if step == constants.STATUS_FILE_EVALUATE_REFERENCES or testCase.get_state_by_step(step) == TestCaseState.FAILURE or forcePlot:
                _initTestSuiteTestCaseParagraph(testCase, texFileInstance, chapterType=1, resetCounters=False, setLabel=False)
                testCase.addCommandLineForTex(texFileInstance, updateRestart=False, chapterType=2)
                testCase.addReferenceCheckSummary(texFileInstance, forcePlot, comp_label, ref_label)
                texFileInstance.addText("\\clearpage\\normalsize")


def _addTraceControl(executableDict, texFileInstance):
    """Adds the traceControl.h to the report.

    :param executableDict: dictionary of executables
    :param texFileInstance: report instance
    :type executableDict: ExecutableResources
    :type texFileInstance: PyTex
    """
    if constants.TRACE_SUITE_TRACE_EXEC in executableDict.resourceDict:
        Printer.verbosePrint("Add TRACE suite control settings.", printLevel=Printer.Verbosity.DEBUG)
        try:
            # traceControl.h lives next to the (symlink-resolved) TRACE executable
            execPath, _ = os.path.split(os.path.realpath(executableDict[constants.TRACE_SUITE_TRACE_EXEC].path))
            texFileInstance.addFileContent(execPath + '/traceControl.h', sectionName="Control settings", chapterType=0,
                                           fontsize=7.5, appendix=True)
        except (OSError, IOError):
            Printer.verbosePrint("Unable to include 'traceControl.h' in the PDF-file.", printLevel=Printer.Verbosity.ALWAYS)
def createReports(options, testCaseList, known_error_test_cases, runTime, svnclient, executableDict, reportSettings=None):
    """Creates the latex reports for a computation and returns the name of the report and status.

    :param options: options parsed from the command line
    :param testCaseList: list of test cases
    :param known_error_test_cases: list of test cases with attributes known to fail
    :param runTime: runtime of the computation
    :param svnclient: pysvn connection
    :param executableDict: dictionary of executables
    :param reportSettings: settings for the report
    :type options: options instance
    :type testCaseList: list
    :type known_error_test_cases: dict[str: mojo.pavayo.knownErrorsParser.FailingTestCase]
    :type runTime: int
    :type svnclient: Optional[pysvntools.PYSVNClient]
    :type executableDict: ExecutableResources
    :type reportSettings: ReportSub
    :return: name of reports, return code of LaTeX compilation
    :rtype: tuple(list(str), int)
    """
    Printer.verbosePrint("\nCreating reports...", Printer.Verbosity.JENKINS)
    if reportSettings is None:
        reportSettings = ReportSub.getValuesFromDefault()

    def UseInReport(tc):
        # only reportable test case types (or ones with a post script) appear in a report
        return isinstance(tc, GeneralTestCaseSub) or isinstance(tc, TestSuiteTestCaseSub) \
            or isinstance(tc, ValidationTestCaseSpeedlineSub) or isinstance(tc, ValidationTestCaseSub) \
            or tc.postScript
    testCaseList = [tc for tc in testCaseList if UseInReport(tc)]
    latexStatus = 0
    reportNames = []
    softwareVersionDict = getSoftwaresVersionNumbers(options, executableDict)
    # the debug executable is registered under a suffixed key
    if constants.TRACE_SUITE_TRACE_EXEC.lower() + (constants.TRACE_SUITE_DEBUG_NAME if options.useDebugExecs else "") in softwareVersionDict:
        traceVersion = softwareVersionDict[constants.TRACE_SUITE_TRACE_EXEC.lower() + (constants.TRACE_SUITE_DEBUG_NAME if options.useDebugExecs else "")]
    else:
        traceVersion = None
    # start from a clean latex workspace
    latexFolder = os.path.join(options.suiteName, constants.LATEX_FOLDER)
    removeFile(latexFolder)
    md(latexFolder)
    if not options.unusedLiteratureDatabase:
        literatureDatabasePath = _prepareLiteratureDatabase(options, latexFolder)
        literatureDatabasePath = os.path.join(literatureDatabasePath, constants.DEFAULT_BIB)
    reports, isGeneralReport = _getReportPathsAndContents(options.releaseReports, options.suiteName, testCaseList, latexFolder, traceVersion)
    Printer.verbosePrint("Building and compiling TEX files...")
    for report in reports:
        texFileInstance = PyTex(defaultSettings="pavayo" if isGeneralReport else "validation")
        texFileInstance.setfileName(report.path)
        if not options.unusedLiteratureDatabase:
            texFileInstance.setBibtex(literatureDatabasePath)
        if isGeneralReport:
            userName = os.environ.get("LOGNAME", "-")
            hostName = socket.gethostname()
            svnRepoPath = svnclient.url() if svnclient is not None else "Not available."
            svnRevision = svnclient.revisionOfPYSVNClient() if svnclient is not None else "-"
            texFileInstance.setTitle(titleType="user",
                                     content=createGeneralTitlePageContent(executableDict, options.suiteName, userName, hostName,
                                                                           svnRepoPath=svnRepoPath, svnRevision=svnRevision,
                                                                           runTime=getHoursMinutesSeconds(runTime)))
        else:
            texFileInstance.setTitle(titleType="user",
                                     content=createValidationTitlePageContent(executableDict, options.basePathDict,
                                                                              reportSettings=reportSettings))
        texFileInstance.addOptionalSettings(fr"\graphicspath{{{{{options.suiteName}/}}}}")
        if options.updateReferenceData:
            texFileInstance.addWatermark(text="UPDATED REFERENCES! DO NOT CHECK IN!")
        # add aliases for the version numbers
        for software, version in softwareVersionDict.items():
            texFileInstance.addOptionalSettings(fr"\newcommand{{\{software}Version}}{{{version}}}")
        if not isGeneralReport:
            texFileInstance.addText(constants.VALIDATION_REPORT_INTRODUCTION)
        # show errors, different versions and inactive test cases in the internal report only
        if report.showDebug:
            active_test_cases = [tc for tc in report.testCaseList if not tc.inactive]
            inactive_test_cases = [tc for tc in report.testCaseList if tc.inactive]
            _addErroneousTestCasesToReport(active_test_cases, known_error_test_cases, texFileInstance)
            _addDifferentVersionTestCasesToReport(active_test_cases, texFileInstance, executableDict)
            _addInactiveTestCasesToReport(inactive_test_cases, texFileInstance)
        _addTestCaseParagraph(options, report.testCaseList, known_error_test_cases, texFileInstance, isGeneralReport)
        _addTestCaseSummary(report.testCaseList, known_error_test_cases, texFileInstance)
        if not options.skipReferenceChecks:
            _addDetailsReferenceChecks(report.testCaseList, texFileInstance, constants.STATUS_FILE_CHECK_REFERENCES, options.forceReferencePlots)
        if options.evaluateReferences:
            _addDetailsReferenceChecks(report.testCaseList, texFileInstance, constants.STATUS_FILE_EVALUATE_REFERENCES, forcePlot=True)
        if isGeneralReport:
            _addTraceControl(executableDict, texFileInstance)
        texFileInstance.writeLatexFile()
        # keep a failing status even if a later report compiles successfully
        latexStatus = texFileInstance.compileLatexFile(cplType="pdf") or latexStatus
        reportNames.append("{baseName}{sep}{ending}".format(baseName=os.path.splitext(os.path.abspath(report.path))[0],
                                                            sep=os.extsep, ending="pdf"))
    return reportNames, latexStatus