Source code for ample.util.sequence_util

#!/usr/bin/env ccp4-python
from collections import OrderedDict
import logging
import os

import iotbx.pdb

from ample.util import ample_util, exit_util

[docs]def sequence(pdbin): return _sequence(iotbx.pdb.pdb_input(pdbin).construct_hierarchy())
def _sequence(hierarchy): """Extract the sequence of residues from a pdb file.""" chain2data = _sequence_data(hierarchy) return dict((k, chain2data[k][0]) for k in chain2data.keys()) def _sequence1(hierarchy): """Return sequence of the first chain""" d = _sequence(hierarchy) return d[sorted(d.keys())[0]]
[docs]def sequence_data(pdbin): return _sequence_data(iotbx.pdb.pdb_input(pdbin).construct_hierarchy())
def _sequence_data(hierarchy): """Extract the sequence of residues and resseqs from a pdb file.""" chain2data = OrderedDict() for chain in hierarchy.models()[0].chains(): # only the first model if not chain.is_protein(): continue seq, resseq = chain_data(chain) chain2data[] = (seq, resseq) return chain2data
[docs]def chain_data(chain): seq = "" resseq = [] for residue in chain.conformers()[0].residues(): # Just look at the first conformer # See if any of the atoms are non-hetero - if so we add this residue if any([not atom.hetero for atom in residue.atoms()]): seq += ample_util.three2one[residue.resname] resseq.append(residue.resseq_as_int()) return seq, resseq
[docs]def chain_sequence(chain): if not chain.is_protein(): return None return chain_data(chain)[0]
[docs]class Sequence(object): """A class to handle a fasta file""" def __init__(self, fasta=None, pdb=None, canonicalise=False): """Initialise the object""" self.MAXWIDTH = 80 # maximum width of any line = None # identifier for this sequence object self.headers = [] # title lines self.sequences = [] # The fasta sequences (just AA) as a single string self.resseqs = [] # The fasta sequences (just AA) as a single string self.pdbs = [] # pdb files that sequences were derived from self.chains = [] # The chain that the sequence belongs too (if applicable) self.fasta_files = [] # The fasta files the sequence were read from if fasta: self.from_fasta(fasta, canonicalise=canonicalise) elif pdb: self.from_pdb(pdbin=pdb) return
[docs] def add_pdb_data(self, pdbin): """Add the resseq information from a pdb to ourselves when we already have the sequence information from a fasta file - such as from an alignment We assume that there will be gaps (-) in the sequence and the letters may be upper or lower case Currently this only supports adding data for single-chain pdbs """ if not os.path.isfile(pdbin): raise RuntimeError("Cannot find provided pdb file: {}".format(pdbin)) if len(self.headers) < 1 and len(self.sequences) < 1: raise ValueError("Require at least one one sequence.") fname = os.path.basename(pdbin) name, ext = os.path.splitext(fname) if ext != ".pdb": raise TypeError("PDB files must have extension .pdb") got = False for idx, h in enumerate(self.headers): n = h[1:].split('.')[0] if n == name: got = True break if not got: raise RuntimeError("Could not find matching pdb name for {0} in headers {1}".format(fname, self.headers)) seqd = sequence_data(pdbin) if len(seqd) > 1: raise RuntimeError("Currently only support adding data for single chain pdbs") chain = list(seqd.keys())[0] self.pdbs[idx] = fname self.chains[idx] = chain self.resseqs[idx] = [] sequence = seqd[chain][0] resseqs = seqd[chain][1] needle = 0 GAP = '-' for res1 in self.sequences[idx]: if res1 == GAP: self.resseqs[idx].append(None) else: assert res1.upper() == sequence[needle] self.resseqs[idx].append(resseqs[needle]) needle += 1 return True
[docs] def fasta_str(self, pdbname=False): if len(self.sequences) < 1: raise RuntimeError("No sequences have been read!") headers = [] for i, header in enumerate(self.headers): if pdbname: header = ">{0}".format(os.path.basename(self.pdbs[i])) headers.append(header) return self._fasta_str(headers, self.sequences)
def _fasta_str(self, headers, sequences): s = "" for i, seq in enumerate(sequences): s += headers[i] + os.linesep for chunk in range(0, len(seq), self.MAXWIDTH): s += seq[chunk : chunk + self.MAXWIDTH] + os.linesep s += os.linesep return s
[docs] def from_fasta(self, fasta_file, canonicalise=True, resseq=True): = os.path.splitext(os.path.basename(fasta_file))[0] with open(fasta_file, "r") as f: self._parse_fasta(f, fasta_file=fasta_file, canonicalise=canonicalise) if resseq: for i, seq in enumerate(self.sequences): if len(self.resseqs) >= i + 1 and self.resseqs[i] is None: self.resseqs[i] = [] for j in range(len(seq)): self.resseqs[i].append(j + 1)
[docs] def from_pdb(self, pdbin): pdbin_name = os.path.basename(pdbin) = os.path.splitext(pdbin_name)[0] chain2data = sequence_data(pdbin) if len(chain2data) < 1: raise RuntimeError("Could not read sequence from pdb: {0}".format(pdbin)) self.headers = [] self.sequences = [] self.pdbs = [] self.resseqs = [] self.fasta_files = [] for chain in sorted(chain2data.keys()): seq = chain2data[chain][0] resseq = chain2data[chain][1] self.headers.append(">From pdb: {0} chain={1} length={2}".format(pdbin_name, chain, len(seq))) self.sequences.append(seq) self.resseqs.append(resseq) self.pdbs.append(pdbin_name) self.chains.append(chain) self.fasta_files.append(None)
[docs] def canonicalise(self): """Reformat the fasta file Description ----------- Needed because Rosetta has problems reading fastas. For it to be read, it has to have no spaces in the sequence, a name that is 4 characters and an underscore (ABCD_), everything has to be uppercase, and there has to be a return carriage at the end - this has to be linux formatted as when I make a fasta in windows, it doesnt recognize the return carriage. Rosetta has a lot of problems with fastas so we put in this script to deal with it. """ aa = list("ACDEFGHIKLMNPQRSTVWY") for i, seq in enumerate(self.sequences): cs = "" for char in seq: char = char.upper() if char in aa: cs += char else: msg = "Non-standard amino acid in your sequence: '{}'. Please format to standard amino acids!" raise RuntimeError(msg.format(char)) self.sequences[i] = cs
[docs] def length(self, seq_no=0): return len(self.sequences[seq_no])
[docs] def mutate_residue(self, from_aa, res_seq, to_aa, seq_id=0): """Change residue type from_aa at position res_seq to be of type to_aa Note: res_seq positions start from 1 not zero! Parameters ---------- from_aa : str Single-letter amino acid res_seq : int Residue sequence number to_aa : str Single-letter amino acid seq_id : int The index of the sequence to operate on (counting from zero) """ seq = self.sequences[seq_id] assert seq[res_seq - 1] == from_aa, "Amino acid at position {0} is {1} not {2}".format( res_seq, seq[res_seq - 1], from_aa ) self.sequences[seq_id] = seq[: res_seq - 1] + to_aa + seq[res_seq:] return
[docs] def numSequences(self): return len(self.sequences)
def _parse_fasta(self, fasta, fasta_file=None, canonicalise=True): """Parse the fasta file int our data structures & check for consistency""" headers = [] sequences = [] sequence = "" header = None for line in fasta: line = line.strip().rstrip(os.linesep) if not line: continue if header is None: if line.startswith(">"): header = line continue else: raise RuntimeError("FASTA sequencs must be prefixed with a > character: {}".format(line)) if header and line.startswith(">"): headers.append(header) sequences.append(sequence) header = line sequence = "" else: sequence += line headers.append(header) sequences.append(sequence) logging.debug("Stripping whitespace characters from sequence") logging.debug("Stripping '*' character of end of sequence") for i in range(len(sequences)): seq = sequences[i].replace(" ", "") if seq.endswith('*'): seq = seq[:-1] sequences[i] = seq self.headers, self.sequences, self.resseqs, self.fasta_files, self.pdbs, self.chains = [], [], [], [], [], [] for h, s in zip(headers, sequences): self.headers.append(h) self.sequences.append(s) self.fasta_files.append(fasta_file) self.resseqs.append(None) self.pdbs.append(None) self.chains.append(None) if canonicalise: self.canonicalise()
[docs] def pirStr(self, seqNo=0): """Return a canonical MAXWIDTH PIR representation of the file as a line-separated string""" pirStr = [self.title + os.linesep + os.linesep] for chunk in range(0, len(self.sequances[seqNo]), self.MAXWIDTH): pirStr.append(self.sequences[seqNo][chunk : chunk + self.MAXWIDTH] + os.linesep) pirStr.append(os.linesep) return pirStr
[docs] def sequence(self, seq_no=0): return self.sequences[seq_no]
[docs] def toPir(self, input_fasta, output_pir=None): """Take a fasta file and output the corresponding PIR file""" self.parseFasta(input_fasta) if not output_pir: dir, ffile = os.path.split(input_fasta) fname = os.path.splitext(ffile)[0] output_pir = os.path.join(dir, fname + ".pir") with open(output_pir, "w") as pirout: for line in self.pirStr(): pirout.write(line) return
[docs] def write_fasta(self, fasta_file, pdbname=False): with open(fasta_file, 'w') as f: for s in self.fasta_str(pdbname=pdbname): f.write(s) return
def __add__(self, other): self.headers += other.headers self.sequences += other.sequences self.resseqs += other.resseqs self.pdbs += other.pdbs self.chains += other.chains self.fasta_files += other.fasta_files return self def __str__(self): return self.__repr__() + os.linesep + self.fasta_str()
[docs]def process_fasta(amoptd, canonicalise=False): # Check we can find the input fasta if not os.path.exists(str(amoptd['fasta'])): msg = 'Cannot find fasta file: {0}'.format(amoptd['fasta']) exit_util.exit_error(msg) # Reformat to what we need logging.debug('Parsing FASTA file') try: fp = Sequence(fasta=amoptd['fasta'], canonicalise=canonicalise) except Exception as e: msg = "Error parsing FASTA file: {0}\n\n{1}".format(amoptd['fasta'], e.message) exit_util.exit_error(msg) if fp.numSequences() != 1: msg = "ERROR! Fasta file {0} has > 1 sequence in it.".format(amoptd['fasta']) exit_util.exit_error(msg) # Length checks amoptd['fasta_length'] = fp.length()"Fasta is {0} amino acids long".format(amoptd['fasta_length'])) # Check we have a decent length if amoptd['fasta_length'] < 9: msg = "ERROR! Fasta is of length {0}. This is much too short!".format(amoptd['fasta_length']) exit_util.exit_error(msg) # Check we will be able to truncate at this level if (float(amoptd['fasta_length']) / 100) * float(amoptd['percent']) < 1: msg = "Cannot truncate a fasta sequence of length {0} with {1} percent intervals. Please select a larger interval.".format( amoptd['fasta_length'], amoptd['percent'] ) exit_util.exit_error(msg) # Check that the sequence doesn't have a his-tag in it if not amoptd['allow_his_tag']: his_tag = 'HHHHHH' i = fp.sequence().find(his_tag) l = fp.length() if (0 <= i <= 20) or (l - 20 <= i <= l): msg = 'The fasta sequence contains a his tag sequence {0} at position {1}. If you wish to use ample with this sequence, please use the \"-allow_his_tag True\" option'.format( his_tag, i ) exit_util.exit_error(msg) # Fasta is ok, so write out a canonical fasta in the work directory outfasta = os.path.join(amoptd['work_dir'], amoptd['name'] + '_.fasta') fp.write_fasta(outfasta) amoptd['fasta'] = outfasta amoptd['sequence'] = fp.sequence() return