Source code for

"""
ShellJob supplies a class to execute a bash script in the current shell. You have to pass
an executable and can pass arguments for the executable, a name, a file the output will be saved in,
a working directory and environment variables to be set before the execution.
"""

import os
import shlex
import subprocess

from ...jobManagement import jobManagementData as jobConsts
from .abstractJob import AbstractJob
from import PROCESSORS

class ShellJob(AbstractJob):
    """Class which executes a shell script in a new process.

    Name and output path will be generated when not given.

    :param executable: path to the executable
    :param args: arguments which will be appended when executing
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param nProcs: number of processes (handed to mpirun via ``-np`` when useMpirun is set); defaults to 1
    :param name: name for the job
    :param outFile: path to a file for the output
    :param workingDir: directory to change to before executing
    :param envVars: environment variables to set
    :param successStatements: list of regular expressions the output file will be searched for (line by line)
        to check for success
    :param failStatements: list of regular expressions the output file will be searched for (line by line)
        to check for failure
    :param outputDir: directory the output file will be written to
    :param runInBackground: If true the job will run in the background, when executed as single job
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job
        (key: resource, value: quantity of the resource)
    :param useMpirun: True, if mpirun should used to start the executable, false otherwise
    :param weight: the weight (seconds to run times number of processes) of the job
    :param retries: number of times the job is retried when failing
    :param threadsPerProc: threads to start per processor; defaults to 1

    :type executable: string
    :type args: list of string
    :type nTecplotLicenses: integer
    :type nProcs: int
    :type name: string
    :type outFile: string
    :type workingDir: string
    :type envVars: dictionary
    :type successStatements: list
    :type failStatements: list
    :type outputDir: string
    :type runInBackground: boolean
    :type ignoreExitCode: boolean
    :type resourcesDict: dictionary
    :type useMpirun: boolean
    :type weight: int
    :type retries: int
    :type threadsPerProc: int

    Methods in use inherited from AbstractJob::

        _writeStatus(status, message)
    """

    _jsonAttributes = ("name", "workingDir", "executable", "args", "overwritingEnvVars", "nProcs",
                       "nThreadsPerProc", "nThreads", "outputFile")

    def __init__(self, executable, args=None, nTecplotLicenses=0, nProcs=None, name=None, outFile=None,
                 workingDir=None, envVars=None, successStatements=None, failStatements=None, outputDir=None,
                 runInBackground=None, ignoreExitCode=None, resourcesDict=None, useMpirun=False,
                 weight=jobConsts.DEFAULT_WEIGHT_SHELL, retries=0, threadsPerProc=None):
        """Initializes a new instance...

        :param executable: path to the executable
        :param args: arguments which will be appended when executing
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param nProcs: number of processes (handed to mpirun via ``-np`` when useMpirun is set); defaults to 1
        :param name: name for the job
        :param outFile: path to a file for the output
        :param workingDir: directory to change to before executing
        :param envVars: environment variables to set
        :param successStatements: list of regular expressions the output file will be searched for
            (line by line) to check for success
        :param failStatements: list of regular expressions the output file will be searched for
            (line by line) to check for failure
        :param outputDir: directory the output file will be written to
        :param runInBackground: If true the job will run in the background, when executed as single job
        :param ignoreExitCode: if True the job will ignore the exit code of the executable
        :param resourcesDict: a dictionary containing resources needed by the job
            (key: resource, value: quantity of the resource)
        :param useMpirun: True, if mpirun should used to start the executable, false otherwise
        :param weight: the weight (seconds to run times number of processes) of the job
        :param retries: number of times the job is retried when failing
        :param threadsPerProc: threads to start per processor; defaults to 1

        :type executable: string
        :type args: list of string
        :type nTecplotLicenses: integer
        :type nProcs: int
        :type name: string
        :type outFile: string
        :type workingDir: string
        :type envVars: dictionary
        :type successStatements: list
        :type failStatements: list
        :type outputDir: string
        :type runInBackground: boolean
        :type ignoreExitCode: boolean
        :type resourcesDict: dictionary
        :type useMpirun: boolean
        :type weight: int
        :type retries: int
        :type threadsPerProc: int
        """
        if not name:
            name = os.path.split(executable)[1]  # name of the executable

        super(ShellJob, self).__init__(name=name, nTecplotLicenses=nTecplotLicenses, outFile=outFile,
                                       outputDir=outputDir, failStatements=failStatements,
                                       successStatements=successStatements, workingDir=workingDir,
                                       runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                       resourcesDict=resourcesDict, weight=weight, retries=retries)

        # A bare file name that exists in the working directory is prefixed with the current-directory
        # marker; an unreadable or missing working directory leaves the executable untouched.
        try:
            foundInWorkingDir = executable in os.listdir(self._workingDir)
        except OSError:
            foundInWorkingDir = False
        self._executable = os.path.join(os.curdir, executable) if foundInWorkingDir else executable

        self._nProcs = nProcs or 1
        self._nThreadsPerProc = threadsPerProc or 1
        self._resources.update({PROCESSORS: self._nProcs * self._nThreadsPerProc})

        self._args = args or list()
        self._envVars = envVars or dict()
        self.__process = None  # handle of the started process; None until runJob() launched it
        self._useMpirun = useMpirun

    @property
    def executable(self):
        """The executable as it will be put on the command line."""
        return self._executable

    @property
    def args(self):
        """The arguments appended to the executable."""
        return self._args

    @property
    def overwritingEnvVars(self):
        """The environment variables laid over the current environment when the job is started."""
        return self._envVars

    @property
    def nThreadsPerProc(self):
        """The number of threads per process."""
        return self._nThreadsPerProc

    @property
    def nThreads(self):
        """The total number of threads (processes times threads per process)."""
        return self._nProcs * self._nThreadsPerProc

    def runJob(self):
        """Executes the given executable with the given parameters in the given working directory.

        The output is redirected to the output file when the job is part of a job list or writing to file
        is requested. When the job is not part of a job list the call blocks until the process finished.
        A failure to open the output file or to start the process is reported via the job status.
        """
        self.prepareOutFile()
        try:
            envVars = os.environ.copy()
            envVars.update(self._envVars)
            commandline = self._generateExecutionCommand()

            if self._inJoblist or self._writeToFile:
                # The child inherits its own copy of the descriptor, so the handle may be closed
                # on our side right after the process has been started.
                with open(self.outFile, "w") as outputHandle:
                    started = self.__startProcess(commandline, envVars, outputHandle)
            else:
                started = self.__startProcess(commandline, envVars)

            if started and not self._inJoblist:
                self.waitUntilFinished()
        except IOError:
            self._writeStatus(jobConsts.ERROR, f"Cannot open output handle '{self._outFile}'")

    def __startProcess(self, commandline, envVars, outputHandle=None):
        """Starts the process and reports a launch failure via the job status.

        Launch errors are handled here so that they are not mistaken for a failure to open the output file.

        :param commandline: tokenised command line
        :param envVars: complete environment for the process
        :param outputHandle: open file receiving stdout and stderr, or None to inherit both

        :type commandline: list of string
        :type envVars: dictionary
        :type outputHandle: file object or None

        :return: True if the process has been started, False otherwise
        :rtype: boolean
        """
        try:
            self.__process = subprocess.Popen(commandline, cwd=self._workingDir, env=envVars,
                                              stdout=outputHandle, stderr=outputHandle)
        except OSError as error:
            self._writeStatus(jobConsts.ERROR, f"Cannot start '{self._executable}': {error}")
            return False
        return True

    def updateStatus(self):
        """Updates the status of the job and returns True if the status has changed False else.

        Without a started process there is nothing to evaluate and False is returned.

        :return: Returns True if the status has changed
        :rtype: boolean
        """
        if self.__process is None:
            return False

        exitCode = self.__process.poll()
        if exitCode is None:  # still running
            return False

        self.lookForSuccess(exitCode)
        return True

    def stop(self):
        """Aborts the running process and notes the abortion in the output file.

        The process is asked to terminate first and is killed when it has not exited after one second.
        Nothing happens when no process has been started.
        """
        if self.__process is None:
            return

        self.__process.terminate()
        try:
            self.__process.wait(1)
        except subprocess.TimeoutExpired:
            self.__process.kill()
            self.__process.wait()  # reap the killed child so it does not linger as a zombie

        with open(self.outFile, "a") as fileHandle:
            fileHandle.write("Job aborted!")

    def _generateExecutionCommand(self):
        """Generates the command to execute the job by joining the executable and the arguments.

        The joined command line is tokenised again afterwards, so an argument containing whitespace is
        split into several tokens unless it carries its own quoting.

        :return: the tokenised command line
        :rtype: list of string
        """
        # str() allows non-string arguments (e.g. numbers) to be passed as well
        arguments = " ".join(str(argument) for argument in self._args)

        if self._useMpirun:
            commandline = f"{jobConsts.MPIRUN} -np {self._nProcs} {self._executable} {arguments}"
        else:
            commandline = f"{self._executable} {arguments}"

        try:
            return shlex.split(commandline)
        except ValueError:
            # unbalanced quotes: fall back to plain whitespace splitting
            return commandline.split()