"""Various miscellaneous functions"""
__author__ = "Jens Thomas, and Felix Simkovic"
__date__ = "01 Jan 2016"
__version__ = "1.0"
from contextlib import contextmanager
import pickle
import logging
import os
import subprocess
import sys
import tarfile
import tempfile
import warnings
import zipfile
from ample.util import ccp4, exit_util
from ample.constants import SHARE_DIR, AMPLEDIR, I2DIR
CCP4 = ccp4.CCP4()
SCRIPT_EXT = '.bat' if sys.platform.startswith('win') else '.sh'
EXE_EXT = '.exe' if sys.platform.startswith('win') else ''
SCRIPT_HEADER = '' if sys.platform.startswith('win') else '#!/bin/bash'
[docs]class FileNotFoundError(Exception):
pass
# ample_util is used before anything else so there is no logger available
# and we need to a Null handler
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
three2one = {
'ALA': 'A',
'ARG': 'R',
'ASN': 'N',
'ASP': 'D',
'CYS': 'C',
'GLU': 'E',
'GLN': 'Q',
'GLY': 'G',
'HIS': 'H',
'ILE': 'I',
'LEU': 'L',
'LYS': 'K',
'MET': 'M',
'PHE': 'F',
'PRO': 'P',
'SER': 'S',
'THR': 'T',
'TRP': 'W',
'TYR': 'Y',
'VAL': 'V',
'UNK': 'X',
}
# http://stackoverflow.com/questions/3318625/efficient-bidirectional-hash-table-in-python
# aaDict.update( dict((v, k) for (k, v) in aaDict.items()) )
one2three = dict((v, k) for (k, v) in three2one.items())
[docs]def amoptd_fix_path(optd, newroot):
"""Update all the paths in an AMPLE results dictionary to be rooted at newroot
Parameters
----------
optd: dict
AMPLE results dictionary
newroot: str
Path to the AMPLE root directory (topdir containing MRBUMP dir etc.)
"""
# oldroot = os.sep.join(optd['work_dir'].split(os.sep)[:-1])
oldroot = optd['work_dir']
for k in ['benchmark_dir', 'native_pdb', 'native_pdb_std', 'fasta']:
if k in optd and isinstance(optd[k], str):
optd[k] = optd[k].replace(oldroot, newroot)
MRBUMP_FILE_KEYS = [
'PHASER_logfile',
'PHASER_pdbout',
'PHASER_mtzout',
'REFMAC_logfile',
'REFMAC_pdbout',
'REFMAC_mtzout',
'BUCC_logfile',
'BUCC_pdbout',
'BUCC_mtzout',
'ARP_logfile',
'ARP_pdbout',
'ARP_mtzout',
'SHELXE_logfile',
'SHELXE_pdbout',
'SHELXE_mtzout',
'SXRBUCC_logfile',
'SXRBUCC_pdbout',
'SXRBUCC_mtzout',
'SXRARP_logfile',
'SXRARP_pdbout',
'SXRARP_mtzout',
]
if 'mrbump_results' in optd:
for r in optd['mrbump_results']:
for k in MRBUMP_FILE_KEYS:
if k in r and isinstance(r[k], str):
warnings.warn("FIX MRBUMP BUG buccaneer refine.pdb vs refined.pdb")
new = r[k].replace(oldroot, newroot)
# logger.info('Changing amopt entry %s from: %s to: %s', k, old, new)
r[k] = new
return optd
[docs]@contextmanager
def disable_logging(logger, max_loglevel=logging.CRITICAL):
"""A context manager to disable logging within a block.
Parameters
----------
logger : logging.Logger
logger instance
highet_level: int
loglevel that will be set for the duration of the context
"""
previous_level = None
if logger.getEffectiveLevel() < max_loglevel:
previous_level = logger.level
logger.setLevel(max_loglevel)
try:
yield
finally:
if previous_level is not None:
# changed loglevel so reset it
logger.setLevel(previous_level)
[docs]def find_exe(executable, dirs=None):
"""Find the executable exename.
Parameters
----------
executable : str
The name of the program or the path to an existing executable
dirs : list, tuple, optional
Additional directories to search for the location
"""
logger.debug('Looking for executable: %s', executable)
exe_file = None
found = False
if is_exe(executable):
exe_file = os.path.abspath(executable)
found = True
else:
# If the user has given a path we just take the name part
_, fname = os.path.split(executable)
if fname:
executable = fname
# By default we search in the system PATH and add any additional user given paths here
paths = os.environ["PATH"].split(os.pathsep)
if dirs:
paths += dirs
logger.debug('Checking paths: %s', paths)
for path in paths:
exe_file = os.path.abspath(os.path.join(path, executable))
if is_exe(exe_file):
logger.debug('Found executable %s in directory %s', executable, path)
found = True
break
if not found:
raise FileNotFoundError("Cannot find executable: {0}".format(executable))
logger.debug('find_exe found executable: %s', exe_file)
return exe_file
[docs]def filename_append(filename=None, astr=None, directory=None, separator="_"):
"""Append astr to filename, before the suffix, and return the new filename."""
dirname, fname = os.path.split(filename)
name, suffix = os.path.splitext(fname)
name = name + separator + astr + suffix
if directory is None:
directory = dirname
return os.path.join(directory, name)
[docs]def ideal_helices(optd):
"""Get some ideal helices
Parameters
----------
nresidues : int
Number of residues to be used
Returns
-------
pdbs : list
ensemble_options : dict
ensembles_data : list
"""
nresidues = optd['fasta_length']
include_dir = os.path.join(SHARE_DIR, 'include')
if not optd['helical_ensembles']:
names = ['polyala_5', 'polyala_10', 'polyala_15', 'polyala_20', 'polyala_25', 'polyala_30', 'polyala_35',
'polyala_40']
elif optd['helical_ensembles_set'] == 'minimal':
names = ['ensemble_20_bfactor2_homogeneous', 'ensemble_20_bfactor3_homogeneous',
'ensemble_15_bfactor2_homogeneous', 'ensemble_15_bfactor3_homogeneous',
'ensemble_25_bfactor2_homogeneous', 'ensemble_25_bfactor3_homogeneous',
'ensemble_30_bfactor2_homogeneous', 'ensemble_30_bfactor3_homogeneous',
'ensemble_35_bfactor2_homogeneous', 'ensemble_35_bfactor3_homogeneous',
'ensemble_40_bfactor2_homogeneous', 'ensemble_40_bfactor3_homogeneous']
else:
names = ['ensemble_20_bfactor2_homogeneous', 'ensemble_20_bfactor2_heterogeneous',
'ensemble_20_bfactor1_heterogeneous', 'ensemble_20_bfactor1_homogeneous',
'ensemble_20_bfactor4_heterogeneous', 'ensemble_20_bfactor3_homogeneous',
'ensemble_20_bfactor3_heterogeneous', 'ensemble_20_bfactor4_homogeneous',
'ensemble_15_bfactor2_heterogeneous', 'ensemble_15_bfactor2_homogeneous',
'ensemble_15_bfactor1_heterogeneous', 'ensemble_15_bfactor1_homogeneous',
'ensemble_15_bfactor4_heterogeneous', 'ensemble_15_bfactor3_heterogeneous',
'ensemble_15_bfactor4_homogeneous', 'ensemble_15_bfactor3_homogeneous',
'ensemble_25_bfactor2_homogeneous', 'ensemble_25_bfactor2_heterogeneous',
'ensemble_25_bfactor1_heterogeneous', 'ensemble_25_bfactor1_homogeneous',
'ensemble_25_bfactor4_homogeneous', 'ensemble_25_bfactor3_homogeneous',
'ensemble_25_bfactor4_heterogeneous', 'ensemble_25_bfactor3_heterogeneous',
'ensemble_30_bfactor2_homogeneous', 'ensemble_30_bfactor2_heterogeneous',
'ensemble_30_bfactor1_heterogeneous', 'ensemble_30_bfactor1_homogeneous',
'ensemble_30_bfactor4_homogeneous', 'ensemble_30_bfactor4_heterogeneous',
'ensemble_30_bfactor3_heterogeneous', 'ensemble_30_bfactor3_homogeneous',
'ensemble_35_bfactor2_homogeneous', 'ensemble_35_bfactor2_heterogeneous',
'ensemble_35_bfactor1_heterogeneous', 'ensemble_35_bfactor1_homogeneous',
'ensemble_35_bfactor4_heterogeneous', 'ensemble_35_bfactor4_homogeneous',
'ensemble_35_bfactor3_homogeneous', 'ensemble_35_bfactor3_heterogeneous',
'ensemble_40_bfactor2_homogeneous', 'ensemble_40_bfactor2_heterogeneous',
'ensemble_40_bfactor1_heterogeneous', 'ensemble_40_bfactor1_homogeneous',
'ensemble_40_bfactor4_heterogeneous', 'ensemble_40_bfactor3_heterogeneous',
'ensemble_40_bfactor4_homogeneous', 'ensemble_40_bfactor3_homogeneous',
'ensemble_10_bfactor2_heterogeneous', 'ensemble_10_bfactor2_homogeneous',
'ensemble_10_bfactor1_heterogeneous', 'ensemble_10_bfactor1_homogeneous',
'ensemble_10_bfactor3_homogeneous', 'ensemble_10_bfactor4_homogeneous',
'ensemble_10_bfactor3_heterogeneous', 'ensemble_10_bfactor4_heterogeneous',
'ensemble_5_bfactor2_heterogeneous', 'ensemble_5_bfactor2_homogeneous',
'ensemble_5_bfactor1_heterogeneous', 'ensemble_5_bfactor1_homogeneous',
'ensemble_5_bfactor4_homogeneous', 'ensemble_5_bfactor3_heterogeneous',
'ensemble_5_bfactor3_homogeneous', 'ensemble_5_bfactor4_heterogeneous']
ensemble_options = {}
ensembles_data = []
pdbs = []
for name in names:
nres = int(name.split('_')[1])
ncopies = nresidues / nres
if ncopies < 1:
ncopies = 1
ensemble_options[name] = {'ncopies': ncopies}
pdb = os.path.join(include_dir, "{0}.pdb".format(name))
# Needed for pyrvapi results
ensembles_data.append({'name': name, 'ensemble_pdb': pdb, 'num_residues': nres})
pdbs.append(pdb)
optd['ensembles'] = pdbs
optd['ensemble_options'] = ensemble_options
optd['ensembles_data'] = ensembles_data
return
[docs]def is_exe(fpath):
"""Check if an executable exists
Parameters
----------
fpath : str
The path to the executable
Returns
-------
bool
"""
return fpath and os.path.exists(fpath) and os.access(fpath, os.X_OK)
[docs]def is_file(fpath):
"""Check if a file exists
Parameters
----------
fpath : str
The path to the file
Returns
------
bool
"""
return fpath and os.path.isfile(fpath) and os.access(fpath, os.R_OK) and os.stat(fpath).st_size > 0
[docs]def make_workdir(run_dir, ccp4i2=False, MAX_WORKDIRS=100):
"""Make a work directory rooted at run_dir and return its path
Parameters
----------
run_dir : str
The path to a run directory where the job was started
ccp4i2 : bool, optional
Indicate if we are running under CCP4I2
Returns
-------
work_dir : str
The path to the working directory
"""
if ccp4i2:
work_dir = os.path.join(run_dir, I2DIR)
else:
run_inc = 0
while True:
work_dir = os.path.join(run_dir, AMPLEDIR + str(run_inc))
if not os.path.exists(work_dir):
break
run_inc += 1
if run_inc > MAX_WORKDIRS:
raise RuntimeError("Too many work directories! {0}".format(work_dir))
if os.path.exists(work_dir):
raise RuntimeError(
"There is an existing AMPLE work directory: {0}\n" "Please delete/move it aside.".format(work_dir)
)
os.mkdir(work_dir)
return work_dir
[docs]def run_command(cmd, logfile=None, directory=None, dolog=True, stdin=None, check=False, **kwargs):
"""Execute a command and return the exit code.
Parameters
----------
cmd : list
Command to run as a list
stdin : str or filehandle, optional
Stdin for the command
logfile : str, optional
The path to the logfile
directory : str, optional
The directory to run the job in (cwd assumed)
dolog : bool, optional
Whether to output info to the system log [default: False]
Returns
-------
returncode : int
Subprocess exit code
Notes
-----
We take care of outputting stuff to the logs and opening/closing logfiles
"""
assert type(cmd) is list, "run_command needs a list!"
if check and not is_exe(cmd[0]):
raise RuntimeError("run_command cannot find executable: {0}".format(cmd[0]))
if not directory:
directory = os.getcwd()
if dolog:
logger.debug("In directory %s", directory)
logger.debug("Running command: %s", " ".join(cmd))
if kwargs:
logger.debug("kwargs are: %s", str(kwargs))
file_handle = False
if logfile:
try:
logfile = os.path.abspath(logfile)
except TypeError:
file_handle = True
if file_handle:
logf = logfile
logfile = os.path.abspath(logf.name)
else:
logf = open(logfile, "w")
logfile = os.path.abspath(logfile)
if dolog:
logger.debug("Logfile is: %s", logfile)
else:
logf = tempfile.NamedTemporaryFile(dir=directory, delete=False, suffix='')
if stdin is not None:
stdinstr = stdin
stdin = subprocess.PIPE
# Windows needs some special treatment
if os.name == "nt":
kwargs.update({'bufsize': 0, 'shell': "False"})
p = subprocess.Popen(cmd, stdin=stdin, stdout=logf, stderr=subprocess.STDOUT, cwd=directory, **kwargs)
if stdin is not None:
p.stdin.write(stdinstr.encode())
p.stdin.close()
if dolog:
logger.debug("stdin for cmd was: %s", stdinstr)
p.wait()
if not file_handle:
logf.close()
return p.returncode
[docs]def read_amoptd(amoptd_fname):
"""Read a PICKLE-formatted AMPLE options file
Parameters
----------
amoptd_fname : str
The path to the PICKLE-formatted AMPLE options file
Returns
-------
amoptd : dict
AMPLE options from saved state
"""
if not is_file(amoptd_fname):
raise RuntimeError("Cannot access AMPLE options file: {0}\n".format(amoptd_fname))
with open(amoptd_fname, 'r') as f:
amoptd = pickle.load(f)
logger.info("Loaded state from file: %s\n", amoptd['results_path'])
return amoptd
[docs]def saveAmoptd(*args):
"""Save AMPLE options to a PICKLE-formatted file
See Also
--------
save_amoptd
Warnings
--------
This function was deprecated and will be removed in future releases. Please use ``save_amoptd()`` instead.
"""
msg = "This function was deprecated and will be removed in future release"
warnings.warn(msg, DeprecationWarning, stacklevel=2)
save_amoptd(*args)
return
[docs]def save_amoptd(amoptd):
"""Save AMPLE options to a PICKLE-formatted file
Parameters
----------
amoptd : dict
AMPLE options from saved state
"""
# Save results
with open(amoptd['results_path'], 'wb') as f:
pickle.dump(amoptd, f)
logger.info("Saved state as file: %s\n", amoptd['results_path'])
[docs]def tmpFileName():
"""Return a filename for a temporary file
See Also
--------
tmp_file_name
Warnings
--------
This function was deprecated and will be removed in future releases. Please use ``tmp_file_name()`` instead.
"""
msg = "This function was deprecated and will be removed in future release"
warnings.warn(msg, DeprecationWarning, stacklevel=2)
return tmp_file_name()
[docs]def tmp_file_name(delete=True, directory=None, suffix=""):
"""Return a filename for a temporary file
Parameters
----------
delete : bool, optional
Flag whether the temporary file should be deleted [default: True]
directory : str, optional
Path to a directory to write the files to.
suffix : str, optional
A suffix to the temporary filename
"""
directory = os.getcwd() if not directory else directory
t = tempfile.NamedTemporaryFile(dir=directory, delete=delete, suffix=suffix)
tmp1 = t.name
t.close()
return tmp1