Source code for ample.util.worker
"""Worker functions for job execution"""
__author__ = "Jens Thomas"
__date__ = "28 Feb 2013"
__version__ = "1.0"
import logging
import multiprocessing
import os
import sys
from ample.util import ample_util
logger = logging.getLogger(__name__)
[docs]def worker(inqueue, early_terminate=False, check_success=None):
"""Worker process to run MrBump jobs until no more left.
This function keeps looping over the inqueue, removing jobs from the
inqueue until there are no more left. It checks if a jobs has succeeded
and if so it will terminate.
Parameters
----------
inqueue : :obj:`Queue`
A Python Queue object
early_terminate : bool
Terminate on first success or continue running
check_success : callable
A callable to check the success status of a job
Warnings
--------
This needs to import the main module that it lives in so maybe this should
live in a separate module?
"""
if early_terminate:
assert callable(check_success)
success = True
while True:
if inqueue.empty():
logger.debug("worker {0} got empty inqueue".format(multiprocessing.current_process().name))
rcode = 0 if success else 1
sys.exit(rcode)
# Got a script so run
job = inqueue.get()
# Get name from script
logger.debug("Worker {0} running job {1}".format(multiprocessing.current_process().name, job))
directory, sname = os.path.split(job)
jobname = os.path.splitext(sname)[0]
# Change directory to the script directory
os.chdir(directory)
retcode = ample_util.run_command([job], logfile=jobname + ".log", dolog=False, check=True)
# Can we use the retcode to check?
# REM - is retcode object
if retcode != 0:
logger.warning("WARNING! Worker {0} got retcode {1}".format(multiprocessing.current_process().name, retcode))
success = False
# Now check the result if early terminate
if early_terminate:
if check_success(job):
logger.debug("Worker {0} job succeeded".format(multiprocessing.current_process().name))
sys.exit(0)