#!/usr/bin/env python
# Class for submitting ample modelling and MR/Refine/Build components to a cluster
# queuing system.
#
# Ronan Keegan 25/10/2011
#
import logging
import os
import subprocess
import shlex
import shutil
import time
logger = logging.getLogger(__name__)


class ClusterRun:
    """Submit jobs to an SGE or LSF cluster queue and wait for them to finish.

    Jobs submitted through submitJob/submitArrayJob have their job numbers
    recorded in ``qList``; monitorQueue polls the queue until none of them are
    listed any more, and cleanUpArrayJob renames the per-task array logs.
    """

    def __init__(self):
        # Job numbers (ints) of every job submitted through this instance
        self.qList = []
        # Job ids (strings) currently listed by qstat/bjobs; refreshed by getRunningJobList
        self.runningQueueList = []
        # Queueing system used by getRunningJobList and (by default) submitJob: "SGE" or "LSF"
        self.QTYPE = ""
        self.modeller = None
        self.runDir = None
        self.logDir = None
        # Path of the "array.jobs" file written by the last submitArrayJob call
        self._scriptFile = None
        self.debug = True

    def cleanUpArrayJob(self, scriptFile=None, logDir=None):
        """Rename the per-task log files of an array job after their scripts.

        Task N of the array writes ``arrayJob_<N>.log`` (looked for relative to
        the current working directory); task N ran the N-th script listed in
        *scriptFile*, so its log is moved to ``<script name without .sh>.log``.

        Args:
            scriptFile -- file listing one script path per line; defaults to the
                          file written by the last submitArrayJob call
            logDir -- directory that the logfiles should end up in; if None each
                      log is put in the directory of its script

        Raises:
            RuntimeError -- if the script list file cannot be found
        """
        if not scriptFile:
            scriptFile = self._scriptFile
        if not scriptFile or not os.path.isfile(scriptFile):
            raise RuntimeError("Cannot find scriptFile {0}".format(scriptFile))

        with open(scriptFile) as f:
            scripts = [line.strip() for line in f]

        # Array task ids start at 1 and follow the line order of the script list
        for task_id, script_path in enumerate(scripts, start=1):
            jobDir, script = os.path.split(script_path)
            jobName = os.path.splitext(script)[0]
            oldLog = "arrayJob_{0}.log".format(task_id)
            targetDir = jobDir if logDir is None else logDir
            newLog = os.path.join(targetDir, "{0}.log".format(jobName))
            if os.path.isfile(oldLog):
                logger.debug("Moving {0} to {1}".format(oldLog, newLog))
                # shutil.move rather than os.rename: os.rename may fail on some
                # Unix flavours when src and dst are on different filesystems.
                shutil.move(oldLog, newLog)
            else:
                logger.critical("Cannot find logfile {0} to copy to {1}".format(oldLog, newLog))
        return

    @staticmethod
    def _parseRunningJobIds(output, header_lines):
        """Return the first whitespace-separated field of each non-blank line
        of *output* after skipping its first *header_lines* lines."""
        lines = [line.strip() for line in output.splitlines()]
        return [line.split()[0] for line in lines[header_lines:] if line]

    def getRunningJobList(self, user=""):
        """Refresh ``self.runningQueueList`` with the job ids currently in the queue.

        For LSF output is of form:
        JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME
        35340 jxt15-d RUN q1h32 ida7c42 ida2a40 *ep 5;done Mar 25 13:23
        ida2a40
        ida2a40

        Continuation lines like the above contribute a host name to the list;
        it only matters if it ever equals a job number.

        Args:
            user -- if non-empty, only list jobs belonging to this user

        Raises:
            RuntimeError -- if ``self.QTYPE`` is not "SGE" or "LSF"
        """
        if self.QTYPE == "SGE":
            command = ['qstat']
            header_lines = 2  # column titles plus a dashed separator line
        elif self.QTYPE == "LSF":
            command = ['bjobs']
            header_lines = 1
        else:
            raise RuntimeError("Unrecognised QTYPE: {0}".format(self.QTYPE))
        if user:
            command += ['-u', user]

        # universal_newlines: decode to str so the ids compare equal to
        # str(job) in monitorQueue (on Python 3 the raw output is bytes).
        p = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True)
        stdout, _ = p.communicate()
        self.runningQueueList = self._parseRunningJobIds(stdout, header_lines)
        return

    def monitorQueue(self, user="", monitor=None, poll_interval=60):
        """Block until none of the jobs in ``self.qList`` are listed in the queue.

        Args:
            user -- passed to getRunningJobList to restrict the listing to one user
            monitor -- optional callable invoked after every poll
            poll_interval -- seconds to sleep before each poll (default 60)

        Raises:
            RuntimeError -- if no jobs have been submitted
        """
        if not self.qList:
            raise RuntimeError("No jobs found in self.qList!")
        logger.info("Jobs submitted to cluster queue, awaiting their completion...")
        runningList = list(self.qList)
        while runningList:
            time.sleep(poll_interval)
            self.getRunningJobList(user)
            inQueue = set(self.runningQueueList)
            stillRunning = [job for job in runningList if str(job) in inQueue]
            if len(stillRunning) < len(runningList):
                logger.info(
                    "Queue Monitor: %d out of %d jobs remaining in cluster queue...",
                    len(stillRunning),
                    len(self.qList),
                )
            if not stillRunning:
                logger.info("Queue Monitor: All jobs complete!")
            runningList = stillRunning
            if monitor:
                monitor()
        return

    def queueDirectives(
        self,
        nproc=None,
        log_file=None,
        job_name=None,
        job_time=None,
        submit_max_array=None,
        submit_num_array_jobs=None,
        submit_pe_sge='mpi',
        submit_pe_lsf='#BSUB -R "span[ptile={0}]"',
        submit_qtype=None,
        submit_queue=None,
    ):
        """
        Create a string suitable for writing out as the header of the submission script
        for submitting to a particular queueing system.

        Args:
            nproc -- number of processors; > 1 requests a parallel environment (SGE -pe / LSF -n)
            log_file -- output log file for a non-array job
            job_name -- the name of the job in the queue (required for LSF array jobs)
            job_time -- maximum job runtime: written verbatim to SGE's h_rt (a bare number
                        is read as seconds) and divided by 60, rounded up, for LSF's -W (minutes)
            submit_max_array -- maximum number of array jobs to run concurrently
            submit_num_array_jobs -- number of tasks in the array; falsy for a non-array job
            submit_pe_sge -- SGE parallel environment name
            submit_pe_lsf -- LSF resource directive template, formatted with nproc
            submit_qtype -- the type of the queueing system ("SGE" or "LSF")
            submit_queue -- the name of the queue to submit the job to

        Returns:
            queue directives as a list of EOL-terminated strings

        Raises:
            RuntimeError -- unrecognised submit_qtype, or an LSF array job without a job_name
        """
        sh = []
        if submit_qtype == "SGE":
            sh += ['#$ -j y\n', '#$ -cwd\n', '#$ -w e\n', '#$ -V\n', '#$ -S /bin/bash\n']
            if job_time:
                sh += ['#$ -l h_rt={0}\n'.format(job_time)]
            if submit_queue:
                sh += ['#$ -q {0}\n'.format(submit_queue)]
            if job_name:
                sh += ['#$ -N {0}\n'.format(job_name)]
            if submit_num_array_jobs:
                sh += ['#$ -o arrayJob_$TASK_ID.log\n']
                sh += ['#$ -t 1-{0}\n'.format(submit_num_array_jobs)]
                if submit_max_array:
                    sh += ['#$ -tc {0}\n'.format(submit_max_array)]
            elif log_file:
                sh += ['#$ -o {0}\n'.format(log_file)]
            if nproc and nproc > 1:
                sh += ['#$ -pe {0} {1}\n'.format(submit_pe_sge, nproc)]
            # Later versions of SGE don't properly export the PATH so need to do it ourselves
            sh += ['export PATH=${PATH}:${SGE_O_PATH}\n']
            sh += ['\n']
        elif submit_qtype == "LSF":
            if nproc and submit_pe_lsf:
                sh += [submit_pe_lsf.format(nproc) + '\n']
            if job_time:
                # Whole minutes, rounded up so the limit is never shorter than requested
                # (plain "/" would give e.g. "60.0" on Python 3).
                sh += ['#BSUB -W {0}\n'.format(int(-(-job_time // 60)))]
            else:
                sh += ['#BSUB -W 4:00\n']
            if nproc and nproc > 1:
                sh += ['#BSUB -n {0}\n'.format(nproc)]
            if submit_queue:
                sh += ['#BSUB -q {0}\n'.format(submit_queue)]
            # Array jobs get their own per-task -o below; don't emit two -o directives
            if log_file and not submit_num_array_jobs:
                sh += ['#BSUB -o {0}\n'.format(log_file)]
            if submit_num_array_jobs:
                if not job_name:
                    raise RuntimeError("LSF array job requires a job name")
                sh += ['#BSUB -o arrayJob_%I.log\n']
                if submit_max_array:
                    sh += ['#BSUB -J {0}[1-{1}]%{2}\n'.format(job_name, submit_num_array_jobs, submit_max_array)]
                else:
                    sh += ['#BSUB -J {0}[1-{1}]\n'.format(job_name, submit_num_array_jobs)]
            elif job_name:
                sh += ['#BSUB -J {0}\n'.format(job_name)]
            sh += ['\n']
        else:
            raise RuntimeError("Unrecognised QTYPE: {0}".format(submit_qtype))
        sh += ['\n']
        return sh

    @staticmethod
    def _parseSubmittedJobNumbers(output, qtype):
        """Return, in order, the job numbers (ints) announced in qsub/bsub *output*."""
        numbers = []
        for line in output.splitlines():
            if qtype == "SGE":
                if "Your job-array" in line:
                    # Array jobs have different form
                    # Your job-array 19094.1-10:1 ("array.script") has been submitted
                    numbers.append(int(line.split()[2].split(".")[0]))
                elif "Your job" in line:
                    numbers.append(int(line.split()[2]))
            elif qtype == "LSF":
                # Job <35339> is submitted to queue <q1h32>.
                if "is submitted to queue" in line:
                    numbers.append(int(line.split()[1].strip("<>")))
        return numbers

    def submitJob(self, subScript, qtype=None):
        """
        Submit the job to the queue and return the job number.

        Every job number found in the submission output is appended to ``self.qList``.

        Args:
            subScript -- the path to the submission script
            qtype -- queueing system ("SGE" or "LSF"); defaults to ``self.QTYPE``

        Returns:
            job number as a string ("0" if none could be found in the output)

        Raises:
            RuntimeError -- unrecognised qtype, the submission command could not be
                            started, or the queueing system rejected the job
        """
        if qtype is None:
            qtype = self.QTYPE
        if qtype == "SGE":
            # Pass the path as its own argument so paths with spaces survive
            command = ['qsub', '-V', subScript]
        elif qtype == "LSF":
            # bsub reads the script, including its #BSUB directives, from stdin
            command = ['bsub']
        else:
            msg = "Unrecognised QTYPE: {0}".format(qtype)
            raise RuntimeError(msg)
        command_line = " ".join(command)
        logger.debug("Submitting job with command: {0}".format(command_line))

        stdin = open(subScript, "r") if qtype == "LSF" else None
        try:
            try:
                p = subprocess.Popen(
                    command,
                    stdin=stdin,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    universal_newlines=True,
                )
            except Exception as e:
                raise RuntimeError("Error submitting job to queue with commmand: {0}\n{1}".format(command_line, e))
            # communicate() drains both pipes together (no deadlock if one fills
            # while the other is being read) and reaps the child process.
            stdout, stderr = p.communicate()
        finally:
            if stdin is not None:
                stdin.close()

        # Check there were no errors
        if (qtype == "SGE" and "Unable to run job" in stderr) or p.returncode != 0:
            raise RuntimeError("Error submitting job to cluster queueing system: {0}".format(stderr))

        jobNumbers = self._parseSubmittedJobNumbers(stdout, qtype)
        for qNumber in jobNumbers:
            self.qList.append(qNumber)
            logger.debug("Submission script {0} submitted to queue as job {1}".format(subScript, qNumber))
        if not jobNumbers:
            logger.warning("Could not find a job number in the output of: {0}".format(command_line))
            return "0"
        return str(jobNumbers[-1])

    def submitArrayJob(
        self, job_scripts, job_name=None, job_time=None, submit_max_array=None, submit_queue=None, submit_qtype=None
    ):
        """Submit a list of jobs as an array job

        Writes ``array.jobs`` (one script path per line) and ``array.script`` (the
        submission script) in the current working directory and submits the latter.
        Array task N cd's to the directory of the N-th script and runs it.

        Args:
            job_scripts -- the list of scripts to run as the array; each must be an
                           absolute path with a .sh extension
            job_name -- the name of the job in the queue (required for LSF array jobs)
            job_time -- maximum job runtime (see queueDirectives for the units)
            submit_max_array -- maximum number of array jobs to run concurrently
            submit_queue -- the name of the queue to submit the job to
            submit_qtype -- the type of the queueing system ("SGE" or "LSF")

        Raises:
            RuntimeError -- unsupported submit_qtype, an empty or badly formed script
                            list, or a submission failure
        """
        # Validate everything before any file is written
        task_env_by_qtype = {"SGE": "SGE_TASK_ID", "LSF": "LSB_JOBINDEX"}
        if submit_qtype not in task_env_by_qtype:
            raise RuntimeError("Unsupported submission type: {0}".format(submit_qtype))
        task_env = task_env_by_qtype[submit_qtype]
        if not job_scripts:
            # With zero tasks queueDirectives would silently emit a non-array script
            raise RuntimeError("No scripts given for array job")
        for script in job_scripts:
            # Check the scripts are of the correct format - abspath and .sh extension
            if not script.startswith("/") or not script.endswith(".sh"):
                raise RuntimeError(
                    "Scripts for array jobs must be absolute paths with a .sh extension: {0}".format(script)
                )

        job_dir = os.getcwd()
        # Create the list of scripts
        self._scriptFile = os.path.abspath(os.path.join(job_dir, "array.jobs"))
        with open(self._scriptFile, 'w') as f:
            for script in job_scripts:
                f.write(script + "\n")

        # Generate the array submission script
        arrayScript = os.path.abspath(os.path.join(job_dir, "array.script"))
        s = "#!/bin/sh\n"
        s += "".join(
            self.queueDirectives(
                nproc=None,
                log_file=None,
                job_name=job_name,
                job_time=job_time,
                submit_max_array=submit_max_array,
                submit_num_array_jobs=len(job_scripts),
                submit_queue=submit_queue,
                submit_qtype=submit_qtype,
            )
        )
        # body: the task id selects the matching line of the script list
        s += """scriptlist="{0}"
# Extract info on what we need to run
script=`sed -n "${{{1}}}p" "$scriptlist"`
jobdir=`dirname "$script"`
jobname=`basename "$script" .sh`
# cd to jobdir and runit
cd "$jobdir"
# Run the script
"$script"
""".format(
            self._scriptFile, task_env
        )
        with open(arrayScript, 'w') as f:
            f.write(s)
        # Submit with the same queueing system the directives were written for
        self.submitJob(arrayScript, qtype=submit_qtype)
        return