# Source code for mojo.jobManagement.jobs.shellJob

"""
shellJob
========
ShellJob supplies a class to execute a shell script (or any other executable) in a new process.
You have to pass an executable and can pass arguments for the executable, a name, a file the
output will be saved in, a working directory and environment variables to be set before the execution.
"""


import os
import shlex
import subprocess

from ...jobManagement import jobManagementData as jobConsts
from .abstractJob import AbstractJob
from ..management.resources import PROCESSORS


class ShellJob(AbstractJob):
    """Class which executes a shell script in a new process.

    Name and output path will be generated when not given.

    :param executable: path to the executable
    :param args: arguments which will be appended when executing
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param nProcs: number of processes (passed to mpirun as ``-np`` when useMpirun is set); defaults to 1
    :param name: name for the job
    :param outFile: path to a file for the output
    :param workingDir: directory to change to before executing
    :param envVars: environment variables to set
    :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
    :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
    :param outputDir: directory the output file will be written to
    :param runInBackground: If true the job will run in the background, when executed as single job
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
    :param useMpirun: True, if mpirun should be used to start the executable, false otherwise
    :param weight: the weight (seconds to run times number of processes) of the job
    :param retries: number of times the job is retried when failing
    :param threadsPerProc: threads to start per processor; defaults to 1

    :type executable: string
    :type args: list of string
    :type nTecplotLicenses: integer
    :type nProcs: int
    :type name: string
    :type outFile: string
    :type workingDir: string
    :type envVars: dictionary
    :type successStatements: list
    :type failStatements: list
    :type outputDir: string
    :type runInBackground: boolean
    :type ignoreExitCode: boolean
    :type resourcesDict: dictionary
    :type useMpirun: boolean
    :type weight: int
    :type retries: int
    :type threadsPerProc: int

    Methods in use inherited from AbstractJob::

        _writeStatus(status, message)
    """

    # NOTE(review): "name", "workingDir", "nProcs" and "outputFile" are not defined in this class;
    # presumably AbstractJob provides them -- confirm.
    _jsonAttributes = ("name", "workingDir", "executable", "args", "overwritingEnvVars", "nProcs", "nThreadsPerProc",
                       "nThreads", "outputFile")

    def __init__(self, executable, args=None, nTecplotLicenses=0, nProcs=None, name=None, outFile=None,
                 workingDir=None, envVars=None, successStatements=None, failStatements=None, outputDir=None,
                 runInBackground=None, ignoreExitCode=None, resourcesDict=None, useMpirun=False,
                 weight=jobConsts.DEFAULT_WEIGHT_SHELL, retries=0, threadsPerProc=None):
        """Initializes a new instance.

        :param executable: path to the executable
        :param args: arguments which will be appended when executing
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param nProcs: number of processes (passed to mpirun as ``-np`` when useMpirun is set); defaults to 1
        :param name: name for the job; defaults to the file name of the executable
        :param outFile: path to a file for the output
        :param workingDir: directory to change to before executing
        :param envVars: environment variables to set
        :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
        :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
        :param outputDir: directory the output file will be written to
        :param runInBackground: If true the job will run in the background, when executed as single job
        :param ignoreExitCode: if True the job will ignore the exit code of the executable
        :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
        :param useMpirun: True, if mpirun should be used to start the executable, false otherwise
        :param weight: the weight (seconds to run times number of processes) of the job
        :param retries: number of times the job is retried when failing
        :param threadsPerProc: threads to start per processor; defaults to 1

        :type executable: string
        :type args: list of string
        :type nTecplotLicenses: integer
        :type nProcs: int
        :type name: string
        :type outFile: string
        :type workingDir: string
        :type envVars: dictionary
        :type successStatements: list
        :type failStatements: list
        :type outputDir: string
        :type runInBackground: boolean
        :type ignoreExitCode: boolean
        :type resourcesDict: dictionary
        :type useMpirun: boolean
        :type weight: int
        :type retries: int
        :type threadsPerProc: int
        """
        if not name:
            name = os.path.split(executable)[1]  # name of the executable

        super(ShellJob, self).__init__(name=name, nTecplotLicenses=nTecplotLicenses, outFile=outFile,
                                       outputDir=outputDir, failStatements=failStatements,
                                       successStatements=successStatements, workingDir=workingDir,
                                       runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                       resourcesDict=resourcesDict, weight=weight, retries=retries)

        # An executable that is a plain file name found in the working directory is prefixed with
        # "./" -- presumably so it is resolved relative to the working directory instead of via PATH.
        # An unreadable or missing working directory leaves the executable untouched.
        try:
            if executable in os.listdir(self._workingDir):
                self._executable = os.path.join(os.curdir, executable)
            else:
                self._executable = executable
        except OSError:
            self._executable = executable

        self._nProcs = nProcs or 1
        self._nThreadsPerProc = threadsPerProc or 1
        # The job occupies one processor per thread of every process.
        self._resources.update({PROCESSORS: self._nProcs * self._nThreadsPerProc})
        self._args = args or list()
        self._envVars = envVars or dict()
        self.__process = None  # subprocess.Popen handle; stays None until runJob() started the process
        self._useMpirun = useMpirun

    @property
    def executable(self):
        """Path of the executable as it will be put on the command line."""
        return self._executable

    @property
    def args(self):
        """Arguments appended to the executable."""
        return self._args

    @property
    def overwritingEnvVars(self):
        """Environment variables which are set on top of the current environment."""
        return self._envVars

    @property
    def nThreadsPerProc(self):
        """Number of threads per process."""
        return self._nThreadsPerProc

    @property
    def nThreads(self):
        """Total number of threads (processes times threads per process)."""
        return self._nProcs * self._nThreadsPerProc

    def runJob(self):
        """Executes the given executable with the given parameters in the given working directory.

        The output is redirected to the output file when the job is part of a job list or writing to
        file is requested. A job which is not part of a job list blocks until the process has finished.
        Failures to open the output file or to start the process are reported through ``_writeStatus``
        with status ``jobConsts.ERROR``.
        """
        self.prepareOutFile()

        # The process sees the current environment with the job specific variables on top.
        envVars = os.environ.copy()
        envVars.update(self._envVars)
        commandline = self._generateExecutionCommand()

        try:
            if self._inJoblist or self._writeToFile:
                with open(self.outFile, "w") as outputHandle:
                    started = self.__startProcess(commandline, envVars, outputHandle)
            else:
                started = self.__startProcess(commandline, envVars)

            if started and not self._inJoblist:
                self.waitUntilFinished()
        except IOError:
            self._writeStatus(jobConsts.ERROR, f"Cannot open output handle '{self.outFile}'")

    def __startProcess(self, commandline, envVars, outputHandle=None):
        """Starts the process and reports a failing start as job error.

        Start failures are handled here and not in :meth:`runJob`, because ``IOError`` is an alias of
        ``OSError``; otherwise a missing executable would be reported as an output file problem.

        :param commandline: the command to execute, split into its tokens
        :param envVars: the complete environment for the process
        :param outputHandle: open file used for stdout and stderr; None inherits both streams

        :return: True if the process was started, False otherwise
        :rtype: boolean
        """
        try:
            self.__process = subprocess.Popen(commandline, cwd=self._workingDir, env=envVars,
                                              stdout=outputHandle, stderr=outputHandle)
        except OSError as error:
            command = " ".join(commandline)
            self._writeStatus(jobConsts.ERROR, f"Cannot execute '{command}': {error}")
            return False
        return True

    def updateStatus(self):
        """Updates the status of the job and returns True if the status has changed False else.

        A job whose process was never started has nothing to update and returns False.

        :return: Returns True if the status has changed
        :rtype: boolean
        """
        process = self.__process
        if process is None:
            return False

        exitCode = process.poll()
        if exitCode is None:  # still running
            return False

        self.lookForSuccess(exitCode)
        return True

    def stop(self):
        """Aborts the running process and notes the abort in the output file.

        The process is asked to terminate first; when it has not exited after one second it is killed.
        Does nothing when no process was started.
        """
        process = self.__process
        if process is None:
            return

        process.terminate()
        try:
            process.wait(1)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()  # reap the killed process so no zombie is left behind

        with open(self.outFile, "a") as fileHandle:
            fileHandle.write("Job aborted!")

    def _generateExecutionCommand(self):
        """Generates the command to execute the job by joining the executable and the arguments.

        The joined command is split with shell-like rules, so quoting inside the executable or an
        argument is honoured and unquoted whitespace separates tokens. Arguments which are not strings
        (e.g. numbers) are converted with ``str``.

        :return: the command split into its tokens
        :rtype: list of string
        """
        arguments = " ".join(str(argument) for argument in self._args)
        if self._useMpirun:
            commandline = f"{jobConsts.MPIRUN} -np {self._nProcs} {self._executable} {arguments}"
        else:
            commandline = f"{self._executable} {arguments}"

        try:
            return shlex.split(commandline)
        except ValueError:
            # Unbalanced quotes cannot be parsed shell-like; fall back to plain whitespace splitting.
            return commandline.split()