Source code for ample.util.process_models

__author__ = "Jens Thomas"

import glob
import logging
import os
import shutil
import tarfile
import zipfile

import iotbx.pdb

from ample.util import ample_util, exit_util, pdb_edit, sequence_util

logger = logging.getLogger(__name__)


class CheckModelsResult:
    def __init__(self):
        self.created_updated_models = False
        self.error = None
        self.homologs = False
        self.ensemble = False
        self.merged_chains = False
        self.models_dir = None
        self.num_structures = 0
        self.num_models = 0
        self.sequence = None

    @property
    def single_ensemble(self):
        return self.num_structures == 1 and self.num_models > 1 and self.ensemble

    def __str__(self):
        attrs = [k for k in self.__dict__.keys() if not k.startswith('_')]
        INDENT = "  "
        out_str = "Class: {}\nData:\n".format(self.__class__.__name__)
        for a in sorted(attrs):
            out_str += INDENT + "{} : {}\n".format(a, self.__dict__[a])
        return out_str

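
# Illustrative sketch, not part of the original module: how the
# single_ensemble property combines the counters that check_models()
# fills in. A single structure file holding several models (e.g. an
# NMR ensemble) is the only combination that qualifies.
def _example_check_result():
    results = CheckModelsResult()
    results.num_structures = 1
    results.num_models = 20
    results.ensemble = True
    assert results.single_ensemble
    print(results)  # __str__ lists all public attributes and their values
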
def extract_and_validate_models(amoptd):
    """Extract models given to AMPLE from arguments in the amoptd and validate that they are suitable

    Parameters
    ----------
    amoptd : dict
        AMPLE options dictionary
    """

    def is_pdb_file(filename):
        return any(map(lambda x: filename.endswith(x), pdb_suffixes))

    def path_to_quark_alldecoy(pdb_files):
        QUARK_DECOY_NAME = 'alldecoy.pdb'
        for pdb in pdb_files:
            if os.path.basename(pdb) == QUARK_DECOY_NAME:
                return pdb
        return None

    models_arg = amoptd['models']  # command-line arg from user
    if models_arg is None:
        return
    # models_arg may be a list of paths, so only check existence for single paths
    if not isinstance(models_arg, list) and not os.path.exists(models_arg):
        raise RuntimeError("Cannot find models: {}".format(models_arg))
    models_dir_final = amoptd['models_dir']  # the directory where the models need to end up
    if models_dir_final is None:
        models_dir_final = os.path.join(amoptd['work_dir'], 'models')
        logger.debug("Setting models_dir_final to: %s", models_dir_final)
        amoptd['models_dir'] = models_dir_final
    pdb_suffixes = ['.pdb', '.PDB']
    quark_models = False
    num_quark_models = 0
    if isinstance(models_arg, list):
        # Assume all models are in the same directory
        models_dir_tmp = os.path.dirname(models_arg[0])
    elif os.path.isfile(models_arg):
        filepath = models_arg
        models_dir_tmp = os.path.join(amoptd['work_dir'], 'models.tmp')
        if not os.path.isdir(models_dir_tmp):
            os.mkdir(models_dir_tmp)
        # Extract/copy any pdb files into models_dir_tmp
        if tarfile.is_tarfile(filepath):
            pdb_files = ample_util.extract_tar(filepath, models_dir_tmp, suffixes=pdb_suffixes)
        elif zipfile.is_zipfile(filepath):
            pdb_files = ample_util.extract_zip(filepath, models_dir_tmp, suffixes=pdb_suffixes)
        elif is_pdb_file(filepath):
            shutil.copy2(filepath, models_dir_tmp)
            pdb_files = [os.path.join(models_dir_tmp, os.path.basename(filepath))]
        else:
            raise RuntimeError("Do not know how to handle input models file: {}".format(filepath))
        # See if we have an alldecoy.pdb file
        quark_models = path_to_quark_alldecoy(pdb_files)
        if quark_models:
            # Quark decoys are processed by us so go straight into the final directory without checking
            num_quark_models = split_quark_alldecoy(quark_models, models_dir_final)
            amoptd['quark_models'] = True
            results = CheckModelsResult()  # Null result - we extracted the models so assume they are ok
            results.models_dir = models_dir_final
            results.num_models = num_quark_models
            shutil.rmtree(models_dir_tmp)  # delete as it contains unneeded files extracted from the archive
    elif os.path.isdir(models_arg):
        models_dir_tmp = models_arg
    else:
        raise RuntimeError("Don't know how to process -models flag: {}".format(models_arg))

    if quark_models:
        # Null result - we extracted the models so assume they are ok
        results = CheckModelsResult()
        results.num_structures = num_quark_models
        results.num_models = num_quark_models
        results.models_dir = models_dir_final
    else:
        results = check_models_dir(models_dir_tmp, models_dir_final)

    amoptd['models_dir'] = results.models_dir
    amoptd['processed_models'] = glob.glob(os.path.join(results.models_dir, "*.pdb"))
    return results

def handle_model_import(amoptd, results):
    """Handle any errors flagged up by importing models and set any options based on the type of models."""
    error_msg = None
    if results.error:
        error_msg = "Error importing models: {}".format(results.error)
    elif results.homologs and not amoptd['homologs']:
        error_msg = "Imported models were not sequence identical, but homologs mode wasn't selected"
    if error_msg:
        exit_util.exit_error(error_msg)

    if results.single_ensemble and amoptd['webserver_uri']:
        logger.info("** Webserver mode got single NMR model so turning on NMR mode **")
        amoptd['nmr_model_in'] = amoptd['processed_models'][0]
    elif results.homologs and amoptd['webserver_uri']:
        logger.info("** Webserver mode got a directory of homologs so turning on Homolog mode **")
        amoptd['homologs'] = True

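
# Illustrative sketch, not part of the original module: a minimal driver
# for the import pipeline above. The amoptd keys shown are the ones this
# module reads; a real AMPLE options dictionary contains many more.
def _example_import_models(work_dir, models_path):
    amoptd = {
        'models': models_path,  # PDB file, tar/zip archive or directory of PDBs
        'models_dir': None,     # filled in by extract_and_validate_models
        'work_dir': work_dir,
        'homologs': False,
        'webserver_uri': None,
    }
    results = extract_and_validate_models(amoptd)
    handle_model_import(amoptd, results)  # calls exit_util.exit_error on failure
    return amoptd['processed_models']
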
def check_models_dir(models_in_dir, models_out_dir):
    """Examine a directory of PDB files to determine their suitability for running with AMPLE."""
    assert os.path.isdir(models_in_dir)
    pdb_structures = glob.glob(os.path.join(models_in_dir, "*.[pP][dD][bB]"))
    results = CheckModelsResult()
    results.models_dir = models_out_dir
    results = check_models(pdb_structures, results)
    if not results.created_updated_models:
        # if the models were ok, we can just use them as is
        results.models_dir = models_in_dir
    logger.info("Using models from directory: %s", results.models_dir)
    return results

def check_models(pdb_structures, results):
    """Examine PDB files to determine their suitability for running with AMPLE."""
    assert len(pdb_structures) > 0
    assert results.models_dir is not None
    updated = False
    hierarchies = []
    ref_data = None
    num_structures = len(pdb_structures)
    if num_structures == 1:
        pdb = pdb_structures[0]
        logger.info("Processing a single input PDB file: {}".format(pdb))
        hierarchy = iotbx.pdb.pdb_input(pdb).construct_hierarchy()
        num_models = len(hierarchy.models())
        results.num_models = num_models
        if num_models > 1:
            if not single_chain_models_with_same_sequence(hierarchy):
                results.error = "Supplied with a single PDB file that contained multiple models with multiple or non-matching chains: {}".format(
                    pdb
                )
                return results
            logger.debug("Found a single PDB with multiple models - assuming an NMR ensemble")
            results.ensemble = True
            results.sequence = sequence_util.chain_sequence(hierarchy.models()[0].only_chain())
        else:
            logger.info("check_models found a single pdb with a single model")
            if not len(hierarchy.only_model().chains()) == 1:
                results.error = "Supplied with a single PDB file that contained a single model with multiple chains: {}".format(
                    pdb
                )
                return results
        updated = pdb_edit.add_missing_single_chain_ids(hierarchy)
        if updated:
            logger.critical("Added missing chain IDs to models")
        hierarchies.append(hierarchy)
    else:
        # multiple pdb structures - get a list of chain_ids and sequences for all members
        multiple = [False] * num_structures  # tracks if there are multiple chains in the models
        # do chains with the same index have the same sequence across pdbs?
        chains_match_across_pdbs = [False] * num_structures
        logger.info("Processing {} input PDB files".format(num_structures))
        for idx_pdb, pdb in enumerate(pdb_structures):
            h = iotbx.pdb.pdb_input(pdb).construct_hierarchy()
            if len(h.models()) > 1:
                # Assume an NMR ensemble so just extract the first member as representative of all
                logger.critical(
                    "Multiple models in pdb {}. Assuming NMR ensemble so extracting first model".format(pdb)
                )
                first_model = h.models()[0].detached_copy()
                h = iotbx.pdb.hierarchy.root()
                h.append_model(first_model)
                updated = True
            hierarchies.append(h)
            seq_data = sequence_util._sequence_data(h)
            if ref_data is None:
                # Get sequence data for first pdb
                ref_data = seq_data
                if len(ref_data) > 1:
                    multiple[idx_pdb] = True
                continue
            else:
                check_sequences_match(ref_data, seq_data, idx_pdb, chains_match_across_pdbs, multiple)
        # We now have a list of single-model structures
        results.num_models = num_structures
        if any(multiple):
            logger.debug("Processing multichain PDBs")
            if sum(multiple) != len(multiple):
                results.error = "check_models: given multiple structures, but only some had multiple chains: need all chains to correspond"
                return results
            if sum(chains_match_across_pdbs) != len(chains_match_across_pdbs):
                results.error = "check_models: given multiple structures with multiple chains, but chains did not match across pdbs: need all chains to correspond"
                return results
            # merge all chains
            for i, h in enumerate(hierarchies):
                hierarchies[i] = pdb_edit._merge_chains(h)
            results.merged_chains = True
            logger.debug("Multi-chain PDB files were merged to a single chain.")
            updated = True
        else:
            # multiple single-chain pdbs - are they homologs?
            logger.debug("Processing single-chain PDBs")
            if sum(chains_match_across_pdbs) != len(chains_match_across_pdbs):
                logger.debug("Chains don't match across pdbs so assuming homologs")
                results.homologs = True
            else:
                # All have the same sequence
                results.sequence = sequence_util.chain_sequence(hierarchies[0].only_model().only_chain())
        # We now have multiple structures, each with one chain - make sure they all have a chain.id
        chains_updated = pdb_edit.add_missing_single_chain_ids(hierarchies)
        if chains_updated:
            logger.info("Added missing chain IDs to models")
        updated = chains_updated or updated  # see if anything has been updated
    results.num_structures = num_structures
    if updated:
        logger.warning("Updated models have been created to ensure their suitability for running with AMPLE.")
        # write out new files
        results.created_updated_models = True
        if not os.path.isdir(results.models_dir):
            os.mkdir(results.models_dir)
        for pdb, h in zip(pdb_structures, hierarchies):
            basename = os.path.basename(pdb)
            pdbout = os.path.join(results.models_dir, basename)
            with open(pdbout, 'w') as f:
                f.write("REMARK Original file: {}\n".format(pdb))
                f.write(h.as_pdb_string(anisou=False))
    return results

def check_sequences_match(ref_data, seq_data, idx_pdb, chains_match_across_pdbs, multiple):
    """Check whether chains with the same index across multiple pdbs have matching sequences."""
    all_chains_match = 0
    for i, (rd, sd) in enumerate(zip(ref_data, seq_data)):
        ref_seq = ref_data[rd][0]
        seq = seq_data[sd][0]
        if ref_seq == seq:
            all_chains_match += 1
            if all_chains_match == len(ref_data):
                chains_match_across_pdbs[idx_pdb] = True
                if idx_pdb == 1:
                    # if the second pdb matches the first then the first matches too
                    chains_match_across_pdbs[0] = True
        if i > 0:
            multiple[idx_pdb] = True

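
# Illustrative sketch, not part of the original module: exercising
# check_sequences_match() with two structures of two identical chains
# each. The {chain_id: (sequence, ...)} layout mirrors what
# sequence_util._sequence_data appears to return, which is an
# assumption here.
def _example_sequences_match():
    ref_data = {'A': ('MKV',), 'B': ('GGG',)}
    seq_data = {'A': ('MKV',), 'B': ('GGG',)}
    chains_match_across_pdbs = [False, False]
    multiple = [False, False]
    check_sequences_match(ref_data, seq_data, 1, chains_match_across_pdbs, multiple)
    return chains_match_across_pdbs, multiple  # ([True, True], [False, True])
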
def single_chain_models_with_same_sequence(hierarchy):
    """Return True if hierarchy contains sequence-identical single-chain models."""
    root_seq = None
    for model in hierarchy.models():
        try:
            chain = model.only_chain()
        except AssertionError:
            return False
        seq = sequence_util.chain_sequence(chain)
        if root_seq is None:
            root_seq = seq
            continue
        if seq != root_seq:
            return False
    return True

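
# Illustrative sketch, not part of the original module: probing
# single_chain_models_with_same_sequence() with an in-memory two-model
# PDB, assuming a cctbx installation where flex.split_lines is
# available (it is part of scitbx.array_family.flex).
def _example_same_sequence():
    from cctbx.array_family import flex
    pdb_str = (
        "MODEL     1\n"
        "ATOM      1  CA  ALA A   1       0.000   0.000   0.000  1.00  0.00           C\n"
        "ENDMDL\n"
        "MODEL     2\n"
        "ATOM      1  CA  ALA A   1       1.000   0.000   0.000  1.00  0.00           C\n"
        "ENDMDL\n"
    )
    hierarchy = iotbx.pdb.input(source_info=None, lines=flex.split_lines(pdb_str)).construct_hierarchy()
    return single_chain_models_with_same_sequence(hierarchy)  # True
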
def split_quark_alldecoy(alldecoy, directory):
    """Split a single QUARK PDB with multiple models into individual PDB files

    Parameters
    ----------
    alldecoy : str
        Single QUARK PDB file with multiple model entries
    directory : str
        Directory to extract the PDB files to

    Returns
    -------
    num_models : int
        The number of models extracted from the QUARK file
    """
    logger.info("Extracting decoys from: %s into %s", alldecoy, directory)
    if not os.path.isdir(directory):
        os.mkdir(directory)
    smodels = []
    with open(alldecoy, 'r') as f:
        m = []
        for line in f:
            if line.startswith("ENDMDL"):
                m.append(line)
                smodels.append(m)
                m = []
            else:
                m.append(line)
    if not len(smodels):
        raise RuntimeError("Could not extract any models from: {0}".format(alldecoy))
    for i, m in enumerate(smodels):
        fpath = os.path.join(directory, "quark_{0}.pdb".format(i))
        with open(fpath, 'w') as f:
            for line in m:
                # Reconstruct something sensible, as from the coordinates on it's all
                # quark-specific and there is no chain ID. Occupancy and B-factor are
                # padded to the standard PDB columns (55-60 and 61-66).
                if line.startswith("ATOM"):
                    line = line[:21] + 'A' + line[22:54] + "  1.00  0.00\n"
                f.write(line)
        logger.debug("Wrote: %s", fpath)
    num_models = i + 1
    logger.info("Extracted %d models from quark archive", num_models)
    return num_models

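
# Illustrative sketch, not part of the original module: splitting a
# hypothetical QUARK decoy file and collecting the numbered outputs.
def _example_split_decoys(alldecoy_pdb, out_dir):
    num_models = split_quark_alldecoy(alldecoy_pdb, out_dir)
    decoys = sorted(glob.glob(os.path.join(out_dir, "quark_*.pdb")))
    assert len(decoys) == num_models
    return decoys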