#!/usr/bin/env ccp4-python
import datetime
import logging
import os
import phaser
import iotbx.pdb
import iotbx.mtz
from ample.util import pdb_edit
from ample.util import ample_util
from ample.util import mtz_util
_logger = logging.getLogger(__name__)
def _basis_str(origin_shift):
"""Return the string to get the sgtbx basis_op for the given origin shift"""
bstr = ''
for i, axis in enumerate(['x', 'y', 'z']):
if origin_shift[i] == 0.0:
s = axis
else:
s = '{0}{1:+F}'.format(axis,origin_shift[i])
if i == 0:
bstr = s
else:
bstr = bstr + ',' +s
return bstr
def _check_mtz(mtz_file, labels=[]):
"""Return an iotbx.mtz object for the mtz_file making sure the file is valid and contains the given labels"""
mtz = iotbx.mtz.object(file_name=mtz_file)
if len(mtz.crystals()) > 2: raise RuntimeError("Cannot deal with multiple crystal in mtz")
# # Assume the first crystal is always the base so we use the second
if len(mtz.crystals()[1].datasets()) > 1: raise RuntimeError("Cannot deal with > 1 dataset in mtz")
for label in labels:
if not mtz.has_column(label):
raise RuntimeError("Cannot find label: {0} in mtz file: {1}".format(label, mtz_file))
return mtz
def _get_miller_array_from_label(mtz_object, column_label):
"""Assuming only a single crystal/dataset, return the miller array for the given column label"""
# Get the miller arrays for the columns we're interested in as dicts
miller_dict = mtz_object.as_miller_arrays_dict()
# Keys are 3-tuples where last item is array label
try:
col_key = [k for k in miller_dict.keys() if k[2] == column_label][0]
except IndexError:
raise RuntimeError("Cannot find column label: {0}".format(column_label))
assert col_key
return miller_dict[col_key]
[docs]def calc_phase_error_pdb(native_pdb, native_mtz, mr_mtz, f_label, sigf_label, native_phase_labels=['FC', 'PHIC'], mr_phase_labels=['FC', 'PHIC'], cleanup=True, origin=None):
"""Phase error between native_pdb+native_mtz and mr_mtz
"""
assert f_label and sigf_label,"Need f_label and sigf_label to be given!"
native_mtz_phased = place_native_pdb(native_pdb, native_mtz, f_label, sigf_label)
return calc_phase_error_mtz(native_mtz_phased,
mr_mtz,
f_label,
sigf_label,
native_phase_labels=native_phase_labels,
mr_phase_labels=mr_phase_labels,
cleanup=cleanup,
origin=origin)
[docs]def calc_phase_error_mtz(native_mtz_phased, mr_mtz, f_label=None, sigf_label=None, native_phase_labels=['FC', 'PHIC'], mr_phase_labels=['FC', 'PHIC'], cleanup=True, origin=None):
"""Phase error between native_pdb+native_mtz and mr_mtz
"""
if origin:
assert False
# if we are given an origin shift we can calculate the phase difference directly with cctbx
change_of_hand = False # hard-wired as we don't have this information
origin_shift = origin
native_object = _check_mtz(native_mtz_phased, labels=native_phase_labels)
mr_object = _check_mtz(mr_mtz, labels = mr_phase_labels)
native_fc_miller_array = _get_miller_array_from_label(native_object, native_phase_labels[0])
mr_fc_miller_array = _get_miller_array_from_label(mr_object, mr_phase_labels[0])
before_origin = native_fc_miller_array.mean_phase_error(mr_fc_miller_array)
basis_str = _basis_str(origin)
after_origin = native_fc_miller_array.mean_phase_error(mr_fc_miller_array.change_basis(basis_str))
else:
assert f_label and sigf_label,"Need f_label and sigf_label to be given!"
# we use cphasematch to calculate the phase difference and origin shift
merged_mtz, labels = merge_mtz(native_mtz_phased, [f_label, sigf_label] + native_phase_labels, mr_mtz, mr_phase_labels)
assert len(labels)==6
before_origin, after_origin, change_of_hand, origin_shift = run_cphasematch(merged_mtz,
labels[0:2],
native_phase_labels=labels[2:4],
mr_phase_labels=labels[4:])
if cleanup: os.unlink(merged_mtz)
return before_origin, after_origin, change_of_hand, origin_shift
[docs]def merge_mtz_cctbx(mtz1_path, mtz1_labels, mtz2_path, mtz2_labels):
"""TODO!
Create MTZ file with F from native_mtz and calculated phases from native_mtz and mr_mtz to enable phaser error calc by cphasematch"""
# Check mtz files have required columns
mtz1 = _check_mtz(mtz1_path, labels = mtz1_labels)
mtz2 = _check_mtz(mtz2_path, labels = mtz2_labels)
merged_object = iotbx.mtz.object()
merged_object.set_title("Calculated phases from {0} and {1}".format(mtz1_path, mtz2_path))
merged_object.add_history(line="Created by ample merged_mtz on: {0}".format(datetime.datetime.now()))
merged_object.add_history(line="Created from: {0} and {1}".format(mtz1_path, mtz2_path))
miller_array1 = _get_miller_array_from_label(mtz1, mtz1_labels[0])
unit_cell = miller_array1.unit_cell()
merged_object.set_space_group_info(miller_array1.space_group_info())
merged_object.set_hkl_base(unit_cell)
crystal = merged_object.add_crystal(
name="cphasematch_crystal",
project_name="cphasematch_project",
unit_cell = unit_cell
)
dataset = crystal.add_dataset(name = "test_dataset",
wavelength=1) # FIX
# The labels in add_miller_array automatically controlled by the label_decorator which adds 'PHI'
# as a prefix to the appropriate columns we therefore either need to make our own decorator or just stay with
# using the PHI prefix
# Loop through the labels in each mtz
labels = []
for i, mtz in enumerate([mtz1, mtz2]):
for label in [mtz1_labels, mtz2_labels][i]:
miller_array = _get_miller_array_from_label(mtz, label)
dataset.add_miller_array(
miller_array=miller_array,
column_root_label=label,
)
labels.append(label)
# as mtz dataset and then change labels?
#dataset.add_column(label=fcalc_label, type="F").set_values(values=mr_object.get_column(fc_label).extract_values())
#dataset.add_column(label=phicalc_label, type="P").set_values(values=mr_object.get_column(phic_label).extract_values())
# Write out the file
name1 = os.path.splitext(os.path.basename(mtz1_path))[0]
name2 = os.path.splitext(os.path.basename(mtz2_path))[0]
merged_mtz = "{0}_{1}.mtz".format(name1, name2)
merged_object.write(merged_mtz)
return os.path.abspath(merged_mtz), labels
[docs]def merge_mtz(mtz1_path, mtz1_labels, mtz2_path, mtz2_labels):
"""Create MTZ file with columns from the given mtz files and mtz labels in each file"""
# Can't have any duplicates in file labels
assert len(mtz1_labels) == len(set(mtz1_labels)),"Duplicate labels in mtz1_labels"
assert len(mtz2_labels) == len(set(mtz2_labels)),"Duplicate labels in mtz2_labels"
name1 = os.path.splitext(os.path.basename(mtz1_path))[0]
name2 = os.path.splitext(os.path.basename(mtz2_path))[0]
merged_mtz = os.path.abspath("{0}_{1}.mtz".format(name1, name2))
cmd = [ 'cad', 'hklin1', mtz1_path, 'hklin2', mtz2_path, 'hklout', merged_mtz ]
# See if any labels are duplicate and need to be renamed
rename = [] # List of (File_number, file_label_idx, orig_label, renamed_label)
labels = []
for i, mtz in enumerate([mtz1_path, mtz2_path]):
for j, label in enumerate([mtz1_labels, mtz2_labels][i]):
if label in labels:
newlabel = label + str(i+1)
rename.append((i+1,j+1, label, newlabel))
else:
newlabel = label
rename.append((i+1,j+1, label, None))
assert newlabel not in labels, "Too many duplicate label names: {0}".format(newlabel)
labels.append(newlabel)
# Build up the list of which labels to extract from which files
stdin = ""
last_fileno = None
for fileno, labelno, orig_label, rename_label in rename:
if fileno != last_fileno:
if last_fileno is not None:
stdin += '\n' # Need to terminate the line
stdin += "LABIN FILE {0}".format(fileno)
last_fileno = fileno
stdin += " E{0}={1}".format(labelno, orig_label)
stdin += '\n' # Need to terminate the line
# Do any renaming for duplicate labels
last_fileno = None
for i, (fileno, label_idx, orig_label, rename_label) in enumerate(rename):
if rename_label is not None:
if last_fileno != fileno:
stdin += 'LABOUT FILE_NUMBER {0}'.format(fileno)
if last_fileno is not None:
# for anything other then then first, we need to terminate this block
stdin += '\n'
last_fileno = fileno
if fileno == last_fileno:
stdin += ' E{0}={1}'.format(label_idx,rename_label)
if fileno is not None:
stdin += '\n' # Add last linebreak as we have added a rename clause
logfile = os.path.abspath("cad.log")
retcode = ample_util.run_command(cmd=cmd, stdin=stdin, logfile=logfile)
if retcode != 0: raise RuntimeError, "Error running command: {0}\nCheck logfile: {1}".format( " ".join(cmd), logfile )
return os.path.abspath(merged_mtz), labels
[docs]def place_native_pdb(native_pdb, native_mtz, f_label, sigf_label, cleanup=True):
"""Place native_pdb into data from native_mtz using phaser with f_label and sigf_label"""
# get crystal info from pdb
sym_obj = iotbx.pdb.pdb_input(file_name=native_pdb).crystal_symmetry()
hall_symbol = sym_obj.space_group().type().hall_symbol()
unit_cell = sym_obj.unit_cell().parameters()
molecular_weight = pdb_edit.molecular_weight(native_pdb)
# Get data from MTZ file
mtz_object = _check_mtz(native_mtz, labels = [f_label, sigf_label])
miller_indices = mtz_object.extract_miller_indices()
fp_obs = mtz_object.get_column(f_label).extract_values().as_double()
sigfp_obs = mtz_object.get_column(sigf_label).extract_values().as_double()
# Create root_name for all files from native + mr_mtz
name1 = os.path.splitext(os.path.basename(native_pdb))[0]
name2 = os.path.splitext(os.path.basename(native_mtz))[0]
root_name = "{0}_{1}".format(name1, name2)
# Run phaser to position native
phaser_input = phaser.InputMR_AUTO()
phaser_input.setSPAC_HALL(hall_symbol)
phaser_input.setCELL6(unit_cell)
phaser_input.setREFL_F_SIGF(miller_indices, fp_obs, sigfp_obs)
# Make sure labels same as native_mtz
phaser_input.setLABI_F_SIGF(f_label,sigf_label)
phaser_input.setROOT(root_name)
phaser_input.addENSE_PDB_ID("native", native_pdb, 0.0)
phaser_input.addCOMP_PROT_MW_NUM(molecular_weight, 1)
phaser_input.addSEAR_ENSE_NUM("native", 1)
phaser_input.setMUTE(True)
phaser_run = phaser.runMR_AUTO(phaser_input)
del phaser_input
if not phaser_run.Success():
raise RuntimeError("PHASER failed: {0} : {1}".format(phaser_run.ErrorName(), phaser_run.ErrorMessage()))
if phaser_run.foundSolutions():
#print "Phaser has found MR solutions"
#print "Top LLG = %f" % phaser_run.getTopLLG()
#print "Top PDB file = %s" % phaser_run.getTopPdbFile()
#print "Top MTZ file = %s" % phaser_run.getTopMtzFile()
native_placed_mtz = phaser_run.getTopMtzFile()
else:
raise RuntimeError("PHASER could not place native_pdb")
if cleanup:
os.unlink(phaser_run.getTopPdbFile())
return native_placed_mtz
[docs]def parse_cphasematch_log(logfile):
"""Return phase error before_origin& after_origin from cphasematch logfile"""
before_origin = None
after_origin = None
change_of_hand = False
origin_shift = False
with open(logfile, 'r') as f:
for line in f:
if line.startswith(' Mean phase error before origin fixing:'):
before_origin = float(line.split()[-1])
elif line.startswith(' Mean phase error after origin fixing:'):
after_origin = float(line.split()[-1])
elif line.startswith(' Change of hand'):
if line.strip().split()[4] == 'YES':
change_of_hand = True
elif line.startswith(' Change of origin:'):
# Change of origin: uvw = ( 0, 0, 0)
fields = line.strip().split()
x = float(fields[6][:-1]) # strip trailing comma
y = float(fields[7][:-1]) # strip trailing comma
z = float(fields[8][:-1]) # strip trailing bracket
origin_shift = [x,y,z]
return before_origin, after_origin, change_of_hand, origin_shift
[docs]def run_cphasematch(merged_mtz,
f_sigf_labels,
native_phase_labels,
mr_phase_labels,
resolution_bins=12,
cleanup=True):
"""run cphasematch to get phase error"""
argd = { 'merged_mtz' : merged_mtz,
'f_label' : f_sigf_labels[0],
'sigf_label' : f_sigf_labels[1],
'native_phase_label1' : native_phase_labels[0],
'native_phase_label2' : native_phase_labels[1],
'mr_phase_label1' : mr_phase_labels[0],
'mr_phase_label2' : mr_phase_labels[1],
'resolution_bins' : resolution_bins }
stdin = """
mtzin {merged_mtz}
colin-fo /*/*/[{f_label},{sigf_label}]
colin-fc-1 /*/*/[{native_phase_label1},{native_phase_label2}]
colin-fc-2 /*/*/[{mr_phase_label1},{mr_phase_label2}]
resolution-bins {resolution_bins}
""".format(**argd)
logfile = os.path.abspath("cphasematch.log")
cmd= [ 'cphasematch',
"-stdin" ]
retcode = ample_util.run_command(cmd=cmd, stdin=stdin, logfile=logfile)
if retcode != 0: raise RuntimeError, "Error running command: {0}\nCheck logfile: {1}".format( " ".join(cmd), logfile )
before_origin, after_origin, change_of_hand, origin_shift = parse_cphasematch_log(logfile)
if cleanup:os.unlink(logfile)
return before_origin, after_origin, change_of_hand, origin_shift
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
import argparse
parser = argparse.ArgumentParser(description='Run cphasematch on two mtz files', prefix_chars="-")
#group = parser.add_argument_group()
parser.add_argument('-native_mtz',
help="Input native MTZ file",
required=True)
parser.add_argument('-mr_mtz',
help="Input MTZ file from Molecular Replacement",
required=True)
parser.add_argument('-native_pdb',
help="Input native PDB file",
required=False)
parser.add_argument('-f_label',
help="F label from native MTZ file",
required=False)
parser.add_argument('-sigf_label',
help="SIGF label from native MTZ file",
required=False)
args = parser.parse_args()
# Extract F & SIGF from file if not give on command-line
if args.f_label and args.sigf_label:
f_label, sigf_label = args.f_label, args.sigf_label
else:
f_label, sigf_label, _ = mtz_util.get_labels(args.native_mtz)
_logger.info("Using F, SIGF labels from mtz file: {0}, {1} ".format(f_label, sigf_label))
if args.native_pdb:
before_origin, after_origin, change_of_hand, origin_shift = calc_phase_error_pdb(args.native_pdb,
args.native_mtz,
args.mr_mtz,
f_label,
sigf_label)
else:
before_origin, after_origin, change_of_hand, origin_shift = calc_phase_error_mtz(args.native_mtz,
args.mr_mtz,
f_label,
sigf_label)
print "Calculated phase error: {0}".format(after_origin)
print "Calculated origin shift: {0}".format(origin_shift)