"""
MethodJob
=========
MethodJob supplies a class to create a job which executes a given method in the currently running
Python environment. To get the results of the method, you have to pass a queue or a similar object to send
data between the processes.
"""
import os
import sys
import multiprocessing
import traceback
from ..management.resources import PROCESSORS
from ...jobManagement.jobManagementData import DEFAULT_WEIGHT_METHOD
from ..jobs.abstractJob import AbstractJob
from ...bricabrac import fileIO
class _ProcessWrapper(multiprocessing.Process):
    """Process that runs a single callable for a job.

    On top of :class:`multiprocessing.Process` this class executes the callable inside the
    ``fileIO.workingDirectory`` context of the given working directory, merges additional
    variables into ``os.environ``, optionally redirects ``sys.stdout`` to the output file and
    writes the traceback of any raised exception to the output file. The value returned by the
    callable is discarded.

    :param method: method which will be executed
    :param args: method arguments
    :param kwargs: method keyword arguments
    :param workingDir: working directory for the job
    :param outFile: output file of this job
    :param envVars: dictionary with additional environment variables
    :param inJoblist: parameter if the job is executed in a job list

    :type method: method
    :type args: list
    :type kwargs: dict
    :type workingDir: string
    :type outFile: string
    :type envVars: dictionary
    :type inJoblist: boolean
    """

    def __init__(self, method, args, kwargs, workingDir, outFile, envVars, inJoblist):
        # The process is named after the callable it executes.
        super().__init__(name=method.__name__)
        self.__method = method
        self.__args = args
        self.__kwargs = kwargs
        self.__workingDir = workingDir
        self.__outFile = outFile
        self.__envVars = envVars
        self.__inJoblist = inJoblist

    def run(self):
        """Entry point executed in the new process.

        Calls the wrapped method with the stored arguments. When the job is part of a job list,
        ``sys.stdout`` is pointed at the output file (opened in append mode) for the duration of
        the call. Every exception, including ``SystemExit`` and ``KeyboardInterrupt``, is caught
        and its traceback is appended to the output file; outside of a job list the traceback is
        printed to ``sys.stdout`` as well. ``sys.stdout`` is always reset to ``sys.__stdout__``
        at the end.
        """
        try:
            with fileIO.workingDirectory(self.__workingDir):
                os.environ.update(self.__envVars)
                if not self.__inJoblist:
                    self.__method(*self.__args, **self.__kwargs)
                else:
                    with open(self.__outFile, "a") as logHandle:
                        # everything printed by this process now ends up in the output file
                        sys.stdout = logHandle
                        self.__method(*self.__args, **self.__kwargs)
        except BaseException:  # catch everything, because we cannot know what the executed method does
            excInfo = sys.exc_info()
            with open(self.__outFile, "a") as logHandle:
                traceback.print_exception(*excInfo, limit=None, file=logHandle)
            if not self.__inJoblist:
                traceback.print_exception(*excInfo, limit=None, file=sys.stdout)
        finally:
            # undo the redirection, whether or not it was applied
            sys.stdout = sys.__stdout__
class MethodJob(AbstractJob):
    """Class to execute a given method with the given arguments in a separate process.

    :param method: method to execute
    :param args: positional arguments passed to the method
    :param kwargs: keyword arguments passed to the method
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param name: name for the job
    :param workingDir: directory to change to before executing
    :param outFile: path to a file for the output
    :param envVars: environment variables to set
    :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
    :param outputDir: directory the output file will be written to
    :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
    :param runInBackground: If true the job will run in the background, when executed as single job
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
    :param weight: the weight (seconds to run times number of processes) of the job
    :param retries: number of times the job is retried when failing

    :type method: method
    :type args: list
    :type kwargs: dictionary
    :type nTecplotLicenses: integer
    :type name: string
    :type workingDir: string
    :type outFile: string
    :type envVars: dictionary
    :type successStatements: list
    :type outputDir: string
    :type failStatements: list
    :type runInBackground: boolean
    :type ignoreExitCode: boolean
    :type resourcesDict: dictionary
    :type weight: int
    :type retries: int
    """

    _jsonAttributes = ("name", "workingDir", "methodName", "overwritingEnvVars", "nProcs", "outputFile")

    def __init__(self, method, args=None, kwargs=None, nTecplotLicenses=0, name=None, workingDir=None,
                 outFile=None, envVars=None, successStatements=None, outputDir=None,
                 failStatements=None, runInBackground=None, ignoreExitCode=None, resourcesDict=None,
                 weight=DEFAULT_WEIGHT_METHOD, retries=0):
        """Initializes a new instance, saves the parameters as instance variables

        :param method: method to execute
        :param args: positional arguments passed to the method
        :param kwargs: keyword arguments passed to the method
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param name: name for the job
        :param workingDir: directory to change to before executing
        :param outFile: path to a file for the output
        :param envVars: environment variables to set
        :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
        :param outputDir: directory the output file will be written to
        :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
        :param runInBackground: If true the job will run in the background, when executed as single job
        :param ignoreExitCode: if True the job will ignore the exit code of the executable
        :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
        :param weight: the weight (seconds to run times number of processes) of the job
        :param retries: number of times the job is retried when failing

        :type method: method
        :type args: list
        :type kwargs: dictionary
        :type nTecplotLicenses: integer
        :type name: string
        :type workingDir: string
        :type outFile: string
        :type envVars: dictionary
        :type successStatements: list
        :type outputDir: string
        :type failStatements: list
        :type runInBackground: boolean
        :type ignoreExitCode: boolean
        :type resourcesDict: dictionary
        :type weight: int
        :type retries: int
        """
        super(MethodJob, self).__init__(name=name, nTecplotLicenses=nTecplotLicenses, outFile=outFile,
                                        outputDir=outputDir, successStatements=successStatements,
                                        failStatements=failStatements, workingDir=workingDir,
                                        runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                        resourcesDict=resourcesDict, weight=weight, retries=retries)
        self._method = method
        # Falsy values (None, empty containers) are replaced by fresh empty containers.
        self.__args = args or list()
        self.__kwargs = kwargs or dict()
        self.__envVars = envVars or dict()
        self.__process = None  # set by runJob()
        # A method job always registers exactly one processor as resource.
        self._nProcs = 1
        self._resources.update({PROCESSORS: self._nProcs})

    @property
    def methodName(self):
        """Name (``__name__``) of the method this job executes."""
        return self._method.__name__

    @property
    def args(self):
        """Positional arguments passed to the method."""
        return self.__args

    @property
    def kwargs(self):
        """Keyword arguments passed to the method."""
        return self.__kwargs

    @property
    def overwritingEnvVars(self):
        """Environment variables handed to the process that executes the method."""
        return self.__envVars

    def runJob(self):
        """Executes the given method with the given parameters.

        Prepares the output file, starts a new process running the method and, if the job is
        not part of a job list, waits until the job has finished.
        """
        self.prepareOutFile()
        self.__process = _ProcessWrapper(self._method, self.__args, self.__kwargs, self._workingDir,
                                         self.outFile, self.__envVars, self._inJoblist)
        self.__process.start()
        if not self._inJoblist:
            self.waitUntilFinished()

    def updateStatus(self):
        """Updates the status of the job and returns True if the status has changed, False otherwise.

        As long as the process is alive nothing happens. Once it has ended, the job is checked
        for success and the process object is closed.

        :return: Returns True if the status has changed
        :rtype: boolean
        """
        if self.__process.is_alive():
            return False
        self.lookForSuccess()
        # NOTE(review): after close() any further is_alive() call on this process object raises
        # ValueError, so this method is presumably not polled again once it returned True --
        # confirm against the callers.
        self.__process.close()
        return True

    def stop(self):
        """Terminates its own process and writes a comment to the output file.

        The process is first asked to terminate. If it is still running one second later it is
        killed, so that the process object can always be closed.
        """
        process = self.__process
        process.terminate()
        process.join(1)
        if process.is_alive():
            # close() raises ValueError for a process that is still running, so escalate
            # from terminate() to kill() and wait until the process is really gone.
            process.kill()
            process.join()
        process.close()
        with open(self.outFile, "a") as fileHandle:
            fileHandle.write("Job aborted!")