Source code for ample.util.cphasematch

#!/usr/bin/env ccp4-python

import datetime
import logging
import os

import phaser
import iotbx.pdb
import iotbx.mtz

from ample.util import ample_util, mtz_util, pdb_edit

logger = logging.getLogger(__name__)


def _basis_str(origin_shift):
    """Return the string to get the sgtbx basis_op for the given origin shift"""
    bstr = ''
    for i, axis in enumerate(['x', 'y', 'z']):
        if origin_shift[i] == 0.0:
            s = axis
        else:
            s = '{0}{1:+F}'.format(axis, origin_shift[i])
        if i == 0:
            bstr = s
        else:
            bstr = bstr + ',' + s
    return bstr


def _check_mtz(mtz_file, labels=[]):
    """Return an iotbx.mtz object for the mtz_file making sure the file is valid and contains the given labels"""
    mtz = iotbx.mtz.object(file_name=mtz_file)
    if len(mtz.crystals()) > 2:
        raise RuntimeError("Cannot deal with multiple crystal in mtz")
    # # Assume the first crystal is always the base so we use the second
    if len(mtz.crystals()[1].datasets()) > 1:
        raise RuntimeError("Cannot deal with > 1 dataset in mtz")
    for label in labels:
        if not mtz.has_column(label):
            raise RuntimeError("Cannot find label: {0} in mtz file: {1}".format(label, mtz_file))
    return mtz


def _get_miller_array_from_label(mtz_object, column_label):
    """Assuming only a single crystal/dataset, return the miller array for the given column label"""
    # Get the miller arrays for the columns we're interested in as dicts
    miller_dict = mtz_object.as_miller_arrays_dict()
    # Keys are 3-tuples where last item is array label
    try:
        col_key = [k for k in miller_dict.keys() if k[2] == column_label][0]
    except IndexError:
        raise RuntimeError("Cannot find column label: {0}".format(column_label))
    assert col_key
    return miller_dict[col_key]


[docs]def calc_phase_error_pdb( native_pdb, native_mtz, mr_mtz, f_label, sigf_label, native_phase_labels=['FC', 'PHIC'], mr_phase_labels=['FC', 'PHIC'], cleanup=True, origin=None, ): """Phase error between native_pdb+native_mtz and mr_mtz """ assert f_label and sigf_label, "Need f_label and sigf_label to be given!" native_mtz_phased = place_native_pdb(native_pdb, native_mtz, f_label, sigf_label) return calc_phase_error_mtz( native_mtz_phased, mr_mtz, f_label, sigf_label, native_phase_labels=native_phase_labels, mr_phase_labels=mr_phase_labels, cleanup=cleanup, origin=origin, )
[docs]def calc_phase_error_mtz( native_mtz_phased, mr_mtz, f_label=None, sigf_label=None, native_phase_labels=['FC', 'PHIC'], mr_phase_labels=['FC', 'PHIC'], cleanup=True, origin=None, ): """Phase error between native_pdb+native_mtz and mr_mtz """ if origin: assert False # if we are given an origin shift we can calculate the phase difference directly with cctbx change_of_hand = False # hard-wired as we don't have this information origin_shift = origin native_object = _check_mtz(native_mtz_phased, labels=native_phase_labels) mr_object = _check_mtz(mr_mtz, labels=mr_phase_labels) native_fc_miller_array = _get_miller_array_from_label(native_object, native_phase_labels[0]) mr_fc_miller_array = _get_miller_array_from_label(mr_object, mr_phase_labels[0]) before_origin = native_fc_miller_array.mean_phase_error(mr_fc_miller_array) basis_str = _basis_str(origin) after_origin = native_fc_miller_array.mean_phase_error(mr_fc_miller_array.change_basis(basis_str)) else: assert f_label and sigf_label, "Need f_label and sigf_label to be given!" # we use cphasematch to calculate the phase difference and origin shift merged_mtz, labels = merge_mtz( native_mtz_phased, [f_label, sigf_label] + native_phase_labels, mr_mtz, mr_phase_labels ) assert len(labels) == 6 before_origin, after_origin, change_of_hand, origin_shift = run_cphasematch( merged_mtz, labels[0:2], native_phase_labels=labels[2:4], mr_phase_labels=labels[4:] ) if cleanup: os.unlink(merged_mtz) return before_origin, after_origin, change_of_hand, origin_shift
[docs]def merge_mtz_cctbx(mtz1_path, mtz1_labels, mtz2_path, mtz2_labels): """TODO! Create MTZ file with F from native_mtz and calculated phases from native_mtz and mr_mtz to enable phaser error calc by cphasematch""" # Check mtz files have required columns mtz1 = _check_mtz(mtz1_path, labels=mtz1_labels) mtz2 = _check_mtz(mtz2_path, labels=mtz2_labels) merged_object = iotbx.mtz.object() merged_object.set_title("Calculated phases from {0} and {1}".format(mtz1_path, mtz2_path)) merged_object.add_history(line="Created by ample merged_mtz on: {0}".format(datetime.datetime.now())) merged_object.add_history(line="Created from: {0} and {1}".format(mtz1_path, mtz2_path)) miller_array1 = _get_miller_array_from_label(mtz1, mtz1_labels[0]) unit_cell = miller_array1.unit_cell() merged_object.set_space_group_info(miller_array1.space_group_info()) merged_object.set_hkl_base(unit_cell) crystal = merged_object.add_crystal( name="cphasematch_crystal", project_name="cphasematch_project", unit_cell=unit_cell ) dataset = crystal.add_dataset(name="test_dataset", wavelength=1) # FIX # The labels in add_miller_array automatically controlled by the label_decorator which adds 'PHI' # as a prefix to the appropriate columns we therefore either need to make our own decorator or just stay with # using the PHI prefix # Loop through the labels in each mtz labels = [] for i, mtz in enumerate([mtz1, mtz2]): for label in [mtz1_labels, mtz2_labels][i]: miller_array = _get_miller_array_from_label(mtz, label) dataset.add_miller_array(miller_array=miller_array, column_root_label=label) labels.append(label) # as mtz dataset and then change labels? # dataset.add_column(label=fcalc_label, type="F").set_values(values=mr_object.get_column(fc_label).extract_values()) # dataset.add_column(label=phicalc_label, type="P").set_values(values=mr_object.get_column(phic_label).extract_values()) # Write out the file name1 = os.path.splitext(os.path.basename(mtz1_path))[0] name2 = os.path.splitext(os.path.basename(mtz2_path))[0] merged_mtz = "{0}_{1}.mtz".format(name1, name2) merged_object.write(merged_mtz) return os.path.abspath(merged_mtz), labels
[docs]def merge_mtz(mtz1_path, mtz1_labels, mtz2_path, mtz2_labels): """Create MTZ file with columns from the given mtz files and mtz labels in each file""" # Can't have any duplicates in file labels assert len(mtz1_labels) == len(set(mtz1_labels)), "Duplicate labels in mtz1_labels" assert len(mtz2_labels) == len(set(mtz2_labels)), "Duplicate labels in mtz2_labels" name1 = os.path.splitext(os.path.basename(mtz1_path))[0] name2 = os.path.splitext(os.path.basename(mtz2_path))[0] merged_mtz = os.path.abspath("{0}_{1}.mtz".format(name1, name2)) cmd = ['cad', 'hklin1', mtz1_path, 'hklin2', mtz2_path, 'hklout', merged_mtz] # See if any labels are duplicate and need to be renamed rename = [] # List of (File_number, file_label_idx, orig_label, renamed_label) labels = [] for i, mtz in enumerate([mtz1_path, mtz2_path]): for j, label in enumerate([mtz1_labels, mtz2_labels][i]): if label in labels: newlabel = label + str(i + 1) rename.append((i + 1, j + 1, label, newlabel)) else: newlabel = label rename.append((i + 1, j + 1, label, None)) assert newlabel not in labels, "Too many duplicate label names: {0}".format(newlabel) labels.append(newlabel) # Build up the list of which labels to extract from which files stdin = "" last_fileno = None for fileno, labelno, orig_label, rename_label in rename: if fileno != last_fileno: if last_fileno is not None: stdin += '\n' # Need to terminate the line stdin += "LABIN FILE {0}".format(fileno) last_fileno = fileno stdin += " E{0}={1}".format(labelno, orig_label) stdin += '\n' # Need to terminate the line # Do any renaming for duplicate labels last_fileno = None for i, (fileno, label_idx, orig_label, rename_label) in enumerate(rename): if rename_label is not None: if last_fileno != fileno: stdin += 'LABOUT FILE_NUMBER {0}'.format(fileno) if last_fileno is not None: # for anything other then then first, we need to terminate this block stdin += '\n' last_fileno = fileno if fileno == last_fileno: stdin += ' E{0}={1}'.format(label_idx, rename_label) if fileno is not None: stdin += '\n' # Add last linebreak as we have added a rename clause logfile = os.path.abspath("cad.log") retcode = ample_util.run_command(cmd=cmd, stdin=stdin, logfile=logfile) if retcode != 0: raise RuntimeError("Error running command: {0}\nCheck logfile: {1}".format(" ".join(cmd), logfile)) else: os.unlink(logfile) return os.path.abspath(merged_mtz), labels
[docs]def place_native_pdb(native_pdb, native_mtz, f_label, sigf_label, cleanup=True): """Place native_pdb into data from native_mtz using phaser with f_label and sigf_label""" # get crystal info from pdb sym_obj = iotbx.pdb.pdb_input(file_name=native_pdb).crystal_symmetry() hall_symbol = sym_obj.space_group().type().hall_symbol() unit_cell = sym_obj.unit_cell().parameters() molecular_weight = pdb_edit.molecular_weight(native_pdb) # Get data from MTZ file mtz_object = _check_mtz(native_mtz, labels=[f_label, sigf_label]) miller_indices = mtz_object.extract_miller_indices() fp_obs = mtz_object.get_column(f_label).extract_values().as_double() sigfp_obs = mtz_object.get_column(sigf_label).extract_values().as_double() # Create root_name for all files from native + mr_mtz name1 = os.path.splitext(os.path.basename(native_pdb))[0] name2 = os.path.splitext(os.path.basename(native_mtz))[0] root_name = "{0}_{1}".format(name1, name2) # Run phaser to position native phaser_input = phaser.InputMR_AUTO() phaser_input.setSPAC_HALL(hall_symbol) phaser_input.setCELL6(unit_cell) phaser_input.setREFL_F_SIGF(miller_indices, fp_obs, sigfp_obs) # Make sure labels same as native_mtz phaser_input.setLABI_F_SIGF(f_label, sigf_label) phaser_input.setROOT(root_name) phaser_input.addENSE_PDB_ID("native", native_pdb, 0.0) phaser_input.addCOMP_PROT_MW_NUM(molecular_weight, 1) phaser_input.addSEAR_ENSE_NUM("native", 1) phaser_input.setMUTE(True) phaser_run = phaser.runMR_AUTO(phaser_input) del phaser_input if not phaser_run.Success(): raise RuntimeError("PHASER failed: {0} : {1}".format(phaser_run.ErrorName(), phaser_run.ErrorMessage())) if phaser_run.foundSolutions(): native_placed_mtz = phaser_run.getTopMtzFile() else: raise RuntimeError("PHASER could not place native_pdb") if cleanup: os.unlink(phaser_run.getTopPdbFile()) return native_placed_mtz
[docs]def parse_cphasematch_log(logfile): """Return phase error before_origin& after_origin from cphasematch logfile""" before_origin = None after_origin = None change_of_hand = False origin_shift = False with open(logfile, 'r') as f: for line in f: if line.startswith(' Mean phase error before origin fixing:'): before_origin = float(line.split()[-1]) elif line.startswith(' Mean phase error after origin fixing:'): after_origin = float(line.split()[-1]) elif line.startswith(' Change of hand'): if line.strip().split()[4] == 'YES': change_of_hand = True elif line.startswith(' Change of origin:'): # Change of origin: uvw = ( 0, 0, 0) fields = line.strip().split() x = float(fields[6][:-1]) # strip trailing comma y = float(fields[7][:-1]) # strip trailing comma z = float(fields[8][:-1]) # strip trailing bracket origin_shift = [x, y, z] return before_origin, after_origin, change_of_hand, origin_shift
[docs]def run_cphasematch(merged_mtz, f_sigf_labels, native_phase_labels, mr_phase_labels, resolution_bins=12, cleanup=True): """run cphasematch to get phase error""" argd = { 'merged_mtz': merged_mtz, 'f_label': f_sigf_labels[0], 'sigf_label': f_sigf_labels[1], 'native_phase_label1': native_phase_labels[0], 'native_phase_label2': native_phase_labels[1], 'mr_phase_label1': mr_phase_labels[0], 'mr_phase_label2': mr_phase_labels[1], 'resolution_bins': resolution_bins, } stdin = """ mtzin {merged_mtz} colin-fo /*/*/[{f_label},{sigf_label}] colin-fc-1 /*/*/[{native_phase_label1},{native_phase_label2}] colin-fc-2 /*/*/[{mr_phase_label1},{mr_phase_label2}] resolution-bins {resolution_bins} """.format( **argd ) logfile = os.path.abspath("cphasematch.log") cmd = ['cphasematch', "-stdin"] retcode = ample_util.run_command(cmd=cmd, stdin=stdin, logfile=logfile) if retcode != 0: raise RuntimeError("Error running command: {0}\nCheck logfile: {1}".format(" ".join(cmd), logfile)) before_origin, after_origin, change_of_hand, origin_shift = parse_cphasematch_log(logfile) if cleanup: os.unlink(logfile) return before_origin, after_origin, change_of_hand, origin_shift
if __name__ == "__main__": logging.basicConfig(level=logging.INFO) import argparse parser = argparse.ArgumentParser(description='Run cphasematch on two mtz files', prefix_chars="-") parser.add_argument('-native_mtz', help="Input native MTZ file", required=True) parser.add_argument('-mr_mtz', help="Input MTZ file from Molecular Replacement", required=True) parser.add_argument('-native_pdb', help="Input native PDB file", required=False) parser.add_argument('-f_label', help="F label from native MTZ file", required=False) parser.add_argument('-sigf_label', help="SIGF label from native MTZ file", required=False) args = parser.parse_args() # Extract F & SIGF from file if not give on command-line if args.f_label and args.sigf_label: f_label, sigf_label = args.f_label, args.sigf_label else: f_label, sigf_label, _ = mtz_util.get_labels(args.native_mtz) logger.info("Using F, SIGF labels from mtz file: %s, %s", f_label, sigf_label) if args.native_pdb: before_origin, after_origin, change_of_hand, origin_shift = calc_phase_error_pdb( args.native_pdb, args.native_mtz, args.mr_mtz, f_label, sigf_label ) else: before_origin, after_origin, change_of_hand, origin_shift = calc_phase_error_mtz( args.native_mtz, args.mr_mtz, f_label, sigf_label ) logging.info("Calculated phase error:", after_origin) logging.info("Calculated origin shift:", origin_shift)