Coverage for biobb_flexserv/pcasuite/pcz_similarity.py: 77%
228 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-28 11:28 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-28 11:28 +0000
1#!/usr/bin/env python3
3"""Module containing the PCZsimilarity class and the command line interface."""
4from typing import Optional
5import json
6import numpy as np
7from pathlib import Path, PurePath
8from math import exp
9from biobb_common.generic.biobb_object import BiobbObject
10from biobb_common.tools.file_utils import launchlogger
13class PCZsimilarity(BiobbObject):
14 """
15 | biobb_flexserv PCZsimilarity
16 | Compute PCA similarity between two given compressed PCZ files.
17 | Wrapper of the pczdump tool from the PCAsuite FlexServ module.
19 Args:
20 input_pcz_path1 (str): Input compressed trajectory file 1. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
21 input_pcz_path2 (str): Input compressed trajectory file 2. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
22 output_json_path (str): Output json file with PCA Similarity results. File type: output. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/reference/pcasuite/pcz_similarity.json>`_. Accepted formats: json (edam:format_3464).
23 properties (dict - Python dictionary object containing the tool parameters, not input/output files):
24 * **amplifying_factor** (*float*) - ("0.0") common displacement (dx) along the different eigenvectors. If 0, the result is the absolute similarity index (dot product).
25 * **binary_path** (*str*) - ("pczdump") pczdump binary path to be used.
26 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
27 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
28 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
29 * **container_path** (*str*) - (None) Container path definition.
30 * **container_image** (*str*) - ('afandiadib/ambertools:serial') Container image definition.
31 * **container_volume_path** (*str*) - ('/tmp') Container volume path definition.
32 * **container_working_dir** (*str*) - (None) Container working directory definition.
33 * **container_user_id** (*str*) - (None) Container user_id definition.
34 * **container_shell_path** (*str*) - ('/bin/bash') Path to default shell inside the container.
36 Examples:
37 This is a use example of how to use the building block from Python::
39 from biobb_flexserv.pcasuite.pcz_similarity import pcz_similarity
41 pcz_similarity( input_pcz_path1='/path/to/pcazip_input1.pcz',
42 input_pcz_path2='/path/to/pcazip_input2.pcz',
43 output_json_path='/path/to/pcz_similarity.json',
44 properties=prop)
46 Info:
47 * wrapped_software:
48 * name: FlexServ PCAsuite
49 * version: >=1.0
50 * license: Apache-2.0
51 * ontology:
52 * name: EDAM
53 * schema: http://edamontology.org/EDAM.owl
55 """
57 def __init__(self, input_pcz_path1: str, input_pcz_path2: str,
58 output_json_path: str, properties: Optional[dict] = None, **kwargs) -> None:
60 properties = properties or {}
62 # Call parent class constructor
63 super().__init__(properties)
64 self.locals_var_dict = locals().copy()
66 # Input/Output files
67 self.io_dict = {
68 'in': {'input_pcz_path1': input_pcz_path1,
69 'input_pcz_path2': input_pcz_path2},
70 'out': {'output_json_path': output_json_path}
71 }
73 # Properties specific for BB
74 self.properties = properties
75 self.amplifying_factor = properties.get('amplifying_factor')
76 self.binary_path = properties.get('binary_path', 'pczdump')
78 # Check the properties
79 self.check_properties(properties)
80 self.check_arguments()
82 # Check two eigenvectors to be compatible for dot product
83 # i.e. same number of vectors and values per vector
84 def are_compatible(self, eigenvectors_1, eigenvectors_2):
85 # Check the number of eigenvectors and the number of values in both eigenvectors to match
86 if len(eigenvectors_1) != len(eigenvectors_2):
87 print('WARNING: Number of eigenvectors does not match')
88 return False
89 if len(eigenvectors_1[0]) != len(eigenvectors_2[0]):
90 print('WARNING: Number of values in eigenvectors does not match')
91 return False
92 return True
94 # Weighted Cross Product (WCP).
95 # Get the weighted cross product between eigenvectors
96 # This is meant to compare PCA results for molecular dynamics structural conformations
97 # The number of eigenvectors to be compared may be specified. All (0) by default
98 # DISCLAIMER: This code has been translated from a perl script signed by Alberto Perez (13/09/04)
99 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
100 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
101 def get_similarity_index(self,
102 eigenvalues_1, eigenvectors_1,
103 eigenvalues_2, eigenvectors_2, dx=None):
105 # Check the number of eigenvectors and the number of values in both eigenvectors to match
106 if not self.are_compatible(eigenvectors_1, eigenvectors_2):
107 raise SystemExit('Eigenvectors are not compatible')
109 # Find out the total number of eigenvectors
110 # Set the number of eigenvectors to be analyzed in case it is not set or it exceeds the total
111 eigenvectors_number = min(len(eigenvectors_1), len(eigenvectors_2))
113 # Find out the number of atoms in the structure
114 # Eigenvectors are atom coordinates and each atom has 3 coordinates (x, y, z)
115 if len(eigenvectors_1[0]) % 3 != 0:
116 raise SystemExit('Something is wrong with eigenvectors since number of values is not divisor of 3')
117 atom_number = int(len(eigenvectors_1[0]) / 3)
119 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
120 # The value of the ~10th eigenvalue is usually taken.
121 if dx is not None:
122 amplifying_factor = dx
123 else:
124 amplifying_factor = eigenvalues_1[eigenvectors_number-1]
126 # Get the denominator
127 # Find new denominator
128 # for ($i=0;$i<$nvec;$i++){
129 # $cte1+=exp(-1/$val_1[$i]*$ampf);
130 # $cte2+=exp(-1/$val_2[$i]*$ampf);
131 # $part1+=exp(-2/$val_1[$i]*$ampf)*exp(-2/$val_1[$i]*$ampf);
132 # $part2+=exp(-2/$val_2[$i]*$ampf)*exp(-2/$val_2[$i]*$ampf);
133 # }
134 cte1 = part1 = cte2 = part2 = 0
135 for eigenvalue in eigenvalues_1:
136 cte1 += exp(-1 / eigenvalue * amplifying_factor)
137 part1 += exp(-2 / eigenvalue * amplifying_factor) ** 2
138 for eigenvalue in eigenvalues_2:
139 cte2 += exp(-1 / eigenvalue * amplifying_factor)
140 part2 += exp(-2 / eigenvalue * amplifying_factor) ** 2
141 denominator = part1 * cte2 * cte2 / cte1 / cte1 + part2 * cte1 * cte1 / cte2 / cte2
143 # Get all eigenvector values together
144 eigenvector_values_1 = [v for ev in eigenvectors_1 for v in ev]
145 eigenvector_values_2 = [v for ev in eigenvectors_2 for v in ev]
147 # IDK what it is doing now
148 total_summatory = 0
149 for i in range(eigenvectors_number):
150 for j in range(eigenvectors_number):
151 # Array has vectors in increasing order of vap, get last one first
152 a = (eigenvectors_number - 1 - i) * atom_number * 3
153 b = (eigenvectors_number - i) * atom_number * 3 - 1
154 c = (eigenvectors_number - 1 - j) * atom_number * 3
155 d = (eigenvectors_number - j) * atom_number * 3 - 1
156 temp1 = eigenvector_values_1[a:b]
157 temp2 = eigenvector_values_2[c:d]
158 if len(temp1) != len(temp2):
159 raise ValueError("Projection of vectors of different size!!")
160 # Project the two vectors
161 add = 0
162 for k, value_1 in enumerate(temp1):
163 value_2 = temp2[k]
164 add += value_1 * value_2
165 add = add * exp(-1 / eigenvalues_1[i] * amplifying_factor - 1 / eigenvalues_2[j] * amplifying_factor)
166 add2 = add ** 2
167 total_summatory += add2
169 similarity_index = total_summatory * 2 / denominator
170 return similarity_index
172 # Weighted Cross Product (WCP).
173 # DF implementation of Alberto's formula
174 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
175 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
176 def eigenmsip(self, eigenvalues_1, eigenvectors_1,
177 eigenvalues_2, eigenvectors_2,
178 dx=None):
180 evals1 = np.array(eigenvalues_1)
181 evals2 = np.array(eigenvalues_2)
183 evecs1 = np.array(eigenvectors_1)
184 evecs2 = np.array(eigenvectors_2)
186 n_components = len(eigenvectors_1)
188 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
189 # The value of the ~10th eigenvalue is usually taken.
190 if dx is not None:
191 amplifying_factor = dx
192 else:
193 amplifying_factor = evals1[n_components-1]
195 e1 = np.exp(-(amplifying_factor)**2/evals1)
196 e2 = np.exp(-(amplifying_factor)**2/evals2)
198 e1_2 = np.exp(-2*(amplifying_factor)**2/evals1)
199 e2_2 = np.exp(-2*(amplifying_factor)**2/evals2)
200 sume1 = np.sum(e1)
201 sume2 = np.sum(e2)
203 denominator = np.sum((e1_2/sume1**2)**2)+np.sum((e2_2/sume2**2)**2)
205 # numerator_df = np.square(np.dot(evecs1, evecs2)*np.outer(e1, e2)/(sume1*sume2))
206 # numerator_df = 2 * np.sum(numerator_df)
208 val_tmp = 0
209 accum_a = 0
210 c = sume1*sume2
211 for pc in range(0, n_components):
212 for pc2 in range(0, n_components):
213 eve1 = evecs1[pc]
214 eve2 = evecs2[pc2]
215 eva1 = evals1[pc]
216 eva2 = evals2[pc2]
217 a = np.dot(eve1, eve2)
218 b = np.exp(-(amplifying_factor)**2/eva1 - (amplifying_factor)**2/eva2)
219 val_tmp = val_tmp + ((a * b)/c)**2
220 accum_a = accum_a + a
222 numerator = 2 * val_tmp
224 return numerator/(denominator)
226 # Get the dot product matrix of two eigenvectors
227 def dot_product(self, eigenvectors_1, eigenvectors_2):
228 # Check the number of eigenvectors and the number of values in both eigenvectors to match
229 if not self.are_compatible(eigenvectors_1, eigenvectors_2):
230 raise SystemExit('Eigenvectors are not compatible')
231 # Get the dot product
232 dpm = np.dot(eigenvectors_1, np.transpose(eigenvectors_2))
233 return dpm
235 # Get the dot product matrix of two eigenvectors (squared and normalized).
236 # Absolute Similarity Index (Hess 2000, 2002)
237 def dot_product_accum(self, eigenvectors_1, eigenvectors_2):
238 n_components = len(eigenvectors_1)
239 # Get the dot product
240 dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
242 sso = (dpm * dpm).sum() / n_components
243 # sso = (dpm * dpm).sum() / n_components
244 return sso
246 # Get the subspace overlap
247 # Same as before, but with a square root
248 def get_subspace_overlap(self, eigenvectors_1, eigenvectors_2):
249 # Get the number of eigenvectors
250 n_components = len(eigenvectors_1)
251 # Get the dot product
252 dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
254 sso = np.sqrt((dpm * dpm).sum() / n_components)
255 # sso = (dpm * dpm).sum() / n_components
256 return sso
258 # Classic RMSip (Root Mean Square Inner Product), gives the same results as the previous function get_subspace_overlap
259 def get_rmsip(self, eigenvectors_1, eigenvectors_2):
261 # Get the number of eigenvectors
262 n_components = len(eigenvectors_1)
264 accum = 0
265 for pc in range(0, n_components):
266 for pc2 in range(0, n_components):
267 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
268 val = dpm * dpm
269 accum = accum + val
271 sso = np.sqrt(accum / n_components)
272 # sso = (dpm * dpm).sum() / n_components
273 return sso
275 # RWSIP, Root Weighted Square Inner Product. Same as before, but weighted using the eigen values.
276 # See Edvin Fuglebakk and others, Measuring and comparing structural fluctuation patterns in large protein datasets,
277 # Bioinformatics, Volume 28, Issue 19, October 2012, Pages 2431–2440, https://doi.org/10.1093/bioinformatics/bts445
278 def get_rwsip(self,
279 eigenvalues_1, eigenvectors_1,
280 eigenvalues_2, eigenvectors_2):
282 # Get the number of eigenvectors
283 n_components = len(eigenvectors_1)
285 accum = 0
286 norm = 0
287 for pc in range(0, n_components):
288 for pc2 in range(0, n_components):
289 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
290 val = dpm * dpm * eigenvalues_1[pc] * eigenvalues_2[pc2]
291 accum = accum + val
292 norm = norm + eigenvalues_1[pc] * eigenvalues_2[pc]
294 sso = np.sqrt(accum / norm)
295 # sso = (dpm * dpm).sum() / n_components
296 return sso
298 @launchlogger
299 def launch(self):
300 """Launches the execution of the FlexServ pcz_similarity module."""
302 # Setup Biobb
303 if self.check_restart():
304 return 0
305 self.stage_files()
307 if self.container_path:
308 working_dir = self.container_volume_path if self.container_volume_path else "/data"
309 else:
310 working_dir = self.stage_io_dict.get("unique_dir", "")
312 unique_dir = Path(self.stage_io_dict.get("unique_dir", ""))
314 # Temporary output
315 temp_out_1 = "output1.dat"
316 temp_out_2 = "output2.dat"
317 temp_out_1_path = unique_dir.joinpath(temp_out_1)
318 temp_out_2_path = unique_dir.joinpath(temp_out_2)
319 staged_output_json_path = unique_dir.joinpath(Path(self.stage_io_dict["out"]["output_json_path"]).name)
321 # Command line 1
322 # pczdump -i structure.ca.std.pcz --evals -o evals.txt
323 # self.cmd = [self.binary_path, # Evals pcz 1
324 # "-i", input_pcz_1,
325 # "-o", temp_out_1,
326 # "--evals", ';',
327 # self.binary_path, # Evals pcz 2
328 # "-i", input_pcz_2,
329 # "-o", temp_out_2,
330 # "--evals"
331 # ]
333 self.cmd = ['cd', working_dir, ';',
334 self.binary_path, # Evals pcz 1
335 "-i", PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name,
336 "-o", temp_out_1,
337 "--evals", ';',
338 self.binary_path, # Evals pcz 2
339 "-i", PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name,
340 "-o", temp_out_2,
341 "--evals"
342 ]
344 # Run Biobb block 1
345 self.run_biobb()
347 # Parse output evals
348 info_dict = {}
349 info_dict['evals_1'] = []
350 info_dict['evals_2'] = []
352 with open(temp_out_1_path, 'r') as file:
353 for line in file:
354 info = float(line.strip())
355 info_dict['evals_1'].append(info)
357 with open(temp_out_2_path, 'r') as file:
358 for line in file:
359 info = float(line.strip())
360 info_dict['evals_2'].append(info)
362 num_evals_1 = len(info_dict['evals_1'])
363 num_evals_2 = len(info_dict['evals_2'])
364 num_evals_min = min(num_evals_1, num_evals_2)
365 info_dict['num_evals_min'] = num_evals_min
367 # Command line 2
368 # pczdump -i structure.ca.std.pcz --evals -o evals.txt
369 self.cmd = ['cd', working_dir, ';']
370 for pc in (range(1, num_evals_min+1)):
371 # Evecs pcz 1
372 self.cmd.append(self.binary_path)
373 self.cmd.append("-i")
374 self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name)
375 self.cmd.append("-o")
376 self.cmd.append("evecs_1_pc{}".format(pc))
377 self.cmd.append("--evec={}".format(pc))
378 self.cmd.append(";")
379 # Evals pcz 2
380 self.cmd.append(self.binary_path)
381 self.cmd.append("-i")
382 self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name)
383 self.cmd.append("-o")
384 self.cmd.append("evecs_2_pc{}".format(pc))
385 self.cmd.append("--evec={}".format(pc))
386 self.cmd.append(";")
388 # Run Biobb block 2
389 self.run_biobb()
391 # Parse output evecs
392 info_dict['evecs_1'] = {}
393 info_dict['evecs_2'] = {}
394 eigenvectors_1 = []
395 eigenvectors_2 = []
396 for pc in (range(1, num_evals_min+1)):
397 pc_id = "pc{}".format(pc)
398 info_dict['evecs_1'][pc_id] = []
399 info_dict['evecs_2'][pc_id] = []
400 with open(unique_dir.joinpath("evecs_1_pc{}".format(pc)), 'r') as file:
401 list_evecs = []
402 for line in file:
403 info = line.strip().split(' ')
404 for nums in info:
405 if nums:
406 list_evecs.append(float(nums))
407 info_dict['evecs_1'][pc_id] = list_evecs
408 eigenvectors_1.append(list_evecs)
410 with open(unique_dir.joinpath("evecs_2_pc{}".format(pc)), 'r') as file:
411 list_evecs = []
412 for line in file:
413 info = line.strip().split(' ')
414 for nums in info:
415 if nums:
416 list_evecs.append(float(nums))
417 info_dict['evecs_2'][pc_id] = list_evecs
418 eigenvectors_2.append(list_evecs)
420 # simIndex = self.get_similarity_index(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
421 # info_dict['similarityIndex_WCP2'] = float("{:.3f}".format(simIndex))
422 # dotProduct = self.get_subspace_overlap(eigenvectors_1, eigenvectors_2)
423 # info_dict['similarityIndex_rmsip2'] = float("{:.3f}".format(dotProduct))
424 eigenmsip = self.eigenmsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
425 info_dict['similarityIndex_WCP'] = float("{:.3f}".format(eigenmsip))
426 rmsip = self.get_rmsip(eigenvectors_1, eigenvectors_2)
427 info_dict['similarityIndex_rmsip'] = float("{:.3f}".format(rmsip))
428 rwsip = self.get_rwsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2)
429 info_dict['similarityIndex_rwsip'] = float("{:.3f}".format(rwsip))
430 dotp = self.dot_product_accum(eigenvectors_1, eigenvectors_2)
431 info_dict['similarityIndex_dotp'] = float("{:.3f}".format(dotp))
433 with open(staged_output_json_path, 'w') as out_file:
434 out_file.write(json.dumps(info_dict, indent=4))
436 # Copy files to host
437 self.copy_to_host()
439 # Remove temporary folder(s)
440 self.remove_tmp_files()
442 self.check_arguments(output_files_created=True, raise_exception=False)
444 return self.return_code
447def pcz_similarity(input_pcz_path1: str, input_pcz_path2: str, output_json_path: str,
448 properties: Optional[dict] = None, **kwargs) -> int:
449 """Create :class:`PCZsimilarity <flexserv.pcasuite.pcz_similarity>`flexserv.pcasuite.PCZsimilarity class and
450 execute :meth:`launch() <flexserv.pcasuite.pcz_similarity.launch>` method"""
451 return PCZsimilarity(**dict(locals())).launch()
454pcz_similarity.__doc__ = PCZsimilarity.__doc__
455main = PCZsimilarity.get_main(pcz_similarity, "Compute PCA Similarity from a given pair of compressed PCZ files.")
457if __name__ == '__main__':
458 main()