"""
QSubJob
=======
QSubJob supplies a class which inherits from ShellJob and uses ClusterInfo. The functionality is
nearly the same as in ShellJob, but the job will be executed on cluster nodes and not on the
workstation or the master.
"""
import os
import re
import subprocess
import time
from ... import jobManagementData as jobConsts
from ..shellJob import ShellJob
from ...management.clusterInfo import ClusterInfo
from ...management import qstat
from ...management.resources import NODES, PROCESSORS
def cleanQsubStandardFilesForTestcase(workingDir, tcName, removeScript):
    """Remove all qsub stdout/stderr files matching ``<tcName>.(e|o)<digits>``.

    :param workingDir: working directory
    :type workingDir: string
    :param tcName: test case name
    :type tcName: string
    :param removeScript: flag whether files should be removed
    :type removeScript: boolean
    :return: number of deleted files
    :rtype: integer
    """
    # Nothing is ever removed (and hence counted) unless removal was requested.
    if not removeScript:
        return 0
    # Escape the test case name so regex metacharacters in it are taken literally,
    # and anchor the digits at the end so files like "<name>.o12.bak" are kept.
    jobFilePattern = re.compile(re.escape(tcName) + r"\.[eo]\d+$")
    nDelFiles = 0
    for fileName in os.listdir(workingDir):
        if not jobFilePattern.search(fileName):
            continue
        try:
            os.remove(os.path.join(workingDir, fileName))
            nDelFiles += 1
        except OSError:
            # best effort: a file that vanished or is not removable is simply skipped
            pass
    return nDelFiles
class QSubJob(ShellJob):
    """Class to start a job with pThreads and mpi on nodes on the cluster.

    :param executable: path to the executable
    :param nProcs: number of processors to use
    :param args: arguments which will be appended when executing
    :param nTecplotLicenses: number of tecplot licenses needed by the job
    :param name: name for the job
    :param outFile: path to a file for the output
    :param workingDir: directory to change to before executing
    :param envVars: environment variables to set
    :param threadsPerProc: threads to start per processor
    :param procsPerNode: process per node
    :param queue: queue to put the job in
    :param nodeType: type of the node to calculate on
    :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
    :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
    :param outputDir: directory the output file will be written to
    :param runInBackground: If true the job will run in the background, when executed as single job
    :param ignoreExitCode: if True the job will ignore the exit code of the executable
    :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
    :param wallTime: maximum wall time for a job on the cluster
    :param weight: the weight (seconds to run times number of processes) of the job
    :param retries: number of times the job is retried when failing
    :type executable: str
    :type nProcs: int
    :type args: list
    :type nTecplotLicenses: int
    :type name: str
    :type outFile: str
    :type workingDir: str
    :type envVars: dict
    :type threadsPerProc: int
    :type procsPerNode: int
    :type queue: str
    :type nodeType: str
    :type successStatements: list
    :type failStatements: list
    :type outputDir: str
    :type runInBackground: bool
    :type ignoreExitCode: bool
    :type wallTime: str
    :type resourcesDict: dict
    :type weight: int
    :type retries: int
    """

    _defaultQueue = "batch"
    _runScriptExtension = '.run.sh'
    _jsonAttributes = ("name", "workingDir", "qsubScript", "executable", "args", "overwritingEnvVars", "nNodes", "nProcsPerNode", "nThreadsPerProc", "outputFile", "queue", "maxWallTime")

    def __init__(self, executable, nProcs=1, args=None, nTecplotLicenses=0, name=None, outFile=None,
                 workingDir=None, envVars=None, threadsPerProc=None, procsPerNode=None,
                 queue=None, nodeType="w", successStatements=None, outputDir=None, failStatements=None,
                 runInBackground=None, ignoreExitCode=None, resourcesDict=None, useMpirun=False, wallTime=None,
                 weight=jobConsts.DEFAULT_WEIGHT_SHELL, dependJobID=None, retries=0):
        """initializes a new instance calling the super constructors, calculates the needed nodes if
        not given and creates a name for the qsub script.

        :param executable: path to the executable
        :param nProcs: number of processors to use
        :param args: arguments which will be appended when executing
        :param nTecplotLicenses: number of tecplot licenses needed by the job
        :param name: name for the job
        :param outFile: path to a file for the output
        :param workingDir: directory to change to before executing
        :param envVars: environment variables to set
        :param threadsPerProc: threads to start per processor
        :param procsPerNode: process per node
        :param queue: queue to put the job in
        :param nodeType: type of the node to calculate on
        :param successStatements: list of regular expressions the output file will be searched for (line by line) to check for success
        :param failStatements: list of regular expressions the output file will be searched for (line by line) to check for failure
        :param outputDir: directory the output file will be written to
        :param runInBackground: If true the job will run in the background, when executed as single job
        :param ignoreExitCode: if True the job will ignore the exit code of the executable
        :param resourcesDict: a dictionary containing resources needed by the job (key: resource, value: quantity of the resource)
        :param wallTime: maximum wall time for a job on the cluster
        :param weight: the weight (seconds to run times number of processes) of the job
        :param dependJobID: ID of the job which has to finish before start
        :param retries: number of times the job is retried when failing
        :type executable: str
        :type nProcs: int
        :type args: list
        :type nTecplotLicenses: int
        :type name: str
        :type outFile: str
        :type workingDir: str
        :type envVars: dict
        :type threadsPerProc: int
        :type procsPerNode: int
        :type queue: str
        :type nodeType: str
        :type successStatements: list
        :type failStatements: list
        :type outputDir: str
        :type runInBackground: bool
        :type ignoreExitCode: bool
        :type wallTime: str
        :type resourcesDict: dict
        :type weight: int
        :type dependJobID: int
        :type retries: int
        """
        self.clusterInfo = ClusterInfo.from_environment(nodeType)
        super(QSubJob, self).__init__(executable=executable, nProcs=nProcs, args=args, nTecplotLicenses=nTecplotLicenses, name=name,
                                      outFile=outFile, workingDir=workingDir, envVars=envVars, threadsPerProc=threadsPerProc, successStatements=successStatements,
                                      outputDir=outputDir, failStatements=failStatements, runInBackground=runInBackground, ignoreExitCode=ignoreExitCode,
                                      resourcesDict=resourcesDict, useMpirun=useMpirun, weight=weight, retries=retries)
        # Output is written to file in any case. Therefore the output file must not be deleted during cleanup.
        self._writeToFile = True
        self.dependJobID = dependJobID
        self.__jobId = None
        self.__queue = queue or self._defaultQueue
        self.__qsubScript = os.path.abspath(os.path.join(self._workingDir, f'{self.name}{self._runScriptExtension}'))
        self.__qstat = None
        self.__qstatErrorCounter = 0
        # Derive the node layout (processes per node / threads per process) from what was
        # given, filling the gaps with the cpu topology reported by the cluster info.
        if procsPerNode and threadsPerProc:
            self.__procsPerNode = procsPerNode
            self._threadsPerProc = threadsPerProc
        elif procsPerNode and not threadsPerProc:
            assert procsPerNode <= self.clusterInfo.cpuInfo.numLogicalCPUs, \
                "Unable to calculate number of threads, there are more processes than cpus on the nodes"
            self.__procsPerNode = procsPerNode
            self._threadsPerProc = self.clusterInfo.cpuInfo.numLogicalCPUs // procsPerNode
        elif not procsPerNode and not threadsPerProc:
            self.__procsPerNode = self.clusterInfo.cpuInfo.numPhysicalCores
            self._threadsPerProc = self.clusterInfo.cpuInfo.numSiblingsPerCore
        elif threadsPerProc == 1 and not procsPerNode:
            self.__procsPerNode = self.clusterInfo.cpuInfo.numPhysicalCores
            self._threadsPerProc = 1
        else:
            self.__procsPerNode = self.clusterInfo.cpuInfo.numLogicalCPUs // threadsPerProc
            self._threadsPerProc = threadsPerProc
        # Number of nodes needed to host nProcs processes; round up on a remainder.
        nNodes, carry = divmod(nProcs, self.__procsPerNode)
        self.__nNodes = nNodes + 1 if carry else nNodes
        self._resources.update({NODES: self.__nNodes, PROCESSORS: 0})  # set processors to zero, because the job does not need any processors on the master
        self._modulesConfigured = (jobConsts.MODULES_HOME in os.environ) or (jobConsts.MODULE_PATH in os.environ)
        self._loadedModules = list()
        if self._modulesConfigured:
            self._loadedModules = os.environ[jobConsts.LOADED_MODULES].split(":")
        self.__maxWallTime = None
        self.maxWallTime = wallTime

    @property
    def jobId(self):
        return self.__jobId

    @property
    def queue(self):
        return self.__queue

    @property
    def nProcsPerNode(self):
        return self.__procsPerNode

    @property
    def nNodes(self):
        return self.__nNodes

    @property
    def qsubScript(self):
        return self.__qsubScript

    @property
    def maxWallTime(self):
        return self.__maxWallTime

    @maxWallTime.setter
    def maxWallTime(self, value):
        """Stores the wall time string and derives the job weight (in seconds) from it.

        Accepts ``SS``, ``MM:SS``, ``HH:MM:SS`` or ``DD:HH:MM:SS``.

        :raises ValueError: if the string has more than four colon-separated fields
        """
        self.__maxWallTime = value
        if value:
            timeList = value.split(':')
            if len(timeList) > 4:
                raise ValueError("Unknown time format")
            # Walk the fields from the least significant (seconds) upwards.
            seconds = 0
            for factor, field in zip((1, 60, 3600, 86400), reversed(timeList)):
                seconds += factor * int(field)
            self.weight = seconds

    def _getCommandLine(self):
        """Builds up the command line which enqueues the job
        """
        commandLine = ["qsub", "-V",
                       "-l", "nodes={}:ppn={}".format(self.__nNodes, self.__procsPerNode),
                       "-q", self.__queue,
                       "-N", self._name,
                       ]
        if self.maxWallTime:
            commandLine.append("-l")
            commandLine.append("walltime={}".format(self.maxWallTime))
        if self.dependJobID is not None:
            # only start after the given job terminated (in any state)
            commandLine.append("-W")
            commandLine.append("depend=afterany:{}".format(self.dependJobID))
        commandLine.append(self.__qsubScript)
        return commandLine

    def runJob(self):
        """Starts the job using qsub and pThreads when needed.
        """
        self._startTime = None  # Reset start time as we only want to measure the actually elapsed time
        self.prepareOutFile()
        if self._threadsPerProc > 1:
            # NOTE(review): mutates self._args; assumes runJob is invoked once per job,
            # otherwise "-tpp" would be prepended repeatedly — confirm against the retry logic
            self._args.insert(0, f"-tpp {self._threadsPerProc}")
        envVars = os.environ.copy()
        envVars.update(self._envVars)
        self.__createBatchFile()
        commandline = self._getCommandLine()
        try:
            qsubStdout = subprocess.Popen(commandline, cwd=self._workingDir, env=envVars, stdout=subprocess.PIPE, encoding="utf8").communicate()[0]
            try:
                # qsub prints the job id as the first token of its stdout
                self.__jobId = qsubStdout.split()[0]
                self.__qstat = qstat.Qstat(self.__jobId)
            except IndexError:
                self._writeStatus(jobConsts.ERROR, "cannot read jobId. Trying to submit to the wrong queue?")
                return
            if not self._inJoblist:
                if self._runInBackground:
                    print(f"The id of the just started job is '{self.__jobId}'.")
                else:
                    self.waitUntilFinished(pollingInterval=jobConsts.POLLING_INTERVAL_CLUSTER, maxPollingInterval=jobConsts.MAX_POLLING_INTERVAL_CLUSTER,
                                           scalingPollingFactor=jobConsts.SCALING_POLLING_INTERVAL_CLUSTER)
        except OSError as e:
            self._writeStatus(jobConsts.ERROR, f"cannot call qsub: {e}")

    def stop(self):
        """Kills the current job via process qdel.
        """
        commandline = f"qdel {self.__jobId}"
        try:
            subprocess.call(commandline.split())
        except OSError:
            self._writeStatus(jobConsts.ERROR, f"Unable to kill job {self.__jobId}")

    def __createBatchFile(self):
        """Generates the <JOBNAME>.run.sh file, the pbs-script which will be passed to qsub

        :raises RuntimeError: if the script file cannot be written
        """
        lines = []
        # shebang bash
        lines.append("#!/bin/bash \n")
        # change to the same directory on each execution host
        lines.append("cd $PBS_O_WORKDIR \n")
        # making the "module" command available
        lines.append(". /etc/profile\n")
        # write additional environment variables
        for key, value in self._envVars.items():
            lines.append(f"export {key}={value}\n")
        # load the same modules which were loaded on the starting shell
        if self._modulesConfigured:
            lines.append("module purge\n")
            for module in self._loadedModules:
                lines.append(f"module load {module}\n")
        # switch to an alternative working directory
        if self._workingDir:
            lines.append(f"cd {self._workingDir}\n")
        lines.append(" ".join(self._generateExecutionCommand()) + f" >> {self.outFile} 2>&1\n")
        # write the script to a file
        try:
            with open(self.__qsubScript, "w") as handle:
                handle.writelines(lines)
        except IOError as exc:
            # chain the original error so the cause is not lost
            raise RuntimeError(f"ERROR: Cannot create batch script {self.__qsubScript}.") from exc

    def updateStatus(self):
        """Polls qstat once and updates the job state.

        :return: True if the job has finished, False otherwise
        :rtype: bool
        """
        errorMax = 20
        jobStatus = self.__qstat.status
        if jobStatus in (qstat.Status.WAITING, qstat.Status.RUNNING):
            # Reset errorCount after one successful poll
            self.__qstatErrorCounter = 0
            if self._startTime is None and jobStatus == qstat.Status.RUNNING:
                self._startTime = time.time()
            return False
        elif jobStatus == qstat.Status.DONE:
            self.lookForSuccess()
            return True
        else:
            # tolerate transient qstat failures; only report after errorMax polls in a row
            self.__qstatErrorCounter += 1
            if self.__qstatErrorCounter == errorMax:
                self._writeStatus(jobConsts.ERROR, "Error getting qstat status")
            return False

    def cleanup(self, force=False, **kwargs):
        """Remove created scripts, output and error files

        :param force: flag whether cleanup is enforced
        :type force: bool
        :param removeScript: flag whether files should be removed
        :type removeScript: bool
        """
        # remove output file and thread script
        super(QSubJob, self).cleanup(force, **kwargs)
        # remove qsub output files
        self.__cleanupScript(**kwargs)

    def __cleanupScript(self, **kwargs):
        """Removes all created scripts and qSub output and error files.

        :param removeScript: flag whether files should be removed
        :type removeScript: bool
        """
        removeScript = kwargs.get("removeScript", False)
        # remove qsub script, but only for jobs that terminated successfully
        if removeScript and self.status.code == jobConsts.DONE:
            try:
                os.remove(self.__qsubScript)
            except OSError:
                pass
        # remove all files <job name>.(e|o)+?[0-9]
        cleanQsubStandardFilesForTestcase(self._workingDir, self._name, removeScript)