Source code for mojo.pavayo.executables.executableResources

"""
List of executables
====================

In the class ExecutableResources all relevant executables are stored. Using this class Pavayo becomes independent of the settings
for the TRACE Suite.
"""

import glob
import os
import typing

from .singleExecutableResource import SingleExecutableResource

from ...bricabrac.textIO import convertToJSON
from ...bricabrac.fileIO import Printer

from ..testcase import testcase_generated as supermod


class ExecutableResources:
    """Class to manage a set of executables.

    The executables are kept in a dictionary whose keys are the upper-case executable names.
    One of them (or the special name ``TRACE``) can be selected as main executable.
    """

    def __init__(self):
        """Initiate an empty executable resource without a main executable.
        """
        self._mainExecutable = ""
        self._resourceDict = {}

    @property
    def mainExecutable(self):
        """Upper-case name of the main executable; an empty string if none is selected.

        :rtype: str
        """
        return self._mainExecutable

    @mainExecutable.setter
    def mainExecutable(self, name):
        """Select the main executable.

        The name is compared case-insensitively. ``TRACE`` is always accepted; any other name has to be a key of
        the resource dictionary. An empty name resets the selection, which is only accepted while the resource
        dictionary is not empty.

        :param name: name of the main executable
        :type name: str

        :raises ValueError: if the name is neither ``TRACE`` nor a known executable nor an accepted reset
        """
        name = name.upper()
        if name == "TRACE" or name in self.resourceDict:
            self._mainExecutable = name
        elif name == "" and self.resourceDict:
            self._mainExecutable = ""
        else:
            raise ValueError(f"Invalid name '{name}' for main executable, not in available executable list")

    @property
    def resourceDict(self):
        """Dictionary mapping the upper-case executable key to its resource object.

        :rtype: dict
        """
        return self._resourceDict
[docs] @staticmethod def getKeyAndEnvironmentName(executableName): """Get key and environment variable name from name of executable :param executableName: name of executable :type executableName: str :return: key to executable in upper case, name for environment variable in upper case :rtype: str, str """ if executableName.startswith("$"): execKey = executableName[1:] else: execKey = executableName executableName = f"${executableName}" return execKey.upper(), executableName.upper()
[docs] def addMultipleExecutables(self, searchPath, environmentName="", commandLineArguments=None, numberOfLicenses=None, maxProcsPerExecution=None, displayVersion=False, successStatement=None, failStatement=None, ignoreExitCode=False, versionCheck=True, replaceSuffix=''): """Add one or more executables to the dictionary by searching a given path. :param searchPath: path where to search for executables :param environmentName: environment name :param commandLineArguments: additional command line arguments :param numberOfLicenses: number of available licenses :param maxProcsPerExecution: maximum number of processors per execution :param displayVersion: flag whether executable is shown in screen output and report :param successStatement: list of success statements :param failStatement: list of fail statements :param ignoreExitCode: flag whether the exit code will be ignored :param versionCheck: flag whether version should be checked :param replaceSuffix: suffix to be removed from the end of the executable name :type searchPath: str :type environmentName: str :type commandLineArguments: list(str) or None :type numberOfLicenses: int or None :type maxProcsPerExecution: int or None :type displayVersion: bool :type successStatement: list or None :type failStatement: list or None :type ignoreExitCode: bool :type versionCheck: bool :type replaceSuffix: str """ foundExecutables = glob.glob(searchPath) environmentName = environmentName if environmentName and len(foundExecutables) == 1 else None for path in foundExecutables: currentEnvironmentName = environmentName or os.path.basename(path) if replaceSuffix and currentEnvironmentName.endswith(replaceSuffix): currentEnvironmentName.replace(replaceSuffix, '') self.addExecutable(environmentName=currentEnvironmentName, path=os.path.abspath(path), commandLineArguments=commandLineArguments, numberOfLicenses=numberOfLicenses, maxProcsPerExecution=maxProcsPerExecution, displayVersion=displayVersion, successStatement=successStatement, 
failStatement=failStatement, ignoreExitCode=ignoreExitCode, versionCheck=versionCheck)
[docs] def addExecutable(self, environmentName="", path="", commandLineArguments=None, numberOfLicenses=None, maxProcsPerExecution=None, displayVersion=False, successStatement=None, failStatement=None, ignoreExitCode=False, versionCheck=True, versionCheckCommand=None, explicitlyRequested=False): """Add a single executable to the dictionary. :param environmentName: environment name :param path: path to executable :param commandLineArguments: additional command line arguments :param numberOfLicenses: number of available licenses :param maxProcsPerExecution: maximum number of processors per execution :param displayVersion: flag whether executable is shown in screen output and report :param successStatement: list of success statements :param failStatement: list of fail statements :param ignoreExitCode: flag whether the exit code will be ignored :param versionCheck: flag whether version should be checked :param versionCheckCommand: command line argument for version check :param explicitlyRequested: command was requested explicitly, this may enforces stricter checks :type environmentName: str :type path: str :type commandLineArguments: list(str) or None :type numberOfLicenses: int or None :type maxProcsPerExecution: int or None :type displayVersion: bool :type successStatement: list or None :type failStatement: list or None :type ignoreExitCode: bool :type versionCheck: bool :type versionCheckCommand: str or None :type explicitlyRequested: bool """ execKey, environmentName = ExecutableResources.getKeyAndEnvironmentName(environmentName) if os.path.isfile(str(path)) and not os.access(path, os.X_OK): # The file does exist, but it is not an executable assert explicitlyRequested, f"Cannot add requested {path} to executable dict: is not marked executable" Printer.verbosePrint(f"Do not add non executable file {path} to ExecutableResources", Printer.Verbosity.DEBUG) return self._resourceDict[execKey] = SingleExecutableResource(environmentName=environmentName, path=path, 
commandLineArguments=commandLineArguments, numberOfLicenses=numberOfLicenses, maxProcsPerExecution=maxProcsPerExecution, displayVersion=displayVersion, successStatement=successStatement, failStatement=failStatement, ignoreExitCode=ignoreExitCode, versionCheck=versionCheck, versionCheckCommand=versionCheckCommand)
[docs] def updateDictFromXML(self, xmlListOfExecutables: supermod.ListOfExecutables, basePathDict: typing.Mapping[str, str], replaceSuffix: str = ""): """Fills the resource dictionary using the resource fabric class. Replaces -1 with sys.maxint for easier handling in the other methods of this class. :param xmlListOfExecutables: a dictionary containing names and number of resources :param basePathDict: base path dictionary :param replaceSuffix: suffix to be removed from the end of the executable name """ for executableEntry in xmlListOfExecutables.executable: if executableEntry.path.type_ == "general": path = executableEntry.path.valueOf_ self.addExecutable(environmentName=executableEntry.environmentName or path, path=path, commandLineArguments=executableEntry.commandLineArguments, numberOfLicenses=executableEntry.numberOfLicenses, maxProcsPerExecution=executableEntry.maximumNumberOfProcsPerJob, displayVersion=executableEntry.displayVersion, successStatement=executableEntry.successStatement, failStatement=executableEntry.failStatement, ignoreExitCode=executableEntry.ignoreExitCode, versionCheck=not executableEntry.noCheck) else: if executableEntry.path.type_ == "relative": search_path = os.path.join(basePathDict.get(executableEntry.path.basePath, executableEntry.path.basePath), executableEntry.path.valueOf_) else: search_path = executableEntry.path.valueOf_ self.addMultipleExecutables(search_path, environmentName=executableEntry.environmentName, commandLineArguments=executableEntry.commandLineArguments, numberOfLicenses=executableEntry.numberOfLicenses, maxProcsPerExecution=executableEntry.maximumNumberOfProcsPerJob, displayVersion=executableEntry.displayVersion, successStatement=executableEntry.successStatement, failStatement=executableEntry.failStatement, ignoreExitCode=executableEntry.ignoreExitCode, versionCheck=not executableEntry.noVersionCheck, replaceSuffix=replaceSuffix)
[docs] def getVersion(self, options=None): """Determines the versions of all executables in the resource dictionary :param options: options parsed from the command line :type options: None or argParse.ArgumentParser """ for executable in self.resourceDict.values(): executable.getVersion(options=options)
[docs] def writeToJSON(self, fileName="executables.cfg"): """Writes the executables to a JSON file :param fileName: file name :type fileName: str """ with open(fileName, "w") as fh: fh.write(convertToJSON(self))
def __str__(self): """Returns the string output in a neat form :return: neat output :rtype: str """ outputText = list() outputText.append("resource dictionary\n") for i, entry in enumerate(self.resourceDict): outputText.append("{} - {}\n{}\n".format(i + 1, entry, self.resourceDict[entry])) return "\n".join(outputText) def __len__(self): """Returns the number of operation point lists in the test case. :return: number of operation point lists :rtype: int """ return len(self.resourceDict) def __iter__(self): """Uses resource dictionary for iteration """ return iter(self.resourceDict.items()) def __getitem__(self, name): """Returns a operation point list at a given index. :param name: name of executable :type name: str :return: executable :rtype: SingleExecutableResource """ if name.startswith("$"): name = name[1:] return self.resourceDict[name.upper()] def __contains__(self, name): """Returns whether the executable is in the resource dictionary :param name: name of executable :type name: str :return: flag whether executable is in the resource dictionary :rtype: bool """ if name.startswith("$"): name = name[1:] return name in self.resourceDict def __delitem__(self, name): """Deletes executable from resource dictionary :param name: name of executable :type name: str """ if name.startswith("$"): name = name[1:] del self.resourceDict[name.upper()]