#!/usr/bin/env python
"""
run_csymmatch
# Gives pdb with oriented model
create copy of csymmatch pdb with residue numbering matching native and rename chain to X
concatenate the native and reformatted csymmatch pdb
run ncont to generate contacts
parse ncont file to generate map & analyse whether placed bits match and what type of structure they are
"""
import functools
from operator import itemgetter
import os
import types
from ample.parsers import dssp_parser
from ample.util import ample_util, csymmatch, pdb_edit
class RioData(object):
    """Record object holding the data produced during a RIO calculation.

    Instances are passed between the Rio methods, which set/read the
    attributes directly (a hack to transfer data between the steps).
    """

    def __init__(self):
        # Hack to transfer data
        self.joinedPdb = None      # native + placed model concatenated into one pdb
        self.fromChains = None     # chains in the native we check contacts from
        self.toChains = None       # (lower-cased) chains of the placed model
        self.origin = None         # origin the placed model was translated to
        self.originPdb = None      # placed model after translation to the origin
        self.numContacts = None    # Used by both as parseNcontLog sets it
        self.aaNumContacts = 0
        self.rioNumContacts = 0
        self.rioInRegister = 0
        self.rioOoRegister = 0
        self.rioBackwards = 0
        self.numGood = 0           # metric used to pick the best origin
        self.contacts = None
        self.ncontLog = None
        self.helix = None
        self.pdb = None
        self.csymmatchPdb = None
        return

    def __str__(self):
        """List the data attributes of this object"""
        me = {}
        for slot in dir(self):
            attr = getattr(self, slot)
            if not slot.startswith("__") and not (
                isinstance(attr, types.MethodType) or isinstance(attr, types.FunctionType)
            ):
                me[slot] = attr
        s = "{0}\n".format(self.__repr__())
        # BUGFIX: dict.iteritems() is Python 2 only - items() works on 2 and 3
        for k, v in me.items():
            s += "{0} : {1}\n".format(k, v)
        return s
class Rio(object):
    """Calculate the RIO and all-atom contact scores between a placed model and the native structure."""
def __init__(self):
self.ncontLog = None
self.numContacts = 0
self.contacts = None
self.inregister = 0
self.ooregister = 0
self.backwards = 0
self.best = None
return
[docs] def analyseRio(self, contactData, dsspP=None):
# Clear any data
contactData.rioInRegister = 0
contactData.rioOoRegister = 0
contactData.rioBackwards = 0
if contactData.numContacts == 0:
return
# Now categorise the chunks
chunks = self.findChunks(contacts=contactData.contacts, minContig=3)
if not chunks:
return 0
# for ( start, stop ) in startstop:
for d in chunks:
start = d['startIdx']
stop = d['stopIdx']
register = True
backwards = False
lastResSeq1 = None
lastResSeq2 = None
count = 1
for i in range(start, stop + 1):
c = contactData.contacts[i]
if i != start:
if c['resSeq1'] == lastResSeq1 - 1 or c['resSeq2'] == lastResSeq2 - 1:
backwards = True
# Sanity check
if backwards:
assert (
c['resSeq1'] == lastResSeq1 - 1 or c['resSeq2'] == lastResSeq2 - 1
), "Going backwards but resSeq arent!"
if not register and not backwards:
assert c['resSeq1'] != c['resSeq2'], "not register and backwards and not matching"
count += 1
lastResSeq1 = c['resSeq1']
lastResSeq2 = c['resSeq2']
if c['resSeq1'] != c['resSeq2']:
register = False
# Add total here
if register:
contactData.rioInRegister += count
else:
contactData.rioOoRegister += count
if backwards:
contactData.rioBackwards += count
# End categorising chunks
return
def calcAllAtom(self, contactData):
    """Calculate the All Atom data using the origin and data saved previously.

    Runs an all-atom ncont (maxDist 0.5) on the joined pdb referenced by
    contactData, parses the log and copies the contact count into
    aaNumContacts. Returns the updated contactData.
    """
    # Reset any results from an earlier run
    contactData.contacts = None
    contactData.numContacts = 0
    contactData.aaNumContacts = 0
    # Use the pdb and chain values saved when the origin was calculated
    self.runNcont(
        pdbin=contactData.joinedPdb,
        sourceChains=contactData.fromChains,
        targetChains=contactData.toChains,
        allAtom=True,
        maxDist=0.5,
    )
    self.parseNcontLog(contactData)
    contactData.aaNumContacts = contactData.numContacts
    return contactData
def calcRio(self, contactData):
    """Calculate the RIO using the origin and data saved previously.

    Runs a CA-only ncont on the joined pdb, parses the log, categorises the
    contacts and copies the contact count into rioNumContacts. Returns the
    updated contactData.
    """
    # Reset any results from an earlier run
    contactData.contacts = None
    for attr in ('rioNumContacts', 'rioInRegister', 'rioOoRegister', 'rioBackwards'):
        setattr(contactData, attr, 0)
    # Use the pdb and chain values saved when the origin was calculated
    self.runNcont(
        pdbin=contactData.joinedPdb, sourceChains=contactData.fromChains, targetChains=contactData.toChains
    )
    self.parseNcontLog(contactData)
    self.analyseRio(contactData)
    contactData.rioNumContacts = contactData.numContacts
    return contactData
def findOrigin(
    self, nativePdbInfo=None, mrPdbInfo=None, resSeqMap=None, origins=None, allAtom=False, workdir=None
):
    """Find the origin using the maximum number of contacts as the metric.

    For each candidate origin the placed pdb is translated, joined with the
    native and scored (all-atom or RIO depending on allAtom); the best-scoring
    origin's RioData is kept, the other score is then calculated for it, and
    csymmatch is run on it for visual inspection. Returns that RioData.

    BUGFIX: the default workdir used to be os.getcwd(), which is evaluated
    once at import time; None now means the current directory at call time.
    """
    self.workdir = workdir if workdir is not None else os.getcwd()
    if not resSeqMap.resSeqMatch():
        # We need to create a copy of the placed pdb with numbering matching the native
        mrPdbRes = ample_util.filename_append(filename=mrPdbInfo.pdb, astr="reseq", directory=self.workdir)
        pdb_edit.match_resseq(targetPdb=mrPdbInfo.pdb, sourcePdb=None, outPdb=mrPdbRes, resMap=resSeqMap)
        mrPdb = mrPdbRes
    else:
        mrPdb = mrPdbInfo.pdb
    # Make a copy of mrPdb with chains renamed to lower case (so they are
    # distinct from the native chains for the ncont source/target selections)
    ucChains = mrPdbInfo.models[0].chains
    toChains = [c.lower() for c in ucChains]
    placedAaPdb = ample_util.filename_append(filename=mrPdb, astr="ren", directory=self.workdir)
    pdb_edit.rename_chains(inpdb=mrPdb, outpdb=placedAaPdb, fromChain=ucChains, toChain=toChains)
    # The list of chains in the native that we will be checking contacts from
    fromChains = nativePdbInfo.models[0].chains
    # Loop over origins, move the placed pdb to the new origin and then run ncont
    # self.data holds the data for the best origin seen so far
    self.data = None
    for origin in origins:
        placedOriginPdb = placedAaPdb
        if origin != [0.0, 0.0, 0.0]:
            # Move pdb to the new origin
            ostr = "o{0}".format(origin).replace(" ", "")
            placedOriginPdb = ample_util.filename_append(filename=placedAaPdb, astr=ostr, directory=self.workdir)
            pdb_edit.translate(inpdb=placedAaPdb, outpdb=placedOriginPdb, ftranslate=origin)
        # Concatenate into one file
        joinedPdb = ample_util.filename_append(filename=placedOriginPdb, astr="joined", directory=self.workdir)
        pdb_edit.merge(pdb1=nativePdbInfo.pdb, pdb2=placedOriginPdb, pdbout=joinedPdb)
        # Set up object to hold the data for this origin
        data = RioData()
        data.origin = origin
        data.originPdb = placedOriginPdb
        data.joinedPdb = joinedPdb
        data.fromChains = fromChains
        data.toChains = toChains
        data.numGood = 0  # For holding the metric
        # Run ncont
        if allAtom:
            self.calcAllAtom(data)
            data.numGood = data.aaNumContacts
        else:
            self.calcRio(data)
            data.numGood = data.rioInRegister + data.rioOoRegister
        # Save the first origin and only update if we get a better score
        if not self.data or data.numGood > self.data.numGood:
            self.data = data
    # End loop over origins
    # Now need to calculate the score we didn't calculate above for the winner
    if allAtom:
        self.calcRio(self.data)
    else:
        self.calcAllAtom(self.data)
    if self.data.numGood > 0:
        # If we got a match run csymmatch so we can see the result
        csym = csymmatch.Csymmatch()
        csymmatchPdb = ample_util.filename_append(
            filename=self.data.originPdb, astr="csymmatch_best", directory=self.workdir
        )
        csym.run(refPdb=nativePdbInfo.pdb, inPdb=self.data.originPdb, outPdb=csymmatchPdb, originHand=False)
    return self.data
def _join_chunks(self, chunk1, chunk2, dsspP=None, maxGap=None):
"""Take two chunks of contacts and see if the gap between them can be joined.
If the chunks can't be joined, it returns the first chunk for adding to the list
of chunks, and the second chunk for use in the subsequent join step.
If the chunks can be joined, it returns None for the first chunk so that we know not
to add anything.
"""
assert dsspP and maxGap
# See if a suitable gap
width = abs(chunk2['startResSeq'] - chunk1['stopResSeq']) - 1
if width < 1 or width > maxGap or chunk1['chainId1'] != chunk2['chainId1']:
return (chunk1, chunk2)
# Suitable gap so make sure it's all helix
for resSeq in range(chunk1['stopResSeq'] + 1, chunk2['startResSeq'] - 1):
ss = dsspP.getAssignment(resSeq, chunk1['chainId1'], resName=None)
if ss != 'H':
return (chunk1, chunk2)
joinedChunk = {
'chainId1': chunk1['chainId1'],
'startIdx': chunk1['startIdx'],
'startResSeq': chunk1['startResSeq'],
'stopIdx': chunk2['stopIdx'],
'stopResSeq': chunk2['stopResSeq'],
}
return (None, joinedChunk)
def runNcont(self, pdbin=None, sourceChains=None, targetChains=None, maxDist=1.5, allAtom=False):
    """Run the CCP4 ncont program on pdbin to find contacts between the source
    and target chains, writing the log to self.ncontLog.

    Raises RuntimeError if ncont returns a non-zero exit code.
    """
    suffix = ".ncont_aa.log" if allAtom else ".ncont_rio.log"
    self.ncontLog = pdbin + suffix
    cmd = ["ncont", "xyzin", pdbin]
    # Atom selection: every atom for the all-atom score, CA only for the RIO
    atom = "*" if allAtom else "CA"
    # Need to use list of chains from Native as can't work out negate operator for ncont
    keywords = [
        "source {0}//{1}".format(",".join(sourceChains), atom),
        "target {0}//{1}".format(",".join(targetChains), atom),
        "maxdist {0}".format(maxDist),
        "cells 2",
        "sort target inc",
    ]
    stdin = "\n".join(keywords) + "\n"
    retcode = ample_util.run_command(cmd=cmd, logfile=self.ncontLog, directory=os.getcwd(), dolog=True, stdin=stdin)
    if retcode != 0:
        raise RuntimeError("Error running ncont command: {0}\nCheck log: {1}".format(cmd, self.ncontLog))
[docs] def parseNcontLog(self, contactData, logfile=None, clean_up=True):
"""
Lines are of format
/1/B/1042(MET). / CA [ C]: /1/b/ 988(GLU). / CA [ C]: 1.09 223 X-1/2,Y-1/2,Z
"""
if not logfile:
logfile = self.ncontLog
contactData.contacts = None
contactData.numContacts = 0
clines = []
capture = False
with open(logfile, 'r') as f: #
while True:
line = f.readline().rstrip()
if capture and not line:
break
if "contacts found:" in line:
contactData.numContacts = int(line.split()[0])
if "NO CONTACTS FOUND." in line:
return False
if "SOURCE ATOMS" in line:
capture = True
f.readline() # skip blank line
continue
if capture:
clines.append(line)
assert contactData.numContacts == len(clines)
contacts = []
# Got data lines so now extract data
# Could probably just do this in the reading loop now
lastSource = None
for c in clines:
# Reconstruct lines with only the second contact using the data from the corresponding last complete line
if not c[0:29].strip():
c = lastSource[0:29] + c[29:]
else:
lastSource = c
# As the numbers overrun we can't split so we assume fixed format
d = {}
d['chainId1'] = c[4]
d['resSeq1'] = int(c[6:10].strip())
aa1 = c[11:14]
# Will get key error for all-atom if there is solvent etc.
try:
d['aa1'] = ample_util.three2one[aa1] # get amino acid and convert to single letter code
except KeyError:
d['aa1'] = 'X'
d['chainId2'] = c[32]
d['resSeq2'] = int(c[34:38].strip())
aa2 = c[39:42]
try:
d['aa2'] = ample_util.three2one[aa2]
except KeyError:
d['aa2'] = 'X'
d['dist'] = float(c[56:62].strip())
d['cell'] = int(c[63:66])
d['symmetry'] = c[67:]
contacts.append(d)
contactData.contacts = contacts
if clean_up:
os.unlink(logfile)
def helixFromPdbs(self, origin, mrPdb, nativePdb, nativeChain, dsspLog, workdir=None):
    """This is a wrapper to generate the info and resSeqMap objects needed by scoreOrigin,
    then hand the resulting contacts to helixFromContacts.

    BUGFIX: workdir used to default to os.getcwd(), evaluated once at import
    time; None now means the current directory at call time.
    """
    if workdir is None:
        workdir = os.getcwd()
    mrPdbInfo = pdb_edit.get_info(mrPdb)
    nativePdbInfo = pdb_edit.get_info(nativePdb)
    assert nativeChain in nativePdbInfo.models[0].chains
    # BUGFIX: was a Python-2-only implicit relative import (import residue_map);
    # residue_map lives in ample.util with the other helpers used here.
    from ample.util import residue_map

    resSeqMap = residue_map.residueSequenceMap()
    resSeqMap.fromInfo(
        refInfo=nativePdbInfo,
        refChainID=nativeChain,
        targetInfo=mrPdbInfo,
        targetChainID=mrPdbInfo.models[0].chains[0],  # Only 1 chain in model
    )
    data = self.scoreOrigin(origin, mrPdbInfo, nativePdbInfo, resSeqMap, workdir)
    contacts = data.contacts
    return self.helixFromContacts(contacts, dsspLog)
def scoreOrigin(self, origin=None, mrPdbInfo=None, nativePdbInfo=None, resSeqMap=None, workdir=None):
    """Calculate the all-atom and RIO contact data for the placed pdb at the
    given origin and return the resulting RioData object.

    BUGFIX: workdir used to default to os.getcwd(), evaluated once at import
    time; None now means the current directory at call time.
    """
    self.workdir = workdir if workdir is not None else os.getcwd()
    if not resSeqMap.resSeqMatch():
        # We need to create a copy of the placed pdb with numbering matching the native
        mrPdbRes = ample_util.filename_append(filename=mrPdbInfo.pdb, astr="reseq", directory=self.workdir)
        pdb_edit.match_resseq(targetPdb=mrPdbInfo.pdb, sourcePdb=None, outPdb=mrPdbRes, resMap=resSeqMap)
        mrPdb = mrPdbRes
    else:
        mrPdb = mrPdbInfo.pdb
    # Make a copy of mrPdb with chains renamed to lower case (so they are
    # distinct from the native chains for the ncont source/target selections)
    ucChains = mrPdbInfo.models[0].chains
    toChains = [c.lower() for c in ucChains]
    mrAaPdb = ample_util.filename_append(filename=mrPdb, astr="ren", directory=self.workdir)
    pdb_edit.rename_chains(inpdb=mrPdb, outpdb=mrAaPdb, fromChain=ucChains, toChain=toChains)
    # The list of chains in the native that we will be checking contacts from
    fromChains = nativePdbInfo.models[0].chains
    mrOriginPdb = mrAaPdb
    if origin != [0.0, 0.0, 0.0]:
        # Move pdb to the new origin
        ostr = "o{0}".format(origin).replace(" ", "")
        mrOriginPdb = ample_util.filename_append(filename=mrAaPdb, astr=ostr, directory=self.workdir)
        pdb_edit.translate(inpdb=mrAaPdb, outpdb=mrOriginPdb, ftranslate=origin)
    # Concatenate into one file
    joinedPdb = ample_util.filename_append(filename=mrOriginPdb, astr="joined", directory=self.workdir)
    pdb_edit.merge(pdb1=nativePdbInfo.pdb, pdb2=mrOriginPdb, pdbout=joinedPdb)
    # Set up object to hold the contact data
    data = RioData()
    data.origin = origin
    data.originPdb = mrOriginPdb
    data.joinedPdb = joinedPdb
    data.fromChains = fromChains
    data.toChains = toChains
    # First get the AllAtom score, then the RIO
    self.calcAllAtom(data)
    self.calcRio(data)
    # Clean up the intermediate files
    os.unlink(mrOriginPdb)
    os.unlink(joinedPdb)
    if os.path.isfile(mrAaPdb):
        os.unlink(mrAaPdb)
    return data
[docs] def ssIsOK(self, contact, dsspP=None, ssTest=False):
if ssTest:
ss = dsspP.getAssignment(contact['resSeq1'], contact['chainId1'], resName=contact['aa1'])
if ss == "H":
return True
return False
else:
return True
[docs] def findChunks(self, contacts=None, dsspP=None, ssTest=False, minContig=3):
if contacts is None:
return False
#
# Loop through the contacts finding the start, stop indices in the list of contacts of contiguous chunks
#
chunks = []
lastChainId1 = None
lastChainId2 = None
lastResSeq1 = None
lastResSeq2 = None
count = None
startIdx = None
stopIdx = None
backwards = 0
for i, c in enumerate(contacts):
ssOK = self.ssIsOK(c, dsspP=dsspP, ssTest=ssTest)
if i != 0:
# For getting the longest segment we only care it's changing by 1 - we test what matched how later
if (
(c['resSeq1'] == lastResSeq1 + 1 or c['resSeq1'] == lastResSeq1 - 1)
and (c['resSeq2'] == lastResSeq2 + 1 or c['resSeq2'] == lastResSeq2 - 1)
and c['chainId1'] == lastChainId1
and c['chainId2'] == lastChainId2
and ssOK
):
# Need to know when we are going backwards, but also need to know if we are in a stretch going backwards
# So we can spot if there has been a change, so we count how many we have been going backwards for
if c['resSeq1'] == lastResSeq1 - 1:
backwards += 1
if not (c['resSeq1'] == lastResSeq1 + 1 and backwards >= 1):
count += 1
stopIdx = i
lastChainId1 = c['chainId1']
lastChainId2 = c['chainId2']
lastResSeq1 = c['resSeq1']
lastResSeq2 = c['resSeq2']
# If this is the last one we want to drop through
if i < len(contacts) - 1:
continue
# Anything that doesn't continue didn't match
if count >= minContig:
d = {
'chainId1': lastChainId1,
'startIdx': startIdx,
'startResSeq': contacts[startIdx]['resSeq1'],
'stopIdx': stopIdx,
'stopResSeq': contacts[stopIdx]['resSeq1'],
}
chunks.append(d)
# Either starting afresh or a random residue
lastChainId1 = c['chainId1']
lastChainId2 = c['chainId2']
lastResSeq1 = c['resSeq1']
lastResSeq2 = c['resSeq2']
backwards = 0
if ssOK:
count = 1
startIdx = i
stopIdx = i
else:
count = 0
startIdx = i + 1
stopIdx = i + 1
return chunks