#!/usr/bin/env ccp4-python
import datetime
import logging
import os
import phaser
import iotbx.pdb
import iotbx.mtz
from ample.util import ample_util, mtz_util, pdb_edit
logger = logging.getLogger(__name__)
def _basis_str(origin_shift):
"""Return the string to get the sgtbx basis_op for the given origin shift"""
bstr = ''
for i, axis in enumerate(['x', 'y', 'z']):
if origin_shift[i] == 0.0:
s = axis
else:
s = '{0}{1:+F}'.format(axis, origin_shift[i])
if i == 0:
bstr = s
else:
bstr = bstr + ',' + s
return bstr
def _check_mtz(mtz_file, labels=[]):
"""Return an iotbx.mtz object for the mtz_file making sure the file is valid and contains the given labels"""
mtz = iotbx.mtz.object(file_name=mtz_file)
if len(mtz.crystals()) > 2:
raise RuntimeError("Cannot deal with multiple crystal in mtz")
# # Assume the first crystal is always the base so we use the second
if len(mtz.crystals()[1].datasets()) > 1:
raise RuntimeError("Cannot deal with > 1 dataset in mtz")
for label in labels:
if not mtz.has_column(label):
raise RuntimeError("Cannot find label: {0} in mtz file: {1}".format(label, mtz_file))
return mtz
def _get_miller_array_from_label(mtz_object, column_label):
"""Assuming only a single crystal/dataset, return the miller array for the given column label"""
# Get the miller arrays for the columns we're interested in as dicts
miller_dict = mtz_object.as_miller_arrays_dict()
# Keys are 3-tuples where last item is array label
try:
col_key = [k for k in miller_dict.keys() if k[2] == column_label][0]
except IndexError:
raise RuntimeError("Cannot find column label: {0}".format(column_label))
assert col_key
return miller_dict[col_key]
[docs]def calc_phase_error_pdb(
native_pdb,
native_mtz,
mr_mtz,
f_label,
sigf_label,
native_phase_labels=['FC', 'PHIC'],
mr_phase_labels=['FC', 'PHIC'],
cleanup=True,
origin=None,
):
"""Phase error between native_pdb+native_mtz and mr_mtz
"""
assert f_label and sigf_label, "Need f_label and sigf_label to be given!"
native_mtz_phased = place_native_pdb(native_pdb, native_mtz, f_label, sigf_label)
return calc_phase_error_mtz(
native_mtz_phased,
mr_mtz,
f_label,
sigf_label,
native_phase_labels=native_phase_labels,
mr_phase_labels=mr_phase_labels,
cleanup=cleanup,
origin=origin,
)
[docs]def calc_phase_error_mtz(
native_mtz_phased,
mr_mtz,
f_label=None,
sigf_label=None,
native_phase_labels=['FC', 'PHIC'],
mr_phase_labels=['FC', 'PHIC'],
cleanup=True,
origin=None,
):
"""Phase error between native_pdb+native_mtz and mr_mtz
"""
if origin:
assert False
# if we are given an origin shift we can calculate the phase difference directly with cctbx
change_of_hand = False # hard-wired as we don't have this information
origin_shift = origin
native_object = _check_mtz(native_mtz_phased, labels=native_phase_labels)
mr_object = _check_mtz(mr_mtz, labels=mr_phase_labels)
native_fc_miller_array = _get_miller_array_from_label(native_object, native_phase_labels[0])
mr_fc_miller_array = _get_miller_array_from_label(mr_object, mr_phase_labels[0])
before_origin = native_fc_miller_array.mean_phase_error(mr_fc_miller_array)
basis_str = _basis_str(origin)
after_origin = native_fc_miller_array.mean_phase_error(mr_fc_miller_array.change_basis(basis_str))
else:
assert f_label and sigf_label, "Need f_label and sigf_label to be given!"
# we use cphasematch to calculate the phase difference and origin shift
merged_mtz, labels = merge_mtz(
native_mtz_phased, [f_label, sigf_label] + native_phase_labels, mr_mtz, mr_phase_labels
)
assert len(labels) == 6
before_origin, after_origin, change_of_hand, origin_shift = run_cphasematch(
merged_mtz, labels[0:2], native_phase_labels=labels[2:4], mr_phase_labels=labels[4:]
)
if cleanup:
os.unlink(merged_mtz)
return before_origin, after_origin, change_of_hand, origin_shift
[docs]def merge_mtz_cctbx(mtz1_path, mtz1_labels, mtz2_path, mtz2_labels):
"""TODO!
Create MTZ file with F from native_mtz and calculated phases from native_mtz and mr_mtz to enable phaser error calc by cphasematch"""
# Check mtz files have required columns
mtz1 = _check_mtz(mtz1_path, labels=mtz1_labels)
mtz2 = _check_mtz(mtz2_path, labels=mtz2_labels)
merged_object = iotbx.mtz.object()
merged_object.set_title("Calculated phases from {0} and {1}".format(mtz1_path, mtz2_path))
merged_object.add_history(line="Created by ample merged_mtz on: {0}".format(datetime.datetime.now()))
merged_object.add_history(line="Created from: {0} and {1}".format(mtz1_path, mtz2_path))
miller_array1 = _get_miller_array_from_label(mtz1, mtz1_labels[0])
unit_cell = miller_array1.unit_cell()
merged_object.set_space_group_info(miller_array1.space_group_info())
merged_object.set_hkl_base(unit_cell)
crystal = merged_object.add_crystal(
name="cphasematch_crystal", project_name="cphasematch_project", unit_cell=unit_cell
)
dataset = crystal.add_dataset(name="test_dataset", wavelength=1) # FIX
# The labels in add_miller_array automatically controlled by the label_decorator which adds 'PHI'
# as a prefix to the appropriate columns we therefore either need to make our own decorator or just stay with
# using the PHI prefix
# Loop through the labels in each mtz
labels = []
for i, mtz in enumerate([mtz1, mtz2]):
for label in [mtz1_labels, mtz2_labels][i]:
miller_array = _get_miller_array_from_label(mtz, label)
dataset.add_miller_array(miller_array=miller_array, column_root_label=label)
labels.append(label)
# as mtz dataset and then change labels?
# dataset.add_column(label=fcalc_label, type="F").set_values(values=mr_object.get_column(fc_label).extract_values())
# dataset.add_column(label=phicalc_label, type="P").set_values(values=mr_object.get_column(phic_label).extract_values())
# Write out the file
name1 = os.path.splitext(os.path.basename(mtz1_path))[0]
name2 = os.path.splitext(os.path.basename(mtz2_path))[0]
merged_mtz = "{0}_{1}.mtz".format(name1, name2)
merged_object.write(merged_mtz)
return os.path.abspath(merged_mtz), labels
[docs]def merge_mtz(mtz1_path, mtz1_labels, mtz2_path, mtz2_labels):
"""Create MTZ file with columns from the given mtz files and mtz labels in each file"""
# Can't have any duplicates in file labels
assert len(mtz1_labels) == len(set(mtz1_labels)), "Duplicate labels in mtz1_labels"
assert len(mtz2_labels) == len(set(mtz2_labels)), "Duplicate labels in mtz2_labels"
name1 = os.path.splitext(os.path.basename(mtz1_path))[0]
name2 = os.path.splitext(os.path.basename(mtz2_path))[0]
merged_mtz = os.path.abspath("{0}_{1}.mtz".format(name1, name2))
cmd = ['cad', 'hklin1', mtz1_path, 'hklin2', mtz2_path, 'hklout', merged_mtz]
# See if any labels are duplicate and need to be renamed
rename = [] # List of (File_number, file_label_idx, orig_label, renamed_label)
labels = []
for i, mtz in enumerate([mtz1_path, mtz2_path]):
for j, label in enumerate([mtz1_labels, mtz2_labels][i]):
if label in labels:
newlabel = label + str(i + 1)
rename.append((i + 1, j + 1, label, newlabel))
else:
newlabel = label
rename.append((i + 1, j + 1, label, None))
assert newlabel not in labels, "Too many duplicate label names: {0}".format(newlabel)
labels.append(newlabel)
# Build up the list of which labels to extract from which files
stdin = ""
last_fileno = None
for fileno, labelno, orig_label, rename_label in rename:
if fileno != last_fileno:
if last_fileno is not None:
stdin += '\n' # Need to terminate the line
stdin += "LABIN FILE {0}".format(fileno)
last_fileno = fileno
stdin += " E{0}={1}".format(labelno, orig_label)
stdin += '\n' # Need to terminate the line
# Do any renaming for duplicate labels
last_fileno = None
for i, (fileno, label_idx, orig_label, rename_label) in enumerate(rename):
if rename_label is not None:
if last_fileno != fileno:
stdin += 'LABOUT FILE_NUMBER {0}'.format(fileno)
if last_fileno is not None:
# for anything other then then first, we need to terminate this block
stdin += '\n'
last_fileno = fileno
if fileno == last_fileno:
stdin += ' E{0}={1}'.format(label_idx, rename_label)
if fileno is not None:
stdin += '\n' # Add last linebreak as we have added a rename clause
logfile = os.path.abspath("cad.log")
retcode = ample_util.run_command(cmd=cmd, stdin=stdin, logfile=logfile)
if retcode != 0:
raise RuntimeError("Error running command: {0}\nCheck logfile: {1}".format(" ".join(cmd), logfile))
else:
os.unlink(logfile)
return os.path.abspath(merged_mtz), labels
[docs]def place_native_pdb(native_pdb, native_mtz, f_label, sigf_label, cleanup=True):
"""Place native_pdb into data from native_mtz using phaser with f_label and sigf_label"""
# get crystal info from pdb
sym_obj = iotbx.pdb.pdb_input(file_name=native_pdb).crystal_symmetry()
hall_symbol = sym_obj.space_group().type().hall_symbol()
unit_cell = sym_obj.unit_cell().parameters()
molecular_weight = pdb_edit.molecular_weight(native_pdb)
# Get data from MTZ file
mtz_object = _check_mtz(native_mtz, labels=[f_label, sigf_label])
miller_indices = mtz_object.extract_miller_indices()
fp_obs = mtz_object.get_column(f_label).extract_values().as_double()
sigfp_obs = mtz_object.get_column(sigf_label).extract_values().as_double()
# Create root_name for all files from native + mr_mtz
name1 = os.path.splitext(os.path.basename(native_pdb))[0]
name2 = os.path.splitext(os.path.basename(native_mtz))[0]
root_name = "{0}_{1}".format(name1, name2)
# Run phaser to position native
phaser_input = phaser.InputMR_AUTO()
phaser_input.setSPAC_HALL(hall_symbol)
phaser_input.setCELL6(unit_cell)
phaser_input.setREFL_F_SIGF(miller_indices, fp_obs, sigfp_obs)
# Make sure labels same as native_mtz
phaser_input.setLABI_F_SIGF(f_label, sigf_label)
phaser_input.setROOT(root_name)
phaser_input.addENSE_PDB_ID("native", native_pdb, 0.0)
phaser_input.addCOMP_PROT_MW_NUM(molecular_weight, 1)
phaser_input.addSEAR_ENSE_NUM("native", 1)
phaser_input.setMUTE(True)
phaser_run = phaser.runMR_AUTO(phaser_input)
del phaser_input
if not phaser_run.Success():
raise RuntimeError("PHASER failed: {0} : {1}".format(phaser_run.ErrorName(), phaser_run.ErrorMessage()))
if phaser_run.foundSolutions():
native_placed_mtz = phaser_run.getTopMtzFile()
else:
raise RuntimeError("PHASER could not place native_pdb")
if cleanup:
os.unlink(phaser_run.getTopPdbFile())
return native_placed_mtz
[docs]def parse_cphasematch_log(logfile):
"""Return phase error before_origin& after_origin from cphasematch logfile"""
before_origin = None
after_origin = None
change_of_hand = False
origin_shift = False
with open(logfile, 'r') as f:
for line in f:
if line.startswith(' Mean phase error before origin fixing:'):
before_origin = float(line.split()[-1])
elif line.startswith(' Mean phase error after origin fixing:'):
after_origin = float(line.split()[-1])
elif line.startswith(' Change of hand'):
if line.strip().split()[4] == 'YES':
change_of_hand = True
elif line.startswith(' Change of origin:'):
# Change of origin: uvw = ( 0, 0, 0)
fields = line.strip().split()
x = float(fields[6][:-1]) # strip trailing comma
y = float(fields[7][:-1]) # strip trailing comma
z = float(fields[8][:-1]) # strip trailing bracket
origin_shift = [x, y, z]
return before_origin, after_origin, change_of_hand, origin_shift
[docs]def run_cphasematch(merged_mtz, f_sigf_labels, native_phase_labels, mr_phase_labels, resolution_bins=12, cleanup=True):
"""run cphasematch to get phase error"""
argd = {
'merged_mtz': merged_mtz,
'f_label': f_sigf_labels[0],
'sigf_label': f_sigf_labels[1],
'native_phase_label1': native_phase_labels[0],
'native_phase_label2': native_phase_labels[1],
'mr_phase_label1': mr_phase_labels[0],
'mr_phase_label2': mr_phase_labels[1],
'resolution_bins': resolution_bins,
}
stdin = """
mtzin {merged_mtz}
colin-fo /*/*/[{f_label},{sigf_label}]
colin-fc-1 /*/*/[{native_phase_label1},{native_phase_label2}]
colin-fc-2 /*/*/[{mr_phase_label1},{mr_phase_label2}]
resolution-bins {resolution_bins}
""".format(
**argd
)
logfile = os.path.abspath("cphasematch.log")
cmd = ['cphasematch', "-stdin"]
retcode = ample_util.run_command(cmd=cmd, stdin=stdin, logfile=logfile)
if retcode != 0:
raise RuntimeError("Error running command: {0}\nCheck logfile: {1}".format(" ".join(cmd), logfile))
before_origin, after_origin, change_of_hand, origin_shift = parse_cphasematch_log(logfile)
if cleanup:
os.unlink(logfile)
return before_origin, after_origin, change_of_hand, origin_shift
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
import argparse
parser = argparse.ArgumentParser(description='Run cphasematch on two mtz files', prefix_chars="-")
parser.add_argument('-native_mtz', help="Input native MTZ file", required=True)
parser.add_argument('-mr_mtz', help="Input MTZ file from Molecular Replacement", required=True)
parser.add_argument('-native_pdb', help="Input native PDB file", required=False)
parser.add_argument('-f_label', help="F label from native MTZ file", required=False)
parser.add_argument('-sigf_label', help="SIGF label from native MTZ file", required=False)
args = parser.parse_args()
# Extract F & SIGF from file if not give on command-line
if args.f_label and args.sigf_label:
f_label, sigf_label = args.f_label, args.sigf_label
else:
f_label, sigf_label, _ = mtz_util.get_labels(args.native_mtz)
logger.info("Using F, SIGF labels from mtz file: %s, %s", f_label, sigf_label)
if args.native_pdb:
before_origin, after_origin, change_of_hand, origin_shift = calc_phase_error_pdb(
args.native_pdb, args.native_mtz, args.mr_mtz, f_label, sigf_label
)
else:
before_origin, after_origin, change_of_hand, origin_shift = calc_phase_error_mtz(
args.native_mtz, args.mr_mtz, f_label, sigf_label
)
logging.info("Calculated phase error:", after_origin)
logging.info("Calculated origin shift:", origin_shift)