Coverage for biobb_flexserv/pcasuite/pcz_similarity.py: 74%
243 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 15:08 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-19 15:08 +0000
1#!/usr/bin/env python3
3"""Module containing the PCZsimilarity class and the command line interface."""
4import argparse
5from typing import Optional
6import json
7import numpy as np
8import shutil
9from pathlib import PurePath
10from biobb_common.tools import file_utils as fu
11from math import exp
12from biobb_common.generic.biobb_object import BiobbObject
13from biobb_common.configuration import settings
14from biobb_common.tools.file_utils import launchlogger
17class PCZsimilarity(BiobbObject):
18 """
19 | biobb_flexserv PCZsimilarity
20 | Compute PCA similarity between two given compressed PCZ files.
21 | Wrapper of the pczdump tool from the PCAsuite FlexServ module.
23 Args:
24 input_pcz_path1 (str): Input compressed trajectory file 1. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
25 input_pcz_path2 (str): Input compressed trajectory file 2. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874).
26 output_json_path (str): Output json file with PCA Similarity results. File type: output. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/reference/pcasuite/pcz_similarity.json>`_. Accepted formats: json (edam:format_3464).
27 properties (dict - Python dictionary object containing the tool parameters, not input/output files):
28 * **amplifying_factor** (*float*) - ("0.0") common displacement (dx) along the different eigenvectors. If 0, the result is the absolute similarity index (dot product).
29 * **binary_path** (*str*) - ("pczdump") pczdump binary path to be used.
30 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
31 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
32 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
34 Examples:
35 This is a use example of how to use the building block from Python::
37 from biobb_flexserv.pcasuite.pcz_similarity import pcz_similarity
39 pcz_similarity( input_pcz_path1='/path/to/pcazip_input1.pcz',
40 input_pcz_path2='/path/to/pcazip_input2.pcz',
41 output_json_path='/path/to/pcz_similarity.json',
42 properties=prop)
44 Info:
45 * wrapped_software:
46 * name: FlexServ PCAsuite
47 * version: >=1.0
48 * license: Apache-2.0
49 * ontology:
50 * name: EDAM
51 * schema: http://edamontology.org/EDAM.owl
53 """
55 def __init__(self, input_pcz_path1: str, input_pcz_path2: str,
56 output_json_path: str, properties: Optional[dict] = None, **kwargs) -> None:
58 properties = properties or {}
60 # Call parent class constructor
61 super().__init__(properties)
62 self.locals_var_dict = locals().copy()
64 # Input/Output files
65 self.io_dict = {
66 'in': {'input_pcz_path1': input_pcz_path1,
67 'input_pcz_path2': input_pcz_path2},
68 'out': {'output_json_path': output_json_path}
69 }
71 # Properties specific for BB
72 self.properties = properties
73 self.amplifying_factor = properties.get('amplifying_factor')
74 self.binary_path = properties.get('binary_path', 'pczdump')
76 # Check the properties
77 self.check_properties(properties)
78 self.check_arguments()
80 # Check two eigenvectors to be compatible for dot product
81 # i.e. same number of vectors and values per vector
82 def are_compatible(self, eigenvectors_1, eigenvectors_2):
83 # Check the number of eigenvectors and the number of values in both eigenvectors to match
84 if len(eigenvectors_1) != len(eigenvectors_2):
85 print('WARNING: Number of eigenvectors does not match')
86 return False
87 if len(eigenvectors_1[0]) != len(eigenvectors_2[0]):
88 print('WARNING: Number of values in eigenvectors does not match')
89 return False
90 return True
92 # Weighted Cross Product (WCP).
93 # Get the weighted cross product between eigenvectors
94 # This is meant to compare PCA results for molecular dynamics structural conformations
95 # The number of eigenvectors to be compared may be specified. All (0) by default
96 # DISCLAIMER: This code has been translated from a perl script signed by Alberto Perez (13/09/04)
97 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
98 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
99 def get_similarity_index(self,
100 eigenvalues_1, eigenvectors_1,
101 eigenvalues_2, eigenvectors_2, dx=None):
103 # Check the number of eigenvectors and the number of values in both eigenvectors to match
104 if not self.are_compatible(eigenvectors_1, eigenvectors_2):
105 raise SystemExit('Eigenvectors are not compatible')
107 # Find out the total number of eigenvectors
108 # Set the number of eigenvectors to be analyzed in case it is not set or it exceeds the total
109 eigenvectors_number = min(len(eigenvectors_1), len(eigenvectors_2))
111 # Find out the number of atoms in the structure
112 # Eigenvectors are atom coordinates and each atom has 3 coordinates (x, y, z)
113 if len(eigenvectors_1[0]) % 3 != 0:
114 raise SystemExit('Something is wrong with eigenvectors since number of values is not divisor of 3')
115 atom_number = int(len(eigenvectors_1[0]) / 3)
117 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
118 # The value of the ~10th eigenvalue is usually taken.
119 if dx is not None:
120 amplifying_factor = dx
121 else:
122 amplifying_factor = eigenvalues_1[eigenvectors_number-1]
124 # Get the denominator
125 # Find new denominator
126 # for ($i=0;$i<$nvec;$i++){
127 # $cte1+=exp(-1/$val_1[$i]*$ampf);
128 # $cte2+=exp(-1/$val_2[$i]*$ampf);
129 # $part1+=exp(-2/$val_1[$i]*$ampf)*exp(-2/$val_1[$i]*$ampf);
130 # $part2+=exp(-2/$val_2[$i]*$ampf)*exp(-2/$val_2[$i]*$ampf);
131 # }
132 cte1 = part1 = cte2 = part2 = 0
133 for eigenvalue in eigenvalues_1:
134 cte1 += exp(-1 / eigenvalue * amplifying_factor)
135 part1 += exp(-2 / eigenvalue * amplifying_factor) ** 2
136 for eigenvalue in eigenvalues_2:
137 cte2 += exp(-1 / eigenvalue * amplifying_factor)
138 part2 += exp(-2 / eigenvalue * amplifying_factor) ** 2
139 denominator = part1 * cte2 * cte2 / cte1 / cte1 + part2 * cte1 * cte1 / cte2 / cte2
141 # Get all eigenvector values together
142 eigenvector_values_1 = [v for ev in eigenvectors_1 for v in ev]
143 eigenvector_values_2 = [v for ev in eigenvectors_2 for v in ev]
145 # IDK what it is doing now
146 total_summatory = 0
147 for i in range(eigenvectors_number):
148 for j in range(eigenvectors_number):
149 # Array has vectors in increasing order of vap, get last one first
150 a = (eigenvectors_number - 1 - i) * atom_number * 3
151 b = (eigenvectors_number - i) * atom_number * 3 - 1
152 c = (eigenvectors_number - 1 - j) * atom_number * 3
153 d = (eigenvectors_number - j) * atom_number * 3 - 1
154 temp1 = eigenvector_values_1[a:b]
155 temp2 = eigenvector_values_2[c:d]
156 if len(temp1) != len(temp2):
157 raise ValueError("Projection of vectors of different size!!")
158 # Project the two vectors
159 add = 0
160 for k, value_1 in enumerate(temp1):
161 value_2 = temp2[k]
162 add += value_1 * value_2
163 add = add * exp(-1 / eigenvalues_1[i] * amplifying_factor - 1 / eigenvalues_2[j] * amplifying_factor)
164 add2 = add ** 2
165 total_summatory += add2
167 similarity_index = total_summatory * 2 / denominator
168 return similarity_index
170 # Weighted Cross Product (WCP).
171 # DF implementation of Alberto's formula
172 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda,
173 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1.
174 def eigenmsip(self, eigenvalues_1, eigenvectors_1,
175 eigenvalues_2, eigenvectors_2,
176 dx=None):
178 evals1 = np.array(eigenvalues_1)
179 evals2 = np.array(eigenvalues_2)
181 evecs1 = np.array(eigenvectors_1)
182 evecs2 = np.array(eigenvectors_2)
184 n_components = len(eigenvectors_1)
186 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product.
187 # The value of the ~10th eigenvalue is usually taken.
188 if dx is not None:
189 amplifying_factor = dx
190 else:
191 amplifying_factor = evals1[n_components-1]
193 e1 = np.exp(-(amplifying_factor)**2/evals1)
194 e2 = np.exp(-(amplifying_factor)**2/evals2)
196 e1_2 = np.exp(-2*(amplifying_factor)**2/evals1)
197 e2_2 = np.exp(-2*(amplifying_factor)**2/evals2)
198 sume1 = np.sum(e1)
199 sume2 = np.sum(e2)
201 denominator = np.sum((e1_2/sume1**2)**2)+np.sum((e2_2/sume2**2)**2)
203 # numerator_df = np.square(np.dot(evecs1, evecs2)*np.outer(e1, e2)/(sume1*sume2))
204 # numerator_df = 2 * np.sum(numerator_df)
206 val_tmp = 0
207 accum_a = 0
208 c = sume1*sume2
209 for pc in range(0, n_components):
210 for pc2 in range(0, n_components):
211 eve1 = evecs1[pc]
212 eve2 = evecs2[pc2]
213 eva1 = evals1[pc]
214 eva2 = evals2[pc2]
215 a = np.dot(eve1, eve2)
216 b = np.exp(-(amplifying_factor)**2/eva1 - (amplifying_factor)**2/eva2)
217 val_tmp = val_tmp + ((a * b)/c)**2
218 accum_a = accum_a + a
220 numerator = 2 * val_tmp
222 return numerator/(denominator)
224 # Get the dot product matrix of two eigenvectors
225 def dot_product(self, eigenvectors_1, eigenvectors_2):
226 # Check the number of eigenvectors and the number of values in both eigenvectors to match
227 if not self.are_compatible(eigenvectors_1, eigenvectors_2):
228 raise SystemExit('Eigenvectors are not compatible')
229 # Get the dot product
230 dpm = np.dot(eigenvectors_1, np.transpose(eigenvectors_2))
231 return dpm
233 # Get the dot product matrix of two eigenvectors (squared and normalized).
234 # Absolute Similarity Index (Hess 2000, 2002)
235 def dot_product_accum(self, eigenvectors_1, eigenvectors_2):
236 n_components = len(eigenvectors_1)
237 # Get the dot product
238 dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
240 sso = (dpm * dpm).sum() / n_components
241 # sso = (dpm * dpm).sum() / n_components
242 return sso
244 # Get the subspace overlap
245 # Same as before, but with a square root
246 def get_subspace_overlap(self, eigenvectors_1, eigenvectors_2):
247 # Get the number of eigenvectors
248 n_components = len(eigenvectors_1)
249 # Get the dot product
250 dpm = self.dot_product(eigenvectors_1, eigenvectors_2)
252 sso = np.sqrt((dpm * dpm).sum() / n_components)
253 # sso = (dpm * dpm).sum() / n_components
254 return sso
256 # Classic RMSip (Root Mean Square Inner Product), gives the same results as the previous function get_subspace_overlap
257 def get_rmsip(self, eigenvectors_1, eigenvectors_2):
259 # Get the number of eigenvectors
260 n_components = len(eigenvectors_1)
262 accum = 0
263 for pc in range(0, n_components):
264 for pc2 in range(0, n_components):
265 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
266 val = dpm * dpm
267 accum = accum + val
269 sso = np.sqrt(accum / n_components)
270 # sso = (dpm * dpm).sum() / n_components
271 return sso
273 # RWSIP, Root Weighted Square Inner Product. Same as before, but weighted using the eigen values.
274 # See Edvin Fuglebakk and others, Measuring and comparing structural fluctuation patterns in large protein datasets,
275 # Bioinformatics, Volume 28, Issue 19, October 2012, Pages 2431–2440, https://doi.org/10.1093/bioinformatics/bts445
276 def get_rwsip(self,
277 eigenvalues_1, eigenvectors_1,
278 eigenvalues_2, eigenvectors_2):
280 # Get the number of eigenvectors
281 n_components = len(eigenvectors_1)
283 accum = 0
284 norm = 0
285 for pc in range(0, n_components):
286 for pc2 in range(0, n_components):
287 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2])
288 val = dpm * dpm * eigenvalues_1[pc] * eigenvalues_2[pc2]
289 accum = accum + val
290 norm = norm + eigenvalues_1[pc] * eigenvalues_2[pc]
292 sso = np.sqrt(accum / norm)
293 # sso = (dpm * dpm).sum() / n_components
294 return sso
296 @launchlogger
297 def launch(self):
298 """Launches the execution of the FlexServ pcz_similarity module."""
300 # Setup Biobb
301 if self.check_restart():
302 return 0
303 # self.stage_files()
305 # Internal file paths
306 # try:
307 # # Using rel paths to shorten the amount of characters due to fortran path length limitations
308 # input_pcz_1 = str(Path(self.stage_io_dict["in"]["input_pcz_path1"]).relative_to(Path.cwd()))
309 # input_pcz_2 = str(Path(self.stage_io_dict["in"]["input_pcz_path2"]).relative_to(Path.cwd()))
310 # output_json = str(Path(self.stage_io_dict["out"]["output_json_path"]).relative_to(Path.cwd()))
311 # except ValueError:
312 # # Container or remote case
313 # output_json = self.stage_io_dict["out"]["output_json_path"]
315 # Manually creating a Sandbox to avoid issues with input parameters buffer overflow:
316 # Long strings defining a file path makes Fortran or C compiled programs crash if the string
317 # declared is shorter than the input parameter path (string) length.
318 # Generating a temporary folder and working inside this folder (sandbox) fixes this problem.
319 # The problem was found in Galaxy executions, launching Singularity containers (May 2023).
321 # Creating temporary folder
322 self.tmp_folder = fu.create_unique_dir()
323 fu.log('Creating %s temporary folder' % self.tmp_folder, self.out_log)
325 shutil.copy2(self.io_dict["in"]["input_pcz_path1"], self.tmp_folder)
326 shutil.copy2(self.io_dict["in"]["input_pcz_path2"], self.tmp_folder)
328 # Temporary output
329 temp_json = "output.json"
330 temp_out_1 = "output1.dat"
331 temp_out_2 = "output2.dat"
333 # Command line 1
334 # pczdump -i structure.ca.std.pcz --evals -o evals.txt
335 # self.cmd = [self.binary_path, # Evals pcz 1
336 # "-i", input_pcz_1,
337 # "-o", temp_out_1,
338 # "--evals", ';',
339 # self.binary_path, # Evals pcz 2
340 # "-i", input_pcz_2,
341 # "-o", temp_out_2,
342 # "--evals"
343 # ]
345 self.cmd = ['cd', self.tmp_folder, ';',
346 self.binary_path, # Evals pcz 1
347 "-i", PurePath(self.io_dict["in"]["input_pcz_path1"]).name,
348 "-o", temp_out_1,
349 "--evals", ';',
350 self.binary_path, # Evals pcz 2
351 "-i", PurePath(self.io_dict["in"]["input_pcz_path2"]).name,
352 "-o", temp_out_2,
353 "--evals"
354 ]
356 # Run Biobb block 1
357 self.run_biobb()
359 # Parse output evals
360 info_dict = {}
361 info_dict['evals_1'] = []
362 info_dict['evals_2'] = []
364 with open(PurePath(self.tmp_folder).joinpath(temp_out_1), 'r') as file:
365 for line in file:
366 info = float(line.strip())
367 info_dict['evals_1'].append(info)
369 with open(PurePath(self.tmp_folder).joinpath(temp_out_2), 'r') as file:
370 for line in file:
371 info = float(line.strip())
372 info_dict['evals_2'].append(info)
374 num_evals_1 = len(info_dict['evals_1'])
375 num_evals_2 = len(info_dict['evals_2'])
376 num_evals_min = min(num_evals_1, num_evals_2)
377 info_dict['num_evals_min'] = num_evals_min
379 # Command line 2
380 # pczdump -i structure.ca.std.pcz --evals -o evals.txt
381 self.cmd = []
382 self.cmd.append('cd')
383 self.cmd.append(self.tmp_folder)
384 self.cmd.append(';')
385 for pc in (range(1, num_evals_min+1)):
386 # Evecs pcz 1
387 self.cmd.append(self.binary_path)
388 self.cmd.append("-i")
389 # self.cmd.append(input_pcz_1)
390 self.cmd.append(PurePath(self.io_dict["in"]["input_pcz_path1"]).name)
391 # self.cmd.append("-o evecs_1_pc{}".format(pc))
392 self.cmd.append("-o")
393 # self.cmd.append(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_1_pc{}".format(pc))))
394 self.cmd.append("evecs_1_pc{}".format(pc))
395 self.cmd.append("--evec={}".format(pc))
396 self.cmd.append(";")
397 # Evals pcz 2
398 self.cmd.append(self.binary_path)
399 self.cmd.append("-i")
400 # self.cmd.append(input_pcz_2)
401 self.cmd.append(PurePath(self.io_dict["in"]["input_pcz_path2"]).name)
402 # self.cmd.append("-o evecs_2_pc{}".format(pc))
403 self.cmd.append("-o")
404 # self.cmd.append(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_2_pc{}".format(pc))))
405 self.cmd.append("evecs_2_pc{}".format(pc))
406 self.cmd.append("--evec={}".format(pc))
407 self.cmd.append(";")
409 # Run Biobb block 2
410 self.run_biobb()
412 # Parse output evecs
413 info_dict['evecs_1'] = {}
414 info_dict['evecs_2'] = {}
415 eigenvectors_1 = []
416 eigenvectors_2 = []
417 for pc in (range(1, num_evals_min+1)):
418 pc_id = "pc{}".format(pc)
419 info_dict['evecs_1'][pc_id] = []
420 info_dict['evecs_2'][pc_id] = []
421 # with open(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_1_pc{}".format(pc))), 'r') as file:
422 with open(PurePath(self.tmp_folder).joinpath("evecs_1_pc{}".format(pc)), 'r') as file:
423 list_evecs = []
424 for line in file:
425 info = line.strip().split(' ')
426 for nums in info:
427 if nums:
428 list_evecs.append(float(nums))
429 info_dict['evecs_1'][pc_id] = list_evecs
430 eigenvectors_1.append(list_evecs)
432 # with open(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_2_pc{}".format(pc))), 'r') as file:
433 with open(PurePath(self.tmp_folder).joinpath("evecs_2_pc{}".format(pc)), 'r') as file:
434 list_evecs = []
435 for line in file:
436 info = line.strip().split(' ')
437 for nums in info:
438 if nums:
439 list_evecs.append(float(nums))
440 info_dict['evecs_2'][pc_id] = list_evecs
441 eigenvectors_2.append(list_evecs)
443 # simIndex = self.get_similarity_index(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
444 # info_dict['similarityIndex_WCP2'] = float("{:.3f}".format(simIndex))
445 # dotProduct = self.get_subspace_overlap(eigenvectors_1, eigenvectors_2)
446 # info_dict['similarityIndex_rmsip2'] = float("{:.3f}".format(dotProduct))
447 eigenmsip = self.eigenmsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor)
448 info_dict['similarityIndex_WCP'] = float("{:.3f}".format(eigenmsip))
449 rmsip = self.get_rmsip(eigenvectors_1, eigenvectors_2)
450 info_dict['similarityIndex_rmsip'] = float("{:.3f}".format(rmsip))
451 rwsip = self.get_rwsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2)
452 info_dict['similarityIndex_rwsip'] = float("{:.3f}".format(rwsip))
453 dotp = self.dot_product_accum(eigenvectors_1, eigenvectors_2)
454 info_dict['similarityIndex_dotp'] = float("{:.3f}".format(dotp))
456 with open(PurePath(self.tmp_folder).joinpath(temp_json), 'w') as out_file:
457 out_file.write(json.dumps(info_dict, indent=4))
459 # Copy outputs from temporary folder to output path
460 shutil.copy2(PurePath(self.tmp_folder).joinpath(temp_json), PurePath(self.io_dict["out"]["output_json_path"]))
462 # remove temporary folder(s)
463 self.tmp_files.extend([
464 self.tmp_folder
465 ])
466 self.remove_tmp_files()
468 self.check_arguments(output_files_created=True, raise_exception=False)
470 return self.return_code
473def pcz_similarity(input_pcz_path1: str, input_pcz_path2: str, output_json_path: str,
474 properties: Optional[dict] = None, **kwargs) -> int:
475 """Create :class:`PCZsimilarity <flexserv.pcasuite.pcz_similarity>`flexserv.pcasuite.PCZsimilarity class and
476 execute :meth:`launch() <flexserv.pcasuite.pcz_similarity.launch>` method"""
478 return PCZsimilarity(input_pcz_path1=input_pcz_path1,
479 input_pcz_path2=input_pcz_path2,
480 output_json_path=output_json_path,
481 properties=properties).launch()
483 pcz_similarity.__doc__ = PCZsimilarity.__doc__
486def main():
487 parser = argparse.ArgumentParser(description='Compute PCA Similarity from a given pair of compressed PCZ files.', formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
488 parser.add_argument('--config', required=False, help='Configuration file')
490 # Specific args
491 required_args = parser.add_argument_group('required arguments')
492 required_args.add_argument('--input_pcz_path1', required=True, help='Input compressed trajectory file 1. Accepted formats: pcz.')
493 required_args.add_argument('--input_pcz_path2', required=True, help='Input compressed trajectory file 2. Accepted formats: pcz.')
494 required_args.add_argument('--output_json_path', required=True, help='Output json file with PCA similarity. Accepted formats: json.')
496 args = parser.parse_args()
497 args.config = args.config or "{}"
498 properties = settings.ConfReader(config=args.config).get_prop_dic()
500 # Specific call
501 pcz_similarity(input_pcz_path1=args.input_pcz_path1,
502 input_pcz_path2=args.input_pcz_path2,
503 output_json_path=args.output_json_path,
504 properties=properties)
507if __name__ == '__main__':
508 main()