"""MTZ utility functions"""
__author__ = "Jens Thomas"
__date__ = "02 Dec 2014"
import logging
import os
import shutil
import sys
import uuid
from iotbx import reflection_file_reader
from ample.util import ample_util, cif_parser, exit_util
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
COLTYPE_F = 'F'
COLTYPE_SIGF = 'Q'
[docs]def del_column(file_name, column, overwrite=True):
"""Delete a column from an mtz file and return a path to the file"""
mtzDel = ample_util.filename_append(file_name, "d{0}".format(column))
cmd = ["mtzutils", "hklin1", file_name, "hklout", mtzDel]
stdin = "EXCLUDE 1 {0}".format(column)
logfile = os.path.join(os.getcwd(), "mtzutils_{}.log".format(str(uuid.uuid1())))
retcode = ample_util.run_command(cmd, stdin=stdin, logfile=logfile)
if retcode != 0:
raise RuntimeError("Error running mtzutils. Check the logfile: {0}".format(logfile))
else:
os.unlink(logfile)
if overwrite:
shutil.move(mtzDel, file_name)
return file_name
else:
return mtzDel
[docs]def add_rfree(file_name, directory=None, overwrite=True):
"""Run uniqueify on mtz file to generate RFREE data column"""
mtzUnique = ample_util.filename_append(file_name, "uniqueify", directory=directory)
cmd = ['uniqueify', file_name, mtzUnique]
logfile = os.path.join(os.getcwd(), "uniqueify_{}.log".format(str(uuid.uuid1())))
retcode = ample_util.run_command(cmd, logfile=logfile)
if retcode != 0:
raise RuntimeError("Error running command: {0}. Check the logfile: {1}".format(" ".join(cmd), logfile))
else:
os.unlink(logfile)
if overwrite:
shutil.move(mtzUnique, file_name)
return file_name
else:
return mtzUnique
[docs]def get_labels(file_name):
"""Return the F, SIGF and FREE column labels"""
reflection_file = reflection_file_reader.any_reflection_file(file_name=file_name)
if not reflection_file.file_type() == "ccp4_mtz":
raise RuntimeError("File is not of type ccp4_mtz: {0}".format(file_name))
content = reflection_file.file_content()
ctypes = content.column_types()
clabels = content.column_labels()
if not COLTYPE_F in ctypes:
raise RuntimeError("Cannot find any structure amplitudes in: {0}".format(file_name))
F = clabels[ctypes.index(COLTYPE_F)]
# SIGF derived from F
SIGF = 'SIG' + F
if SIGF not in clabels:
raise RuntimeError("Cannot find label {0} in file: {1}".format(SIGF, file_name))
i = clabels.index(SIGF)
if ctypes[i] != COLTYPE_SIGF:
raise RuntimeError("SIGF label {0} is not of type: {1}".format(SIGF, COLTYPE_SIGF))
FREE = _get_rfree(content)
return F, SIGF, FREE
[docs]def get_rfree(file_name):
"""Return the Rfree label"""
reflection_file = reflection_file_reader.any_reflection_file(file_name=file_name)
if not reflection_file.file_type() == "ccp4_mtz":
raise RuntimeError("File is not of type ccp4_mtz: {0}".format(file_name))
return _get_rfree(reflection_file.file_content())
def _get_rfree(content):
rfree_label = None
for label in content.column_labels():
if 'free' in label.lower():
column = content.get_column(label=label)
selection_valid = column.selection_valid()
flags = column.extract_values()
sel_0 = flags == 0
# extract number of work/test reflections
n0 = (sel_0 & selection_valid).count(True)
n1 = (~sel_0 & selection_valid).count(True)
if n0 > 0 and n1 > 0:
if rfree_label:
logger.warning("FOUND >1 RFREE label in file!")
rfree_label = label
return rfree_label
[docs]def max_min_resolution(file_name):
reflection_file = reflection_file_reader.any_reflection_file(file_name=file_name)
if not reflection_file.file_type() == "ccp4_mtz":
logger.warning("File is not of type ccp4_mtz: {0}".format(file_name))
sys.exit(1)
return reflection_file.file_content().max_min_resolution()
[docs]def to_hkl(mtz_file, hkl_file=None, directory=None, F=None, SIGF=None, FREE=None):
if directory is None:
directory = os.getcwd()
if hkl_file is None:
name = os.path.splitext(os.path.basename(mtz_file))[0]
hkl_file = os.path.join(directory, name + ".hkl")
if F is None or SIGF is None or FREE is None:
F, SIGF, FREE = get_labels(mtz_file)
cmd = ['mtz2various', 'HKLIN', mtz_file, 'HKLOUT', hkl_file]
logfile = "mtz2various_{}.log".format(str(uuid.uuid1()))
stdin = """LABIN FP={0} SIGFP={1} FREE={2}
OUTPUT SHELX
FSQUARED
END""".format(
F, SIGF, FREE
)
ret = ample_util.run_command(cmd=cmd, logfile=logfile, directory=None, dolog=False, stdin=stdin)
if ret != 0:
raise RuntimeError("Error converting {0} to HKL format - see log: {1}".format(mtz_file, logfile))
os.unlink(logfile)
return hkl_file
[docs]def processReflectionFile(amoptd):
"""Make sure we have a valid mtz file. If necessary convert a given cif file.
Set the mtz variable in the given amoptd to the reflection file to use
Return True if it all worked or raise an exception if it failed
"""
# We've been given a sf_cif so convert to mtz
if amoptd['sf_cif']:
if not os.path.isfile(amoptd['sf_cif']):
msg = "Cannot find sf_cif file: {0}".format(amoptd['sf_cif'])
exit_util.exit_error(msg)
if not os.path.splitext(amoptd['sf_cif'])[1].lower() == ".cif":
msg = "Cif file extension is not .cif Please rename the file to give it a .cif extension."
exit_util.exit_error(msg)
cp = cif_parser.CifParser()
mtz = cp.sfcif2mtz(amoptd['sf_cif'])
# See if reflections have been set aside for Rfree or if we need to calculate
if cp.hasRfree:
logger.info("sfcif2mtz: no valid RFREE data so removing FREE column added by mtz2cif")
amoptd['mtz'] = del_column(mtz, 'FREE')
else:
amoptd['mtz'] = mtz
# Now have an mtz so check it's valid
if not amoptd['mtz'] or not os.path.isfile(amoptd['mtz']):
logger.critical("Cannot find MTZ file: %s", amoptd['mtz'])
sys.exit(1)
# Get column label info
reflection_file = reflection_file_reader.any_reflection_file(file_name=amoptd['mtz'])
if not reflection_file.file_type() == "ccp4_mtz":
logger.critical("File is not of type ccp4_mtz: %s", amoptd['mtz'])
sys.exit(1)
# Read the file
content = reflection_file.file_content()
# Check any user-given flags
for flag in ['F', 'SIGF', 'FREE']:
if amoptd[flag] and amoptd[flag] not in content.column_labels():
logger.critical("Cannot find flag %s label %s in mtz file %s", flag, amoptd[flag], amoptd['mtz'])
sys.exit(1)
# If any of the flags aren't given we set defaults based on what's in the file
if not amoptd['F']:
if 'F' not in content.column_types():
logger.critical("Cannot find column type F for flag F in mtz file: %s", amoptd['mtz'])
sys.exit(1)
amoptd['F'] = content.column_labels()[content.column_types().index('F')]
if not amoptd['SIGF']:
l = 'SIG' + amoptd['F']
if not l in content.column_labels():
logger.critical("Cannot find column type %s for flag SIGF in mtz file: %s", l, amoptd['mtz'])
sys.exit(1)
amoptd['SIGF'] = l
rfree = _get_rfree(content)
if amoptd['FREE']:
# Check is valid
if not rfree or not rfree == amoptd['FREE']:
logger.critical("Given RFREE label %s is not valid for mtz file: %s", amoptd['FREE'], amoptd['mtz'])
sys.exit(1)
else:
# See if we can find a valid label in the file
if not rfree:
# Need to generate RFREE
logger.warning("Cannot find a valid FREE flag - running uniquefy to generate column with RFREE data.")
amoptd['mtz'] = add_rfree(amoptd['mtz'], directory=amoptd['work_dir'], overwrite=False)
# Check file and get new FREE flag
rfree = get_rfree(amoptd['mtz'])
if not rfree:
logger.critical("Cannot find valid rfree flag in mtz file %s after running uniquiefy", amoptd['mtz'])
sys.exit(1)
amoptd['FREE'] = rfree
# Output information to user and save to amoptd
logger.info("Using MTZ file: %s", amoptd['mtz'])
maxr, minr = content.max_min_resolution()
amoptd['mtz_min_resolution'] = minr
amoptd['mtz_max_resolution'] = maxr
msg = "Resolution limits of MTZ file are: {0: > 6.3F} and {1: > 6.3F}".format(minr, maxr)
logger.info(msg)
return True