Coverage for biobb_flexserv / pcasuite / pcz_similarity.py: 78%
231 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 13:10 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-05 13:10 +0000
1#!/usr/bin/env python3
3"""Module containing the PCZsimilarity class and the command line interface."""
4from typing import Optional
5import json
6import numpy as np
7import shutil
8from pathlib import PurePath
9from biobb_common.tools import file_utils as fu
10from math import exp
11from biobb_common.generic.biobb_object import BiobbObject
12from biobb_common.tools.file_utils import launchlogger
15class PCZsimilarity(BiobbObject):
16 """
17 | biobb_flexserv PCZsimilarity
18 | Compute PCA similarity between two given compressed PCZ files.
19 | Wrapper of the pczdump tool from the PCAsuite FlexServ module.
21 Args:
22 input_pcz_path1 (str): Input compressed trajectory file 1. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
23 input_pcz_path2 (str): Input compressed trajectory file 2. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
24 output_json_path (str): Output json file with PCA Similarity results. File type: output. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/reference/pcasuite/pcz_similarity.json>`_. Accepted formats: json (edam:format_3464).
25 properties (dict - Python dictionary object containing the tool parameters, not input/output files):
26 * **amplifying_factor** (*float*) - ("0.0") common displacement (dx) along the different eigenvectors. If 0, the result is the absolute similarity index (dot product).
27 * **binary_path** (*str*) - ("pczdump") pczdump binary path to be used.
28 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
29 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
30 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
32 Examples:
33 This is a use example of how to use the building block from Python::
35 from biobb_flexserv.pcasuite.pcz_similarity import pcz_similarity
37 pcz_similarity( input_pcz_path1='/path/to/pcazip_input1.pcz',
38 input_pcz_path2='/path/to/pcazip_input2.pcz',
39 output_json_path='/path/to/pcz_similarity.json',
40 properties=prop)
42 Info:
43 * wrapped_software:
44 * name: FlexServ PCAsuite
45 * version: >=1.0
46 * license: Apache-2.0
47 * ontology:
48 * name: EDAM
49 * schema: http://edamontology.org/EDAM.owl
51 """
53 def __init__(self, input_pcz_path1: str, input_pcz_path2: str,
54 output_json_path: str, properties: Optional[dict] = None, **kwargs) -> None:
56 properties = properties or {}
58 # Call parent class constructor
59 super().__init__(properties)
60 self.locals_var_dict = locals().copy()
62 # Input/Output files
63 self.io_dict = {
64 'in': {'input_pcz_path1': input_pcz_path1,
65 'input_pcz_path2': input_pcz_path2},
66 'out': {'output_json_path': output_json_path}
67 }
69 # Properties specific for BB
70 self.properties = properties
71 self.amplifying_factor = properties.get('amplifying_factor')
72 self.binary_path = properties.get('binary_path', 'pczdump')
74 # Check the properties
75 self.check_properties(properties)
76 self.check_arguments()
78 # Check two eigenvectors to be compatible for dot product
79 # i.e. same number of vectors and values per vector
80 def are_compatible(self, eigenvectors_1, eigenvectors_2):
81 # Check the number of eigenvectors and the number of values in both eigenvectors to match
82 if len(eigenvectors_1) != len(eigenvectors_2):
83 print('WARNING: Number of eigenvectors does not match')
84 return False
85 if len(eigenvectors_1[0]) != len(eigenvectors_2[0]):
86 print('WARNING: Number of values in eigenvectors does not match')
87 return False
88 return True
90 # Weighted Cross Product (WCP).
91 # Get the weighted cross product between eigenvectors
92 # This is meant to compare PCA results for molecular dynamics structural conformations
93 # The number of eigenvectors to be compared may be specified. All (0) by default
94 # DISCLAIMER: This code has been translated from a perl script signed by Alberto Perez (13/09/04)
95 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
96 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
97 def get_similarity_index(self,
98 eigenvalues_1, eigenvectors_1,
99 eigenvalues_2, eigenvectors_2, dx=None):
101 # Check the number of eigenvectors and the number of values in both eigenvectors to match
102 if not self.are_compatible(eigenvectors_1, eigenvectors_2):
103 raise SystemExit('Eigenvectors are not compatible')
105 # Find out the total number of eigenvectors
106 # Set the number of eigenvectors to be analyzed in case it is not set or it exceeds the total
107 eigenvectors_number = min(len(eigenvectors_1), len(eigenvectors_2))
109 # Find out the number of atoms in the structure
110 # Eigenvectors are atom coordinates and each atom has 3 coordinates (x, y, z)
111 if len(eigenvectors_1[0]) % 3 != 0:
112 raise SystemExit('Something is wrong with eigenvectors since number of values is not divisor of 3')
113 atom_number = int(len(eigenvectors_1[0]) / 3)
115 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
116 # The value of the ~10th eigenvalue is usually taken.
117 if dx is not None:
118 amplifying_factor = dx
119 else:
120 amplifying_factor = eigenvalues_1[eigenvectors_number-1]
122 # Get the denominator
123 # Find new denominator
124 # for ($i=0;$i<$nvec;$i++){
125 # $cte1+=exp(-1/$val_1[$i]*$ampf);
126 # $cte2+=exp(-1/$val_2[$i]*$ampf);
127 # $part1+=exp(-2/$val_1[$i]*$ampf)*exp(-2/$val_1[$i]*$ampf);
128 # $part2+=exp(-2/$val_2[$i]*$ampf)*exp(-2/$val_2[$i]*$ampf);
129 # }
130 cte1 = part1 = cte2 = part2 = 0
131 for eigenvalue in eigenvalues_1:
132 cte1 += exp(-1 / eigenvalue * amplifying_factor)
133 part1 += exp(-2 / eigenvalue * amplifying_factor) ** 2
134 for eigenvalue in eigenvalues_2:
135 cte2 += exp(-1 / eigenvalue * amplifying_factor)
136 part2 += exp(-2 / eigenvalue * amplifying_factor) ** 2
137 denominator = part1 * cte2 * cte2 / cte1 / cte1 + part2 * cte1 * cte1 / cte2 / cte2
139 # Get all eigenvector values together
140 eigenvector_values_1 = [v for ev in eigenvectors_1 for v in ev]
141 eigenvector_values_2 = [v for ev in eigenvectors_2 for v in ev]
143 # IDK what it is doing now
144 total_summatory = 0
145 for i in range(eigenvectors_number):
146 for j in range(eigenvectors_number):
147 # Array has vectors in increasing order of vap, get last one first
148 a = (eigenvectors_number - 1 - i) * atom_number * 3
149 b = (eigenvectors_number - i) * atom_number * 3 - 1
150 c = (eigenvectors_number - 1 - j) * atom_number * 3
151 d = (eigenvectors_number - j) * atom_number * 3 - 1
152 temp1 = eigenvector_values_1[a:b]
153 temp2 = eigenvector_values_2[c:d]
154 if len(temp1) != len(temp2):
155 raise ValueError("Projection of vectors of different size!!")
156 # Project the two vectors
157 add = 0
158 for k, value_1 in enumerate(temp1):
159 value_2 = temp2[k]
160 add += value_1 * value_2
161 add = add * exp(-1 / eigenvalues_1[i] * amplifying_factor - 1 / eigenvalues_2[j] * amplifying_factor)
162 add2 = add ** 2
163 total_summatory += add2
165 similarity_index = total_summatory * 2 / denominator
166 return similarity_index
168 # Weighted Cross Product (WCP).
169 # DF implementation of Alberto's formula
170 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
171 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
172 def eigenmsip(self, eigenvalues_1, eigenvectors_1,
173 eigenvalues_2, eigenvectors_2,
174 dx=None):
176 evals1 = np.array(eigenvalues_1)
177 evals2 = np.array(eigenvalues_2)
179 evecs1 = np.array(eigenvectors_1)
180 evecs2 = np.array(eigenvectors_2)
182 n_components = len(eigenvectors_1)
184 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
185 # The value of the ~10th eigenvalue is usually taken.
186 if dx is not None:
187 amplifying_factor = dx
188 else:
189 amplifying_factor = evals1[n_components-1]
191 e1 = np.exp(-(amplifying_factor)**2/evals1)
192 e2 = np.exp(-(amplifying_factor)**2/evals2)
194 e1_2 = np.exp(-2*(amplifying_factor)**2/evals1)
195 e2_2 = np.exp(-2*(amplifying_factor)**2/evals2)
196 sume1 = np.sum(e1)
197 sume2 = np.sum(e2)
199 denominator = np.sum((e1_2/sume1**2)**2)+np.sum((e2_2/sume2**2)**2)
201 # numerator_df = np.square(np.dot(evecs1, evecs2)*np.outer(e1, e2)/(sume1*sume2))
202 # numerator_df = 2 * np.sum(numerator_df)
204 val_tmp = 0
205 accum_a = 0
206 c = sume1*sume2
207 for pc in range(0, n_components):
208 for pc2 in range(0, n_components):
209 eve1 = evecs1[pc]
210 eve2 = evecs2[pc2]
211 eva1 = evals1[pc]
212 eva2 = evals2[pc2]
213 a = np.dot(eve1, eve2)
214 b = np.exp(-(amplifying_factor)**2/eva1 - (amplifying_factor)**2/eva2)
215 val_tmp = val_tmp + ((a * b)/c)**2
216 accum_a = accum_a + a
218 numerator = 2 * val_tmp
220 return numerator/(denominator)
222 # Get the dot product matrix of two eigenvectors
223 def dot_product(self, eigenvectors_1, eigenvectors_2):
224 # Check the number of eigenvectors and the number of values in both eigenvectors to match
225 if not self.are_compatible(eigenvectors_1, eigenvectors_2):
226 raise SystemExit('Eigenvectors are not compatible')
227 # Get the dot product
228 dpm = np.dot(eigenvectors_1, np.transpose(eigenvectors_2))
229 return dpm
231 # Get the dot product matrix of two eigenvectors (squared and normalized).
232 # Absolute Similarity Index (Hess 2000, 2002)
233 def dot_product_accum(self, eigenvectors_1, eigenvectors_2):
234 n_components = len(eigenvectors_1)
235 # Get the dot product
236 dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
238 sso = (dpm * dpm).sum() / n_components
239 # sso = (dpm * dpm).sum() / n_components
240 return sso
242 # Get the subspace overlap
243 # Same as before, but with a square root
244 def get_subspace_overlap(self, eigenvectors_1, eigenvectors_2):
245 # Get the number of eigenvectors
246 n_components = len(eigenvectors_1)
247 # Get the dot product
248 dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
250 sso = np.sqrt((dpm * dpm).sum() / n_components)
251 # sso = (dpm * dpm).sum() / n_components
252 return sso
254 # Classic RMSip (Root Mean Square Inner Product), gives the same results as the previous function get_subspace_overlap
255 def get_rmsip(self, eigenvectors_1, eigenvectors_2):
257 # Get the number of eigenvectors
258 n_components = len(eigenvectors_1)
260 accum = 0
261 for pc in range(0, n_components):
262 for pc2 in range(0, n_components):
263 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
264 val = dpm * dpm
265 accum = accum + val
267 sso = np.sqrt(accum / n_components)
268 # sso = (dpm * dpm).sum() / n_components
269 return sso
271 # RWSIP, Root Weighted Square Inner Product. Same as before, but weighted using the eigen values.
272 # See Edvin Fuglebakk and others, Measuring and comparing structural fluctuation patterns in large protein datasets,
273 # Bioinformatics, Volume 28, Issue 19, October 2012, Pages 2431–2440, https://doi.org/10.1093/bioinformatics/bts445
274 def get_rwsip(self,
275 eigenvalues_1, eigenvectors_1,
276 eigenvalues_2, eigenvectors_2):
278 # Get the number of eigenvectors
279 n_components = len(eigenvectors_1)
281 accum = 0
282 norm = 0
283 for pc in range(0, n_components):
284 for pc2 in range(0, n_components):
285 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
286 val = dpm * dpm * eigenvalues_1[pc] * eigenvalues_2[pc2]
287 accum = accum + val
288 norm = norm + eigenvalues_1[pc] * eigenvalues_2[pc]
290 sso = np.sqrt(accum / norm)
291 # sso = (dpm * dpm).sum() / n_components
292 return sso
294 @launchlogger
295 def launch(self):
296 """Launches the execution of the FlexServ pcz_similarity module."""
298 # Setup Biobb
299 if self.check_restart():
300 return 0
301 # self.stage_files()
303 # Internal file paths
304 # try:
305 # # Using rel paths to shorten the amount of characters due to fortran path length limitations
306 # input_pcz_1 = str(Path(self.stage_io_dict["in"]["input_pcz_path1"]).relative_to(Path.cwd()))
307 # input_pcz_2 = str(Path(self.stage_io_dict["in"]["input_pcz_path2"]).relative_to(Path.cwd()))
308 # output_json = str(Path(self.stage_io_dict["out"]["output_json_path"]).relative_to(Path.cwd()))
309 # except ValueError:
310 # # Container or remote case
311 # output_json = self.stage_io_dict["out"]["output_json_path"]
313 # Manually creating a Sandbox to avoid issues with input parameters buffer overflow:
314 # Long strings defining a file path makes Fortran or C compiled programs crash if the string
315 # declared is shorter than the input parameter path (string) length.
316 # Generating a temporary folder and working inside this folder (sandbox) fixes this problem.
317 # The problem was found in Galaxy executions, launching Singularity containers (May 2023).
319 # Creating temporary folder
320 tmp_folder = fu.create_unique_dir()
321 fu.log('Creating %s temporary folder' % tmp_folder, self.out_log)
323 shutil.copy2(self.io_dict["in"]["input_pcz_path1"], tmp_folder)
324 shutil.copy2(self.io_dict["in"]["input_pcz_path2"], tmp_folder)
326 # Temporary output
327 temp_json = "output.json"
328 temp_out_1 = "output1.dat"
329 temp_out_2 = "output2.dat"
331 # Command line 1
332 # pczdump -i structure.ca.std.pcz --evals -o evals.txt
333 # self.cmd = [self.binary_path, # Evals pcz 1
334 # "-i", input_pcz_1,
335 # "-o", temp_out_1,
336 # "--evals", ';',
337 # self.binary_path, # Evals pcz 2
338 # "-i", input_pcz_2,
339 # "-o", temp_out_2,
340 # "--evals"
341 # ]
343 self.cmd = ['cd', tmp_folder, ';',
344 self.binary_path, # Evals pcz 1
345 "-i", PurePath(self.io_dict["in"]["input_pcz_path1"]).name,
346 "-o", temp_out_1,
347 "--evals", ';',
348 self.binary_path, # Evals pcz 2
349 "-i", PurePath(self.io_dict["in"]["input_pcz_path2"]).name,
350 "-o", temp_out_2,
351 "--evals"
352 ]
354 # Run Biobb block 1
355 self.run_biobb()
357 # Parse output evals
358 info_dict = {}
359 info_dict['evals_1'] = []
360 info_dict['evals_2'] = []
362 with open(PurePath(tmp_folder).joinpath(temp_out_1), 'r') as file:
363 for line in file:
364 info = float(line.strip())
365 info_dict['evals_1'].append(info)
367 with open(PurePath(tmp_folder).joinpath(temp_out_2), 'r') as file:
368 for line in file:
369 info = float(line.strip())
370 info_dict['evals_2'].append(info)
372 num_evals_1 = len(info_dict['evals_1'])
373 num_evals_2 = len(info_dict['evals_2'])
374 num_evals_min = min(num_evals_1, num_evals_2)
375 info_dict['num_evals_min'] = num_evals_min
377 # Command line 2
378 # pczdump -i structure.ca.std.pcz --evals -o evals.txt
379 self.cmd = []
380 self.cmd.append('cd')
381 self.cmd.append(tmp_folder)
382 self.cmd.append(';')
383 for pc in (range(1, num_evals_min+1)):
384 # Evecs pcz 1
385 self.cmd.append(self.binary_path)
386 self.cmd.append("-i")
387 # self.cmd.append(input_pcz_1)
388 self.cmd.append(PurePath(self.io_dict["in"]["input_pcz_path1"]).name)
389 # self.cmd.append("-o evecs_1_pc{}".format(pc))
390 self.cmd.append("-o")
391 # self.cmd.append(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_1_pc{}".format(pc))))
392 self.cmd.append("evecs_1_pc{}".format(pc))
393 self.cmd.append("--evec={}".format(pc))
394 self.cmd.append(";")
395 # Evals pcz 2
396 self.cmd.append(self.binary_path)
397 self.cmd.append("-i")
398 # self.cmd.append(input_pcz_2)
399 self.cmd.append(PurePath(self.io_dict["in"]["input_pcz_path2"]).name)
400 # self.cmd.append("-o evecs_2_pc{}".format(pc))
401 self.cmd.append("-o")
402 # self.cmd.append(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_2_pc{}".format(pc))))
403 self.cmd.append("evecs_2_pc{}".format(pc))
404 self.cmd.append("--evec={}".format(pc))
405 self.cmd.append(";")
407 # Run Biobb block 2
408 self.run_biobb()
410 # Parse output evecs
411 info_dict['evecs_1'] = {}
412 info_dict['evecs_2'] = {}
413 eigenvectors_1 = []
414 eigenvectors_2 = []
415 for pc in (range(1, num_evals_min+1)):
416 pc_id = "pc{}".format(pc)
417 info_dict['evecs_1'][pc_id] = []
418 info_dict['evecs_2'][pc_id] = []
419 # with open(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_1_pc{}".format(pc))), 'r') as file:
420 with open(PurePath(tmp_folder).joinpath("evecs_1_pc{}".format(pc)), 'r') as file:
421 list_evecs = []
422 for line in file:
423 info = line.strip().split(' ')
424 for nums in info:
425 if nums:
426 list_evecs.append(float(nums))
427 info_dict['evecs_1'][pc_id] = list_evecs
428 eigenvectors_1.append(list_evecs)
430 # with open(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_2_pc{}".format(pc))), 'r') as file:
431 with open(PurePath(tmp_folder).joinpath("evecs_2_pc{}".format(pc)), 'r') as file:
432 list_evecs = []
433 for line in file:
434 info = line.strip().split(' ')
435 for nums in info:
436 if nums:
437 list_evecs.append(float(nums))
438 info_dict['evecs_2'][pc_id] = list_evecs
439 eigenvectors_2.append(list_evecs)
441 # simIndex = self.get_similarity_index(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
442 # info_dict['similarityIndex_WCP2'] = float("{:.3f}".format(simIndex))
443 # dotProduct = self.get_subspace_overlap(eigenvectors_1, eigenvectors_2)
444 # info_dict['similarityIndex_rmsip2'] = float("{:.3f}".format(dotProduct))
445 eigenmsip = self.eigenmsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
446 info_dict['similarityIndex_WCP'] = float("{:.3f}".format(eigenmsip))
447 rmsip = self.get_rmsip(eigenvectors_1, eigenvectors_2)
448 info_dict['similarityIndex_rmsip'] = float("{:.3f}".format(rmsip))
449 rwsip = self.get_rwsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2)
450 info_dict['similarityIndex_rwsip'] = float("{:.3f}".format(rwsip))
451 dotp = self.dot_product_accum(eigenvectors_1, eigenvectors_2)
452 info_dict['similarityIndex_dotp'] = float("{:.3f}".format(dotp))
454 with open(PurePath(tmp_folder).joinpath(temp_json), 'w') as out_file:
455 out_file.write(json.dumps(info_dict, indent=4))
457 # Copy outputs from temporary folder to output path
458 shutil.copy2(PurePath(tmp_folder).joinpath(temp_json), PurePath(self.io_dict["out"]["output_json_path"]))
460 # Remove temporary folder(s)
461 self.tmp_files.append(tmp_folder)
462 self.remove_tmp_files()
464 self.check_arguments(output_files_created=True, raise_exception=False)
466 return self.return_code
469def pcz_similarity(input_pcz_path1: str, input_pcz_path2: str, output_json_path: str,
470 properties: Optional[dict] = None, **kwargs) -> int:
471 """Create :class:`PCZsimilarity <flexserv.pcasuite.pcz_similarity>`flexserv.pcasuite.PCZsimilarity class and
472 execute :meth:`launch() <flexserv.pcasuite.pcz_similarity.launch>` method"""
473 return PCZsimilarity(**dict(locals())).launch()
476pcz_similarity.__doc__ = PCZsimilarity.__doc__
477main = PCZsimilarity.get_main(pcz_similarity, "Compute PCA Similarity from a given pair of compressed PCZ files.")
479if __name__ == '__main__':
480 main()