'''
Created on Feb 28, 2013
@author: jmht
'''
import logging
import multiprocessing
import os
import time
from ample.util import ample_util
from ample.util import clusterize
from ample.util import worker
# logger = logging.getLogger(__name__)
# getLogger() called without a name returns the root logger, so this module logs
# through the root logger rather than a module-named one (the disabled line above).
logger = logging.getLogger()
class JobServer(object):
    """Run a list of job scripts on the local machine with a pool of worker processes.

    Typical use: create the server, call :meth:`setJobs` once with the scripts
    to run, then call :meth:`start` to launch the workers and wait for them.
    """

    def __init__(self):
        # Queue of job scripts waiting to be picked up by the worker processes
        self.inqueue = None
        # Initialised here but never populated by any method of this class
        self.outqueue = None
        logger.info("Running jobs on a local machine")

    def setJobs(self, jobs):
        """Add the list of jobs we are to run.

        Parameters
        ----------
        jobs : iterable of str
            Paths to the job scripts; every path must be an existing file.

        Raises
        ------
        RuntimeError
            If jobs have already been set on this server, or if any job
            script cannot be found on disk.
        """
        if self.inqueue is not None:
            raise RuntimeError("NOT THOUGHT ABOUT MULTIPLE INVOCATIONS!")

        # Validate everything up front so that a bad path never leaves a
        # half-filled queue (and its feeder thread) behind.
        jobs = list(jobs)
        for job in jobs:
            if not os.path.isfile(job):
                raise RuntimeError("JobServer cannot find job: {0}".format(job))

        # Queue to hold the jobs we want to run
        queue = multiprocessing.Queue()
        for job in jobs:
            queue.put(job)
        self.inqueue = queue

        # We can't call inqueue.close() even though we've finished as it all goes horribly wrong
        # We sleep to allow enough time for the objects to be pickled and put on the queue
        time.sleep(2)
        return

    def start(self, nproc=None, early_terminate=False, check_success=None, monitor=None):
        """Launch the worker processes and block until all of them have exited.

        Parameters
        ----------
        nproc : int
            Number of worker processes to start (required).
        early_terminate : bool
            Passed to each worker. When true, the first worker that exits
            with code 0 causes all jobs still waiting on the inqueue to be
            discarded; jobs that are already running are left to finish.
        check_success : callable, optional
            Passed through unchanged to each worker.
        monitor : callable, optional
            Called with no arguments once before polling starts and again
            after every pass over the worker processes.

        Returns
        -------
        bool
            True if every worker process exited with code 0, otherwise False.

        Raises
        ------
        AssertionError
            If ``nproc`` is None.
        """
        # Kept as an assert so the exception type seen by callers is unchanged
        assert nproc is not None, "nproc must be specified"

        # Now start the jobs
        processes = []
        for _ in range(nproc):
            process = multiprocessing.Process(target=worker.worker,
                                              args=(self.inqueue,
                                                    early_terminate,
                                                    check_success))
            process.start()
            processes.append(process)

        # Maximum time in seconds to wait on any one process before moving to the next
        timeout = 1 * 60
        if monitor:
            monitor()

        success = True
        while processes:
            # Iterate over a snapshot: removing entries from the list that is
            # being iterated would skip the process following each removal.
            for process in list(processes):
                # Join process for timeout seconds and if we haven't finished by then move onto the next process
                process.join(timeout)
                if process.is_alive():
                    continue

                logger.debug("Checking completed process {0} with exitcode {1}".format(process, process.exitcode))
                # Set failed if any job failed
                if process.exitcode != 0:
                    logger.critical("Process {0} failed with exitcode {1}".format(process, process.exitcode))
                    success = False

                # Finished so see what happened
                if process.exitcode == 0 and early_terminate:
                    if not self.inqueue.empty():
                        print("Process {0} was successful so removing remaining jobs from inqueue".format(process.name))
                        logger.info("Process {0} was successful so removing remaining jobs from inqueue".format(process.name))
                        # Remove all remaining jobs from the inqueue. We do this rather than terminate the
                        # processes as terminating leaves the MRBUMP processes running. This way we hang
                        # around until all our running processes have finished
                        self._drain_inqueue()
                    else:
                        print("Got empty queue - all jobs done")

                # Remove from processes to check
                processes.remove(process)

            # Run the monitor function
            if monitor:
                monitor()

        # need to wait here as sometimes it takes a while for the results files to get written
        time.sleep(3)
        return success

    def _drain_inqueue(self):
        """Discard every job still waiting on the inqueue without blocking forever."""
        # The Empty exception lives in a differently named module on Python 2 and 3
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        while not self.inqueue.empty():
            try:
                # Workers that are still alive may take the last job between the
                # empty() check and this call, so never wait without a timeout.
                job = self.inqueue.get(timeout=1)
            except Empty:
                continue
            logger.debug("Removed job [{0}] from inqueue".format(job))
def run_scripts(job_scripts,
                monitor=None,
                check_success=None,
                early_terminate=None,
                nproc=None,
                job_time=None,
                job_name=None,
                submit_cluster=None,
                submit_qtype=None,
                submit_queue=None,
                submit_pe_lsf=None,
                submit_pe_sge=None,
                submit_array=None,
                submit_max_array=None):
    """Run the job scripts either locally or on a cluster.

    When ``submit_cluster`` is true the scripts and all of the ``submit_*``,
    ``job_*``, ``nproc`` and ``monitor`` options are handed to
    ``run_scripts_cluster``; ``check_success`` and ``early_terminate`` are not
    forwarded on that path. Otherwise the scripts are handed to
    ``run_scripts_serial`` together with ``nproc``, ``monitor``,
    ``early_terminate`` and ``check_success``.

    Returns whatever the selected runner returns.
    """
    # Local execution is the simple case, so deal with it first
    if not submit_cluster:
        return run_scripts_serial(job_scripts,
                                  nproc=nproc,
                                  monitor=monitor,
                                  early_terminate=early_terminate,
                                  check_success=check_success)

    cluster_options = dict(nproc=nproc,
                           monitor=monitor,
                           job_time=job_time,
                           job_name=job_name,
                           submit_cluster=submit_cluster,
                           submit_qtype=submit_qtype,
                           submit_queue=submit_queue,
                           submit_pe_lsf=submit_pe_lsf,
                           submit_pe_sge=submit_pe_sge,
                           submit_array=submit_array,
                           submit_max_array=submit_max_array)
    return run_scripts_cluster(job_scripts, **cluster_options)
def run_scripts_cluster(job_scripts,
                        monitor=None,
                        job_time=None,
                        job_name=None,
                        submit_cluster=None,
                        submit_qtype=None,
                        submit_queue=None,
                        submit_pe_lsf=None,
                        submit_pe_sge=None,
                        submit_array=None,
                        submit_max_array=None,
                        nproc=None):
    """Submit the job scripts to a cluster queue and wait for them to finish.

    If ``submit_array`` is true and there is more than one script, the scripts
    are submitted together as a single array job. Otherwise each script is
    rewritten in place with queue directives inserted after its first line,
    made executable and submitted individually.

    Parameters
    ----------
    job_scripts : list of str
        Paths to the scripts to submit.
    monitor : callable, optional
        Passed to ``ClusterRun.monitorQueue``.
    job_time, submit_qtype, submit_queue, submit_pe_lsf, submit_pe_sge :
        Passed through to the ``clusterize.ClusterRun`` calls.
    job_name : str, optional
        Name for the submitted jobs. When not given, each individually
        submitted script is named after its own file name (minus extension).
    submit_cluster :
        Accepted for signature compatibility; not used in this function.
    submit_array, submit_max_array :
        Control array-job submission (see above).
    nproc : int, optional
        Processor count requested in the queue directives; defaults to 1.

    Returns
    -------
    bool
        Always True once queue monitoring has returned.
    """
    logger = logging.getLogger()
    logger.info("Running jobs on a cluster")

    cluster_run = clusterize.ClusterRun()
    cluster_run.QTYPE = submit_qtype

    # Evaluate once; the same condition decides both submission and clean-up
    use_array = bool(submit_array) and len(job_scripts) > 1

    if use_array:
        cluster_run.submitArrayJob(job_scripts,
                                   job_time=job_time,
                                   job_name=job_name,
                                   submit_max_array=submit_max_array,
                                   submit_qtype=submit_qtype,
                                   submit_queue=submit_queue)
    else:
        if nproc is None:
            nproc = 1
        for script in job_scripts:
            dirname, sname = os.path.split(script)
            name = os.path.splitext(sname)[0]
            logfile = os.path.join(dirname, "{0}.log".format(name))

            # Use a per-script name: assigning to job_name here would make
            # every later script inherit the first script's name.
            script_job_name = job_name if job_name else name

            with open(script) as f:
                lines = f.readlines()

            # NOTE(review): queueDirectives is assumed to return a list of
            # directive lines, since it is concatenated with lists below.
            slines = clusterize.ClusterRun().queueDirectives(nproc=nproc,
                                                             job_name=script_job_name,
                                                             job_time=job_time,
                                                             log_file=logfile,
                                                             submit_queue=submit_queue,
                                                             submit_qtype=submit_qtype,
                                                             submit_pe_lsf=submit_pe_lsf,
                                                             submit_pe_sge=submit_pe_sge)

            # We add the queue directives after the first line of the script.
            # Slicing (lines[:1]) rather than indexing copes with an empty script.
            with open(script, 'w') as f:
                f.write("".join(lines[:1] + slines + lines[1:]))

            # NOTE(review): 0o777 grants read/write/execute to every user.
            os.chmod(script, 0o777)
            cluster_run.submitJob(script)

    # Monitor the cluster queue to see when all jobs have finished
    cluster_run.monitorQueue(monitor=monitor)

    # Rename scripts for array jobs
    if use_array:
        cluster_run.cleanUpArrayJob()
    return True
def run_scripts_serial(job_scripts,
                       nproc=None,
                       monitor=None,
                       early_terminate=None,
                       check_success=None,
                       ):
    """Run the job scripts on the local machine.

    More than one script is run through a ``JobServer`` using ``nproc`` worker
    processes. A single script is run directly via ``ample_util.run_command``
    from the directory containing the script, with output logged to
    ``<script name>.log`` in that directory.

    Parameters
    ----------
    job_scripts : list of str
        Paths to the scripts to run; must contain at least one entry.
    nproc : int, optional
        Number of worker processes (only used with more than one script).
    monitor : callable, optional
        Passed to ``JobServer.start`` (only used with more than one script).
    early_terminate : optional
        Converted to bool and passed to ``JobServer.start``.
    check_success : callable, optional
        Passed to ``JobServer.start``.

    Returns
    -------
    bool
        For several scripts, the result of ``JobServer.start``; for a single
        script, True if the command returned 0.

    Raises
    ------
    IndexError
        If ``job_scripts`` is empty.
    """
    if len(job_scripts) > 1:
        js = JobServer()
        js.setJobs(job_scripts)
        return js.start(nproc=nproc,
                        early_terminate=bool(early_terminate),
                        check_success=check_success,
                        monitor=monitor,
                        )

    # Resolve to an absolute path before changing directory. A relative path
    # would otherwise be looked up against the new working directory, and a
    # bare file name would give os.chdir('') which raises.
    script = os.path.abspath(job_scripts[0])
    name = os.path.splitext(os.path.basename(script))[0]
    logfile = "{0}.log".format(name)
    wdir = os.path.dirname(script)
    # The working directory is deliberately left changed, as before
    os.chdir(wdir)
    rtn = ample_util.run_command([script], logfile=logfile)
    return rtn == 0
# Must live at module level: if it were defined inside the test it could not be pickled on Windows
def _check_success_test(job):
    """Return True only when the job's file name, minus its extension, is "job_2"."""
    stem, _ext = os.path.splitext(os.path.basename(job))
    return stem == "job_2"