Coverage for biobb_flexserv / pcasuite / pcz_similarity.py: 78%

231 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-05 13:10 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the PCZsimilarity class and the command line interface.""" 

4from typing import Optional 

5import json 

6import numpy as np 

7import shutil 

8from pathlib import PurePath 

9from biobb_common.tools import file_utils as fu 

10from math import exp 

11from biobb_common.generic.biobb_object import BiobbObject 

12from biobb_common.tools.file_utils import launchlogger 

13 

14 

15class PCZsimilarity(BiobbObject): 

16 """ 

17 | biobb_flexserv PCZsimilarity 

18 | Compute PCA similarity between two given compressed PCZ files. 

19 | Wrapper of the pczdump tool from the PCAsuite FlexServ module. 

20 

21 Args: 

22 input_pcz_path1 (str): Input compressed trajectory file 1. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874). 

23 input_pcz_path2 (str): Input compressed trajectory file 2. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874). 

24 output_json_path (str): Output json file with PCA Similarity results. File type: output. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/reference/pcasuite/pcz_similarity.json>`_. Accepted formats: json (edam:format_3464). 

25 properties (dict - Python dictionary object containing the tool parameters, not input/output files): 

26 * **amplifying_factor** (*float*) - ("0.0") common displacement (dx) along the different eigenvectors. If 0, the result is the absolute similarity index (dot product). 

27 * **binary_path** (*str*) - ("pczdump") pczdump binary path to be used. 

28 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

29 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

30 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

31 

32 Examples: 

33 This is a use example of how to use the building block from Python:: 

34 

35 from biobb_flexserv.pcasuite.pcz_similarity import pcz_similarity 

36 

37 pcz_similarity( input_pcz_path1='/path/to/pcazip_input1.pcz', 

38 input_pcz_path2='/path/to/pcazip_input2.pcz', 

39 output_json_path='/path/to/pcz_similarity.json', 

40 properties=prop) 

41 

42 Info: 

43 * wrapped_software: 

44 * name: FlexServ PCAsuite 

45 * version: >=1.0 

46 * license: Apache-2.0 

47 * ontology: 

48 * name: EDAM 

49 * schema: http://edamontology.org/EDAM.owl 

50 

51 """ 

52 

53 def __init__(self, input_pcz_path1: str, input_pcz_path2: str, 

54 output_json_path: str, properties: Optional[dict] = None, **kwargs) -> None: 

55 

56 properties = properties or {} 

57 

58 # Call parent class constructor 

59 super().__init__(properties) 

60 self.locals_var_dict = locals().copy() 

61 

62 # Input/Output files 

63 self.io_dict = { 

64 'in': {'input_pcz_path1': input_pcz_path1, 

65 'input_pcz_path2': input_pcz_path2}, 

66 'out': {'output_json_path': output_json_path} 

67 } 

68 

69 # Properties specific for BB 

70 self.properties = properties 

71 self.amplifying_factor = properties.get('amplifying_factor') 

72 self.binary_path = properties.get('binary_path', 'pczdump') 

73 

74 # Check the properties 

75 self.check_properties(properties) 

76 self.check_arguments() 

77 

78 # Check two eigenvectors to be compatible for dot product 

79 # i.e. same number of vectors and values per vector 

80 def are_compatible(self, eigenvectors_1, eigenvectors_2): 

81 # Check the number of eigenvectors and the number of values in both eigenvectors to match 

82 if len(eigenvectors_1) != len(eigenvectors_2): 

83 print('WARNING: Number of eigenvectors does not match') 

84 return False 

85 if len(eigenvectors_1[0]) != len(eigenvectors_2[0]): 

86 print('WARNING: Number of values in eigenvectors does not match') 

87 return False 

88 return True 

89 

90 # Weighted Cross Product (WCP). 

91 # Get the weighted cross product between eigenvectors 

92 # This is meant to compare PCA results for molecular dynamics structural conformations 

93 # The number of eigenvectors to be compared may be specified. All (0) by default 

94 # DISCLAIMER: This code has been translated from a perl script signed by Alberto Perez (13/09/04) 

95 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda, 

96 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1. 

97 def get_similarity_index(self, 

98 eigenvalues_1, eigenvectors_1, 

99 eigenvalues_2, eigenvectors_2, dx=None): 

100 

101 # Check the number of eigenvectors and the number of values in both eigenvectors to match 

102 if not self.are_compatible(eigenvectors_1, eigenvectors_2): 

103 raise SystemExit('Eigenvectors are not compatible') 

104 

105 # Find out the total number of eigenvectors 

106 # Set the number of eigenvectors to be analyzed in case it is not set or it exceeds the total 

107 eigenvectors_number = min(len(eigenvectors_1), len(eigenvectors_2)) 

108 

109 # Find out the number of atoms in the structure 

110 # Eigenvectors are atom coordinates and each atom has 3 coordinates (x, y, z) 

111 if len(eigenvectors_1[0]) % 3 != 0: 

112 raise SystemExit('Something is wrong with eigenvectors since number of values is not divisor of 3') 

113 atom_number = int(len(eigenvectors_1[0]) / 3) 

114 

115 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product. 

116 # The value of the ~10th eigenvalue is usually taken. 

117 if dx is not None: 

118 amplifying_factor = dx 

119 else: 

120 amplifying_factor = eigenvalues_1[eigenvectors_number-1] 

121 

122 # Get the denominator 

123 # Find new denominator 

124 # for ($i=0;$i<$nvec;$i++){ 

125 # $cte1+=exp(-1/$val_1[$i]*$ampf); 

126 # $cte2+=exp(-1/$val_2[$i]*$ampf); 

127 # $part1+=exp(-2/$val_1[$i]*$ampf)*exp(-2/$val_1[$i]*$ampf); 

128 # $part2+=exp(-2/$val_2[$i]*$ampf)*exp(-2/$val_2[$i]*$ampf); 

129 # } 

130 cte1 = part1 = cte2 = part2 = 0 

131 for eigenvalue in eigenvalues_1: 

132 cte1 += exp(-1 / eigenvalue * amplifying_factor) 

133 part1 += exp(-2 / eigenvalue * amplifying_factor) ** 2 

134 for eigenvalue in eigenvalues_2: 

135 cte2 += exp(-1 / eigenvalue * amplifying_factor) 

136 part2 += exp(-2 / eigenvalue * amplifying_factor) ** 2 

137 denominator = part1 * cte2 * cte2 / cte1 / cte1 + part2 * cte1 * cte1 / cte2 / cte2 

138 

139 # Get all eigenvector values together 

140 eigenvector_values_1 = [v for ev in eigenvectors_1 for v in ev] 

141 eigenvector_values_2 = [v for ev in eigenvectors_2 for v in ev] 

142 

143 # IDK what it is doing now 

144 total_summatory = 0 

145 for i in range(eigenvectors_number): 

146 for j in range(eigenvectors_number): 

147 # Array has vectors in increasing order of vap, get last one first 

148 a = (eigenvectors_number - 1 - i) * atom_number * 3 

149 b = (eigenvectors_number - i) * atom_number * 3 - 1 

150 c = (eigenvectors_number - 1 - j) * atom_number * 3 

151 d = (eigenvectors_number - j) * atom_number * 3 - 1 

152 temp1 = eigenvector_values_1[a:b] 

153 temp2 = eigenvector_values_2[c:d] 

154 if len(temp1) != len(temp2): 

155 raise ValueError("Projection of vectors of different size!!") 

156 # Project the two vectors 

157 add = 0 

158 for k, value_1 in enumerate(temp1): 

159 value_2 = temp2[k] 

160 add += value_1 * value_2 

161 add = add * exp(-1 / eigenvalues_1[i] * amplifying_factor - 1 / eigenvalues_2[j] * amplifying_factor) 

162 add2 = add ** 2 

163 total_summatory += add2 

164 

165 similarity_index = total_summatory * 2 / denominator 

166 return similarity_index 

167 

168 # Weighted Cross Product (WCP). 

169 # DF implementation of Alberto's formula 

170 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda, 

171 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1. 

172 def eigenmsip(self, eigenvalues_1, eigenvectors_1, 

173 eigenvalues_2, eigenvectors_2, 

174 dx=None): 

175 

176 evals1 = np.array(eigenvalues_1) 

177 evals2 = np.array(eigenvalues_2) 

178 

179 evecs1 = np.array(eigenvectors_1) 

180 evecs2 = np.array(eigenvectors_2) 

181 

182 n_components = len(eigenvectors_1) 

183 

184 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product. 

185 # The value of the ~10th eigenvalue is usually taken. 

186 if dx is not None: 

187 amplifying_factor = dx 

188 else: 

189 amplifying_factor = evals1[n_components-1] 

190 

191 e1 = np.exp(-(amplifying_factor)**2/evals1) 

192 e2 = np.exp(-(amplifying_factor)**2/evals2) 

193 

194 e1_2 = np.exp(-2*(amplifying_factor)**2/evals1) 

195 e2_2 = np.exp(-2*(amplifying_factor)**2/evals2) 

196 sume1 = np.sum(e1) 

197 sume2 = np.sum(e2) 

198 

199 denominator = np.sum((e1_2/sume1**2)**2)+np.sum((e2_2/sume2**2)**2) 

200 

201 # numerator_df = np.square(np.dot(evecs1, evecs2)*np.outer(e1, e2)/(sume1*sume2)) 

202 # numerator_df = 2 * np.sum(numerator_df) 

203 

204 val_tmp = 0 

205 accum_a = 0 

206 c = sume1*sume2 

207 for pc in range(0, n_components): 

208 for pc2 in range(0, n_components): 

209 eve1 = evecs1[pc] 

210 eve2 = evecs2[pc2] 

211 eva1 = evals1[pc] 

212 eva2 = evals2[pc2] 

213 a = np.dot(eve1, eve2) 

214 b = np.exp(-(amplifying_factor)**2/eva1 - (amplifying_factor)**2/eva2) 

215 val_tmp = val_tmp + ((a * b)/c)**2 

216 accum_a = accum_a + a 

217 

218 numerator = 2 * val_tmp 

219 

220 return numerator/(denominator) 

221 

222 # Get the dot product matrix of two eigenvectors 

223 def dot_product(self, eigenvectors_1, eigenvectors_2): 

224 # Check the number of eigenvectors and the number of values in both eigenvectors to match 

225 if not self.are_compatible(eigenvectors_1, eigenvectors_2): 

226 raise SystemExit('Eigenvectors are not compatible') 

227 # Get the dot product 

228 dpm = np.dot(eigenvectors_1, np.transpose(eigenvectors_2)) 

229 return dpm 

230 

231 # Get the dot product matrix of two eigenvectors (squared and normalized). 

232 # Absolute Similarity Index (Hess 2000, 2002) 

233 def dot_product_accum(self, eigenvectors_1, eigenvectors_2): 

234 n_components = len(eigenvectors_1) 

235 # Get the dot product 

236 dpm = self.dot_product(eigenvectors_1, eigenvectors_2) 

237 

238 sso = (dpm * dpm).sum() / n_components 

239 # sso = (dpm * dpm).sum() / n_components 

240 return sso 

241 

242 # Get the subspace overlap 

243 # Same as before, but with a square root 

244 def get_subspace_overlap(self, eigenvectors_1, eigenvectors_2): 

245 # Get the number of eigenvectors 

246 n_components = len(eigenvectors_1) 

247 # Get the dot product 

248 dpm = self.dot_product(eigenvectors_1, eigenvectors_2) 

249 

250 sso = np.sqrt((dpm * dpm).sum() / n_components) 

251 # sso = (dpm * dpm).sum() / n_components 

252 return sso 

253 

254 # Classic RMSip (Root Mean Square Inner Product), gives the same results as the previous function get_subspace_overlap 

255 def get_rmsip(self, eigenvectors_1, eigenvectors_2): 

256 

257 # Get the number of eigenvectors 

258 n_components = len(eigenvectors_1) 

259 

260 accum = 0 

261 for pc in range(0, n_components): 

262 for pc2 in range(0, n_components): 

263 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2]) 

264 val = dpm * dpm 

265 accum = accum + val 

266 

267 sso = np.sqrt(accum / n_components) 

268 # sso = (dpm * dpm).sum() / n_components 

269 return sso 

270 

271 # RWSIP, Root Weighted Square Inner Product. Same as before, but weighted using the eigen values. 

272 # See Edvin Fuglebakk and others, Measuring and comparing structural fluctuation patterns in large protein datasets, 

273 # Bioinformatics, Volume 28, Issue 19, October 2012, Pages 2431–2440, https://doi.org/10.1093/bioinformatics/bts445 

274 def get_rwsip(self, 

275 eigenvalues_1, eigenvectors_1, 

276 eigenvalues_2, eigenvectors_2): 

277 

278 # Get the number of eigenvectors 

279 n_components = len(eigenvectors_1) 

280 

281 accum = 0 

282 norm = 0 

283 for pc in range(0, n_components): 

284 for pc2 in range(0, n_components): 

285 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2]) 

286 val = dpm * dpm * eigenvalues_1[pc] * eigenvalues_2[pc2] 

287 accum = accum + val 

288 norm = norm + eigenvalues_1[pc] * eigenvalues_2[pc] 

289 

290 sso = np.sqrt(accum / norm) 

291 # sso = (dpm * dpm).sum() / n_components 

292 return sso 

293 

294 @launchlogger 

295 def launch(self): 

296 """Launches the execution of the FlexServ pcz_similarity module.""" 

297 

298 # Setup Biobb 

299 if self.check_restart(): 

300 return 0 

301 # self.stage_files() 

302 

303 # Internal file paths 

304 # try: 

305 # # Using rel paths to shorten the amount of characters due to fortran path length limitations 

306 # input_pcz_1 = str(Path(self.stage_io_dict["in"]["input_pcz_path1"]).relative_to(Path.cwd())) 

307 # input_pcz_2 = str(Path(self.stage_io_dict["in"]["input_pcz_path2"]).relative_to(Path.cwd())) 

308 # output_json = str(Path(self.stage_io_dict["out"]["output_json_path"]).relative_to(Path.cwd())) 

309 # except ValueError: 

310 # # Container or remote case 

311 # output_json = self.stage_io_dict["out"]["output_json_path"] 

312 

313 # Manually creating a Sandbox to avoid issues with input parameters buffer overflow: 

314 # Long strings defining a file path makes Fortran or C compiled programs crash if the string 

315 # declared is shorter than the input parameter path (string) length. 

316 # Generating a temporary folder and working inside this folder (sandbox) fixes this problem. 

317 # The problem was found in Galaxy executions, launching Singularity containers (May 2023). 

318 

319 # Creating temporary folder 

320 tmp_folder = fu.create_unique_dir() 

321 fu.log('Creating %s temporary folder' % tmp_folder, self.out_log) 

322 

323 shutil.copy2(self.io_dict["in"]["input_pcz_path1"], tmp_folder) 

324 shutil.copy2(self.io_dict["in"]["input_pcz_path2"], tmp_folder) 

325 

326 # Temporary output 

327 temp_json = "output.json" 

328 temp_out_1 = "output1.dat" 

329 temp_out_2 = "output2.dat" 

330 

331 # Command line 1 

332 # pczdump -i structure.ca.std.pcz --evals -o evals.txt 

333 # self.cmd = [self.binary_path, # Evals pcz 1 

334 # "-i", input_pcz_1, 

335 # "-o", temp_out_1, 

336 # "--evals", ';', 

337 # self.binary_path, # Evals pcz 2 

338 # "-i", input_pcz_2, 

339 # "-o", temp_out_2, 

340 # "--evals" 

341 # ] 

342 

343 self.cmd = ['cd', tmp_folder, ';', 

344 self.binary_path, # Evals pcz 1 

345 "-i", PurePath(self.io_dict["in"]["input_pcz_path1"]).name, 

346 "-o", temp_out_1, 

347 "--evals", ';', 

348 self.binary_path, # Evals pcz 2 

349 "-i", PurePath(self.io_dict["in"]["input_pcz_path2"]).name, 

350 "-o", temp_out_2, 

351 "--evals" 

352 ] 

353 

354 # Run Biobb block 1 

355 self.run_biobb() 

356 

357 # Parse output evals 

358 info_dict = {} 

359 info_dict['evals_1'] = [] 

360 info_dict['evals_2'] = [] 

361 

362 with open(PurePath(tmp_folder).joinpath(temp_out_1), 'r') as file: 

363 for line in file: 

364 info = float(line.strip()) 

365 info_dict['evals_1'].append(info) 

366 

367 with open(PurePath(tmp_folder).joinpath(temp_out_2), 'r') as file: 

368 for line in file: 

369 info = float(line.strip()) 

370 info_dict['evals_2'].append(info) 

371 

372 num_evals_1 = len(info_dict['evals_1']) 

373 num_evals_2 = len(info_dict['evals_2']) 

374 num_evals_min = min(num_evals_1, num_evals_2) 

375 info_dict['num_evals_min'] = num_evals_min 

376 

377 # Command line 2 

378 # pczdump -i structure.ca.std.pcz --evals -o evals.txt 

379 self.cmd = [] 

380 self.cmd.append('cd') 

381 self.cmd.append(tmp_folder) 

382 self.cmd.append(';') 

383 for pc in (range(1, num_evals_min+1)): 

384 # Evecs pcz 1 

385 self.cmd.append(self.binary_path) 

386 self.cmd.append("-i") 

387 # self.cmd.append(input_pcz_1) 

388 self.cmd.append(PurePath(self.io_dict["in"]["input_pcz_path1"]).name) 

389 # self.cmd.append("-o evecs_1_pc{}".format(pc)) 

390 self.cmd.append("-o") 

391 # self.cmd.append(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_1_pc{}".format(pc)))) 

392 self.cmd.append("evecs_1_pc{}".format(pc)) 

393 self.cmd.append("--evec={}".format(pc)) 

394 self.cmd.append(";") 

395 # Evals pcz 2 

396 self.cmd.append(self.binary_path) 

397 self.cmd.append("-i") 

398 # self.cmd.append(input_pcz_2) 

399 self.cmd.append(PurePath(self.io_dict["in"]["input_pcz_path2"]).name) 

400 # self.cmd.append("-o evecs_2_pc{}".format(pc)) 

401 self.cmd.append("-o") 

402 # self.cmd.append(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_2_pc{}".format(pc)))) 

403 self.cmd.append("evecs_2_pc{}".format(pc)) 

404 self.cmd.append("--evec={}".format(pc)) 

405 self.cmd.append(";") 

406 

407 # Run Biobb block 2 

408 self.run_biobb() 

409 

410 # Parse output evecs 

411 info_dict['evecs_1'] = {} 

412 info_dict['evecs_2'] = {} 

413 eigenvectors_1 = [] 

414 eigenvectors_2 = [] 

415 for pc in (range(1, num_evals_min+1)): 

416 pc_id = "pc{}".format(pc) 

417 info_dict['evecs_1'][pc_id] = [] 

418 info_dict['evecs_2'][pc_id] = [] 

419 # with open(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_1_pc{}".format(pc))), 'r') as file: 

420 with open(PurePath(tmp_folder).joinpath("evecs_1_pc{}".format(pc)), 'r') as file: 

421 list_evecs = [] 

422 for line in file: 

423 info = line.strip().split(' ') 

424 for nums in info: 

425 if nums: 

426 list_evecs.append(float(nums)) 

427 info_dict['evecs_1'][pc_id] = list_evecs 

428 eigenvectors_1.append(list_evecs) 

429 

430 # with open(str(Path(self.stage_io_dict.get("unique_dir", "")).joinpath("evecs_2_pc{}".format(pc))), 'r') as file: 

431 with open(PurePath(tmp_folder).joinpath("evecs_2_pc{}".format(pc)), 'r') as file: 

432 list_evecs = [] 

433 for line in file: 

434 info = line.strip().split(' ') 

435 for nums in info: 

436 if nums: 

437 list_evecs.append(float(nums)) 

438 info_dict['evecs_2'][pc_id] = list_evecs 

439 eigenvectors_2.append(list_evecs) 

440 

441 # simIndex = self.get_similarity_index(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor) 

442 # info_dict['similarityIndex_WCP2'] = float("{:.3f}".format(simIndex)) 

443 # dotProduct = self.get_subspace_overlap(eigenvectors_1, eigenvectors_2) 

444 # info_dict['similarityIndex_rmsip2'] = float("{:.3f}".format(dotProduct)) 

445 eigenmsip = self.eigenmsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor) 

446 info_dict['similarityIndex_WCP'] = float("{:.3f}".format(eigenmsip)) 

447 rmsip = self.get_rmsip(eigenvectors_1, eigenvectors_2) 

448 info_dict['similarityIndex_rmsip'] = float("{:.3f}".format(rmsip)) 

449 rwsip = self.get_rwsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2) 

450 info_dict['similarityIndex_rwsip'] = float("{:.3f}".format(rwsip)) 

451 dotp = self.dot_product_accum(eigenvectors_1, eigenvectors_2) 

452 info_dict['similarityIndex_dotp'] = float("{:.3f}".format(dotp)) 

453 

454 with open(PurePath(tmp_folder).joinpath(temp_json), 'w') as out_file: 

455 out_file.write(json.dumps(info_dict, indent=4)) 

456 

457 # Copy outputs from temporary folder to output path 

458 shutil.copy2(PurePath(tmp_folder).joinpath(temp_json), PurePath(self.io_dict["out"]["output_json_path"])) 

459 

460 # Remove temporary folder(s) 

461 self.tmp_files.append(tmp_folder) 

462 self.remove_tmp_files() 

463 

464 self.check_arguments(output_files_created=True, raise_exception=False) 

465 

466 return self.return_code 

467 

468 

469def pcz_similarity(input_pcz_path1: str, input_pcz_path2: str, output_json_path: str, 

470 properties: Optional[dict] = None, **kwargs) -> int: 

471 """Create :class:`PCZsimilarity <flexserv.pcasuite.pcz_similarity>`flexserv.pcasuite.PCZsimilarity class and 

472 execute :meth:`launch() <flexserv.pcasuite.pcz_similarity.launch>` method""" 

473 return PCZsimilarity(**dict(locals())).launch() 

474 

475 

476pcz_similarity.__doc__ = PCZsimilarity.__doc__ 

477main = PCZsimilarity.get_main(pcz_similarity, "Compute PCA Similarity from a given pair of compressed PCZ files.") 

478 

479if __name__ == '__main__': 

480 main()