Source code for mojo.jobManagement.jobs.methodJob

"""
MethodJob
=========
MethodJob supplies a class to create a job which executes a given method in the currently running
python enviroment. To get the results of the method you have to pass a queue or something similar to send
data between the processes.
"""


import os
import sys
import multiprocessing
import traceback

from ..management.resources import PROCESSORS
from ...jobManagement.jobManagementData import DEFAULT_WEIGHT_METHOD
from ..jobs.abstractJob import AbstractJob
from ...bricabrac import fileIO


class _ProcessWrapper(multiprocessing.Process):
    """Extends the functionality of multiprocessing.Process. Adds redirection of the output, a working directory
    update of the environment variables, saves the return value and handles the errors.

    :param method:              method which will be executed
    :param args:                method arguments
    :param kwargs:              method keyword arguments
    :param workingDir:          working directory for the job
    :param outFile:             output file of this job
    :param envVars:             dictionary with additional environment variables
    :param inJoblist:           parameter if the job is executed in a job list
    :type method:               method
    :type args:                 list
    :type kwargs:               dict
    :type workingDir:           string
    :type outFile:              string
    :type envVars:              dictionary
    :type inJoblist:            boolean
    """

    def __init__(self, method, args, kwargs, workingDir, outFile, envVars, inJoblist):
        super(_ProcessWrapper, self).__init__(name=method.__name__)
        self.__method = method
        self.__args = args
        self.__kwargs = kwargs
        self.__workingDir = workingDir
        self.__outFile = outFile
        self.__envVars = envVars
        self.__inJoblist = inJoblist

    def run(self):
        """Method called in a new process.
        """
        try:
            with fileIO.workingDirectory(self.__workingDir):
                os.environ.update(self.__envVars)
                if self.__inJoblist:
                    with open(self.__outFile, "a") as fileHandle:
                        sys.stdout = fileHandle  # redirects every output in this thread to the output file
                        self.__method(*self.__args, **self.__kwargs)
                else:
                    self.__method(*self.__args, **self.__kwargs)
        except BaseException:  # catch everything, because we cannot know what the method we execute does
            exc_type, exc_value, exc_traceback = sys.exc_info()
            with open(self.__outFile, "a") as fileHandle:
                traceback.print_exception(exc_type, exc_value, exc_traceback,
                                          limit=None, file=fileHandle)
            if not self.__inJoblist:
                traceback.print_exception(exc_type, exc_value, exc_traceback,
                                          limit=None, file=sys.stdout)
        finally:
            sys.stdout = sys.__stdout__


[docs]class MethodJob(AbstractJob): """Class to execute a given method with the given arguments. :param method: method to execute :param args: arguments which will be appended when executing :param kwargs: a variable to pass keyword arguments to a methodJob :param nTecplotLicenses: number of tecplot licenses needed by the job :param workingDirectory: directory to change to before executing :param outFile: path to a file for the output :param jobName: name for the job :param envVars: environment variables to set :param nodeType: type of the node to calculate on :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure :param outputDir: directory the output file will be written to :param runInBackground: If true the job will run in the background, when executed as single job :param ignoreExitCode: if True the job will ignore the exit code of the executable :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource) :param weight: the weight (seconds to run times number of processes) of the job :param retries: number of times the job is retried when failing :type method: method :type args: list :type kwargs: dictionary :tpye nTecplotLicenses: integer :type workingDirectory: string :type outFile: string :type jobName: string :type envVars: dictionary :type nodeType: string :type successStatements: list :type failStatements: list :type outputDir: string :type runInBackground: boolean :type ignoreExitCode: boolean :type resourcesDict: dictionary :type weight: int :type retries: int """ _jsonAttributes = ("name", "workingDir", "methodName", "overwritingEnvVars", "nProcs", "outputFile") def __init__(self, method, args=None, kwargs=None, nTecplotLicenses=0, name=None, workingDir=None, outFile=None, envVars=None, successStatements=None, outputDir=None, failStatements=None, runInBackground=None, ignoreExitCode=None, resourcesDict=None, weight=DEFAULT_WEIGHT_METHOD, retries=0): """Initializes a new instance, saves the parameters as instance variables :param method: method to execute :param args: arguments which will be appended when executing :param kwargs: a variable to pass keyword arguments to a methodJob :param nTecplotLicenses: number of tecplot licenses needed by the job :param workingDirectory: directory to change to before executing :param outFile: path to a file for the output :param jobName: name for the job :param envVars: environment variables to set :param nodeType: type of the node to calculate on :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure :param outputDir: directory the output file will be written to :param runInBackground: If true the job will run in the background, when executed as single job :param ignoreExitCode: if True the job will ignore the exit code of the executable :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource) :param weight: the weight (seconds to run times number of processes) of the job :param retries: number of times the job is retried when failing :type method: method :type args: list :type kwargs: dictionary :tpye nTecplotLicenses: integer :type workingDirectory: string :type outFile: string :type jobName: string :type envVars: dictionary :type nodeType: string :type successStatements: list :type failStatements: list :type outputDir: string :type runInBackground: boolean :type ignoreExitCode: boolean :type resourcesDict: dictionary :type weight: int :int retries: int """ super(MethodJob, self).__init__(name=name, nTecplotLicenses=nTecplotLicenses, outFile=outFile, outputDir=outputDir, successStatements=successStatements, failStatements=failStatements, workingDir=workingDir, runInBackground=runInBackground, ignoreExitCode=ignoreExitCode, resourcesDict=resourcesDict, weight=weight, retries=retries) self._method = method self.__args = args or list() self.__kwargs = kwargs or dict() self.__envVars = envVars or dict() self.__process = None self._nProcs = 1 self._resources.update({PROCESSORS: self._nProcs}) @property def methodName(self): return self._method.__name__ @property def args(self): return self.__args @property def kwargs(self): return self.__kwargs @property def overwritingEnvVars(self): return self.__envVars
[docs] def runJob(self): """Executes the given method with the given parameters. """ self.prepareOutFile() self.__process = _ProcessWrapper(self._method, self.__args, self.__kwargs, self._workingDir, self.outFile, self.__envVars, self._inJoblist) self.__process.start() if not self._inJoblist: self.waitUntilFinished()
[docs] def updateStatus(self): """Updates the status of the job and returns True if the status has changed False else. :return: Returns True if the status has changed :rtype: boolean """ if not self.__process.is_alive(): self.lookForSuccess() self.__process.close() return True else: return False
[docs] def stop(self): """Terminates its own process and writes a comment to the output file. """ self.__process.terminate() self.__process.join(1) self.__process.close() with open(self.outFile, "a") as fileHandle: fileHandle.write("Job aborted!")