Coverage for biobb_flexserv/pcasuite/pcz_similarity.py: 77%

228 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-28 11:28 +0000

1#!/usr/bin/env python3 

2 

3"""Module containing the PCZsimilarity class and the command line interface.""" 

4from typing import Optional 

5import json 

6import numpy as np 

7from pathlib import Path, PurePath 

8from math import exp 

9from biobb_common.generic.biobb_object import BiobbObject 

10from biobb_common.tools.file_utils import launchlogger 

11 

12 

13class PCZsimilarity(BiobbObject): 

14 """ 

15 | biobb_flexserv PCZsimilarity 

16 | Compute PCA similarity between two given compressed PCZ files. 

17 | Wrapper of the pczdump tool from the PCAsuite FlexServ module. 

18 

19 Args: 

20 input_pcz_path1 (str): Input compressed trajectory file 1. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874). 

21 input_pcz_path2 (str): Input compressed trajectory file 2. File type: input. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/data/pcasuite/pcazip.pcz>`_. Accepted formats: pcz (edam:format_3874). 

22 output_json_path (str): Output json file with PCA Similarity results. File type: output. `Sample file <https://github.com/bioexcel/biobb_flexserv/raw/master/biobb_flexserv/test/reference/pcasuite/pcz_similarity.json>`_. Accepted formats: json (edam:format_3464). 

23 properties (dict - Python dictionary object containing the tool parameters, not input/output files): 

24 * **amplifying_factor** (*float*) - ("0.0") common displacement (dx) along the different eigenvectors. If 0, the result is the absolute similarity index (dot product). 

25 * **binary_path** (*str*) - ("pczdump") pczdump binary path to be used. 

26 * **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files. 

27 * **restart** (*bool*) - (False) [WF property] Do not execute if output files exist. 

28 * **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory. 

29 * **container_path** (*str*) - (None) Container path definition. 

30 * **container_image** (*str*) - ('afandiadib/ambertools:serial') Container image definition. 

31 * **container_volume_path** (*str*) - ('/tmp') Container volume path definition. 

32 * **container_working_dir** (*str*) - (None) Container working directory definition. 

33 * **container_user_id** (*str*) - (None) Container user_id definition. 

34 * **container_shell_path** (*str*) - ('/bin/bash') Path to default shell inside the container. 

35 

36 Examples: 

37 This is a use example of how to use the building block from Python:: 

38 

39 from biobb_flexserv.pcasuite.pcz_similarity import pcz_similarity 

40 

41 pcz_similarity( input_pcz_path1='/path/to/pcazip_input1.pcz', 

42 input_pcz_path2='/path/to/pcazip_input2.pcz', 

43 output_json_path='/path/to/pcz_similarity.json', 

44 properties=prop) 

45 

46 Info: 

47 * wrapped_software: 

48 * name: FlexServ PCAsuite 

49 * version: >=1.0 

50 * license: Apache-2.0 

51 * ontology: 

52 * name: EDAM 

53 * schema: http://edamontology.org/EDAM.owl 

54 

55 """ 

56 

57 def __init__(self, input_pcz_path1: str, input_pcz_path2: str, 

58 output_json_path: str, properties: Optional[dict] = None, **kwargs) -> None: 

59 

60 properties = properties or {} 

61 

62 # Call parent class constructor 

63 super().__init__(properties) 

64 self.locals_var_dict = locals().copy() 

65 

66 # Input/Output files 

67 self.io_dict = { 

68 'in': {'input_pcz_path1': input_pcz_path1, 

69 'input_pcz_path2': input_pcz_path2}, 

70 'out': {'output_json_path': output_json_path} 

71 } 

72 

73 # Properties specific for BB 

74 self.properties = properties 

75 self.amplifying_factor = properties.get('amplifying_factor') 

76 self.binary_path = properties.get('binary_path', 'pczdump') 

77 

78 # Check the properties 

79 self.check_properties(properties) 

80 self.check_arguments() 

81 

82 # Check two eigenvectors to be compatible for dot product 

83 # i.e. same number of vectors and values per vector 

84 def are_compatible(self, eigenvectors_1, eigenvectors_2): 

85 # Check the number of eigenvectors and the number of values in both eigenvectors to match 

86 if len(eigenvectors_1) != len(eigenvectors_2): 

87 print('WARNING: Number of eigenvectors does not match') 

88 return False 

89 if len(eigenvectors_1[0]) != len(eigenvectors_2[0]): 

90 print('WARNING: Number of values in eigenvectors does not match') 

91 return False 

92 return True 

93 

94 # Weighted Cross Product (WCP). 

95 # Get the weighted cross product between eigenvectors 

96 # This is meant to compare PCA results for molecular dynamics structural conformations 

97 # The number of eigenvectors to be compared may be specified. All (0) by default 

98 # DISCLAIMER: This code has been translated from a perl script signed by Alberto Perez (13/09/04) 

99 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda, 

100 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1. 

101 def get_similarity_index(self, 

102 eigenvalues_1, eigenvectors_1, 

103 eigenvalues_2, eigenvectors_2, dx=None): 

104 

105 # Check the number of eigenvectors and the number of values in both eigenvectors to match 

106 if not self.are_compatible(eigenvectors_1, eigenvectors_2): 

107 raise SystemExit('Eigenvectors are not compatible') 

108 

109 # Find out the total number of eigenvectors 

110 # Set the number of eigenvectors to be analyzed in case it is not set or it exceeds the total 

111 eigenvectors_number = min(len(eigenvectors_1), len(eigenvectors_2)) 

112 

113 # Find out the number of atoms in the structure 

114 # Eigenvectors are atom coordinates and each atom has 3 coordinates (x, y, z) 

115 if len(eigenvectors_1[0]) % 3 != 0: 

116 raise SystemExit('Something is wrong with eigenvectors since number of values is not divisor of 3') 

117 atom_number = int(len(eigenvectors_1[0]) / 3) 

118 

119 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product. 

120 # The value of the ~10th eigenvalue is usually taken. 

121 if dx is not None: 

122 amplifying_factor = dx 

123 else: 

124 amplifying_factor = eigenvalues_1[eigenvectors_number-1] 

125 

126 # Get the denominator 

127 # Find new denominator 

128 # for ($i=0;$i<$nvec;$i++){ 

129 # $cte1+=exp(-1/$val_1[$i]*$ampf); 

130 # $cte2+=exp(-1/$val_2[$i]*$ampf); 

131 # $part1+=exp(-2/$val_1[$i]*$ampf)*exp(-2/$val_1[$i]*$ampf); 

132 # $part2+=exp(-2/$val_2[$i]*$ampf)*exp(-2/$val_2[$i]*$ampf); 

133 # } 

134 cte1 = part1 = cte2 = part2 = 0 

135 for eigenvalue in eigenvalues_1: 

136 cte1 += exp(-1 / eigenvalue * amplifying_factor) 

137 part1 += exp(-2 / eigenvalue * amplifying_factor) ** 2 

138 for eigenvalue in eigenvalues_2: 

139 cte2 += exp(-1 / eigenvalue * amplifying_factor) 

140 part2 += exp(-2 / eigenvalue * amplifying_factor) ** 2 

141 denominator = part1 * cte2 * cte2 / cte1 / cte1 + part2 * cte1 * cte1 / cte2 / cte2 

142 

143 # Get all eigenvector values together 

144 eigenvector_values_1 = [v for ev in eigenvectors_1 for v in ev] 

145 eigenvector_values_2 = [v for ev in eigenvectors_2 for v in ev] 

146 

147 # IDK what it is doing now 

148 total_summatory = 0 

149 for i in range(eigenvectors_number): 

150 for j in range(eigenvectors_number): 

151 # Array has vectors in increasing order of vap, get last one first 

152 a = (eigenvectors_number - 1 - i) * atom_number * 3 

153 b = (eigenvectors_number - i) * atom_number * 3 - 1 

154 c = (eigenvectors_number - 1 - j) * atom_number * 3 

155 d = (eigenvectors_number - j) * atom_number * 3 - 1 

156 temp1 = eigenvector_values_1[a:b] 

157 temp2 = eigenvector_values_2[c:d] 

158 if len(temp1) != len(temp2): 

159 raise ValueError("Projection of vectors of different size!!") 

160 # Project the two vectors 

161 add = 0 

162 for k, value_1 in enumerate(temp1): 

163 value_2 = temp2[k] 

164 add += value_1 * value_2 

165 add = add * exp(-1 / eigenvalues_1[i] * amplifying_factor - 1 / eigenvalues_2[j] * amplifying_factor) 

166 add2 = add ** 2 

167 total_summatory += add2 

168 

169 similarity_index = total_summatory * 2 / denominator 

170 return similarity_index 

171 

172 # Weighted Cross Product (WCP). 

173 # DF implementation of Alberto's formula 

174 # Exploring the Essential Dynamics of B-DNA; Alberto Perez, Jose Ramon Blas, Manuel Rueda, 

175 # Jose Maria Lopez-Bes, Xavier de la Cruz and Modesto Orozco. J. Chem. Theory Comput.2005,1. 

176 def eigenmsip(self, eigenvalues_1, eigenvectors_1, 

177 eigenvalues_2, eigenvectors_2, 

178 dx=None): 

179 

180 evals1 = np.array(eigenvalues_1) 

181 evals2 = np.array(eigenvalues_2) 

182 

183 evecs1 = np.array(eigenvectors_1) 

184 evecs2 = np.array(eigenvectors_2) 

185 

186 n_components = len(eigenvectors_1) 

187 

188 # Amplifying factor: if it is 0 the algorithm is the same as a simple dot product. 

189 # The value of the ~10th eigenvalue is usually taken. 

190 if dx is not None: 

191 amplifying_factor = dx 

192 else: 

193 amplifying_factor = evals1[n_components-1] 

194 

195 e1 = np.exp(-(amplifying_factor)**2/evals1) 

196 e2 = np.exp(-(amplifying_factor)**2/evals2) 

197 

198 e1_2 = np.exp(-2*(amplifying_factor)**2/evals1) 

199 e2_2 = np.exp(-2*(amplifying_factor)**2/evals2) 

200 sume1 = np.sum(e1) 

201 sume2 = np.sum(e2) 

202 

203 denominator = np.sum((e1_2/sume1**2)**2)+np.sum((e2_2/sume2**2)**2) 

204 

205 # numerator_df = np.square(np.dot(evecs1, evecs2)*np.outer(e1, e2)/(sume1*sume2)) 

206 # numerator_df = 2 * np.sum(numerator_df) 

207 

208 val_tmp = 0 

209 accum_a = 0 

210 c = sume1*sume2 

211 for pc in range(0, n_components): 

212 for pc2 in range(0, n_components): 

213 eve1 = evecs1[pc] 

214 eve2 = evecs2[pc2] 

215 eva1 = evals1[pc] 

216 eva2 = evals2[pc2] 

217 a = np.dot(eve1, eve2) 

218 b = np.exp(-(amplifying_factor)**2/eva1 - (amplifying_factor)**2/eva2) 

219 val_tmp = val_tmp + ((a * b)/c)**2 

220 accum_a = accum_a + a 

221 

222 numerator = 2 * val_tmp 

223 

224 return numerator/(denominator) 

225 

226 # Get the dot product matrix of two eigenvectors 

227 def dot_product(self, eigenvectors_1, eigenvectors_2): 

228 # Check the number of eigenvectors and the number of values in both eigenvectors to match 

229 if not self.are_compatible(eigenvectors_1, eigenvectors_2): 

230 raise SystemExit('Eigenvectors are not compatible') 

231 # Get the dot product 

232 dpm = np.dot(eigenvectors_1, np.transpose(eigenvectors_2)) 

233 return dpm 

234 

235 # Get the dot product matrix of two eigenvectors (squared and normalized). 

236 # Absolute Similarity Index (Hess 2000, 2002) 

237 def dot_product_accum(self, eigenvectors_1, eigenvectors_2): 

238 n_components = len(eigenvectors_1) 

239 # Get the dot product 

240 dpm = self.dot_product(eigenvectors_1, eigenvectors_2) 

241 

242 sso = (dpm * dpm).sum() / n_components 

243 # sso = (dpm * dpm).sum() / n_components 

244 return sso 

245 

246 # Get the subspace overlap 

247 # Same as before, but with a square root 

248 def get_subspace_overlap(self, eigenvectors_1, eigenvectors_2): 

249 # Get the number of eigenvectors 

250 n_components = len(eigenvectors_1) 

251 # Get the dot product 

252 dpm = self.dot_product(eigenvectors_1, eigenvectors_2) 

253 

254 sso = np.sqrt((dpm * dpm).sum() / n_components) 

255 # sso = (dpm * dpm).sum() / n_components 

256 return sso 

257 

258 # Classic RMSip (Root Mean Square Inner Product), gives the same results as the previous function get_subspace_overlap 

259 def get_rmsip(self, eigenvectors_1, eigenvectors_2): 

260 

261 # Get the number of eigenvectors 

262 n_components = len(eigenvectors_1) 

263 

264 accum = 0 

265 for pc in range(0, n_components): 

266 for pc2 in range(0, n_components): 

267 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2]) 

268 val = dpm * dpm 

269 accum = accum + val 

270 

271 sso = np.sqrt(accum / n_components) 

272 # sso = (dpm * dpm).sum() / n_components 

273 return sso 

274 

275 # RWSIP, Root Weighted Square Inner Product. Same as before, but weighted using the eigen values. 

276 # See Edvin Fuglebakk and others, Measuring and comparing structural fluctuation patterns in large protein datasets, 

277 # Bioinformatics, Volume 28, Issue 19, October 2012, Pages 2431–2440, https://doi.org/10.1093/bioinformatics/bts445 

278 def get_rwsip(self, 

279 eigenvalues_1, eigenvectors_1, 

280 eigenvalues_2, eigenvectors_2): 

281 

282 # Get the number of eigenvectors 

283 n_components = len(eigenvectors_1) 

284 

285 accum = 0 

286 norm = 0 

287 for pc in range(0, n_components): 

288 for pc2 in range(0, n_components): 

289 dpm = np.dot(eigenvectors_1[pc], eigenvectors_2[pc2]) 

290 val = dpm * dpm * eigenvalues_1[pc] * eigenvalues_2[pc2] 

291 accum = accum + val 

292 norm = norm + eigenvalues_1[pc] * eigenvalues_2[pc] 

293 

294 sso = np.sqrt(accum / norm) 

295 # sso = (dpm * dpm).sum() / n_components 

296 return sso 

297 

298 @launchlogger 

299 def launch(self): 

300 """Launches the execution of the FlexServ pcz_similarity module.""" 

301 

302 # Setup Biobb 

303 if self.check_restart(): 

304 return 0 

305 self.stage_files() 

306 

307 if self.container_path: 

308 working_dir = self.container_volume_path if self.container_volume_path else "/data" 

309 else: 

310 working_dir = self.stage_io_dict.get("unique_dir", "") 

311 

312 unique_dir = Path(self.stage_io_dict.get("unique_dir", "")) 

313 

314 # Temporary output 

315 temp_out_1 = "output1.dat" 

316 temp_out_2 = "output2.dat" 

317 temp_out_1_path = unique_dir.joinpath(temp_out_1) 

318 temp_out_2_path = unique_dir.joinpath(temp_out_2) 

319 staged_output_json_path = unique_dir.joinpath(Path(self.stage_io_dict["out"]["output_json_path"]).name) 

320 

321 # Command line 1 

322 # pczdump -i structure.ca.std.pcz --evals -o evals.txt 

323 # self.cmd = [self.binary_path, # Evals pcz 1 

324 # "-i", input_pcz_1, 

325 # "-o", temp_out_1, 

326 # "--evals", ';', 

327 # self.binary_path, # Evals pcz 2 

328 # "-i", input_pcz_2, 

329 # "-o", temp_out_2, 

330 # "--evals" 

331 # ] 

332 

333 self.cmd = ['cd', working_dir, ';', 

334 self.binary_path, # Evals pcz 1 

335 "-i", PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name, 

336 "-o", temp_out_1, 

337 "--evals", ';', 

338 self.binary_path, # Evals pcz 2 

339 "-i", PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name, 

340 "-o", temp_out_2, 

341 "--evals" 

342 ] 

343 

344 # Run Biobb block 1 

345 self.run_biobb() 

346 

347 # Parse output evals 

348 info_dict = {} 

349 info_dict['evals_1'] = [] 

350 info_dict['evals_2'] = [] 

351 

352 with open(temp_out_1_path, 'r') as file: 

353 for line in file: 

354 info = float(line.strip()) 

355 info_dict['evals_1'].append(info) 

356 

357 with open(temp_out_2_path, 'r') as file: 

358 for line in file: 

359 info = float(line.strip()) 

360 info_dict['evals_2'].append(info) 

361 

362 num_evals_1 = len(info_dict['evals_1']) 

363 num_evals_2 = len(info_dict['evals_2']) 

364 num_evals_min = min(num_evals_1, num_evals_2) 

365 info_dict['num_evals_min'] = num_evals_min 

366 

367 # Command line 2 

368 # pczdump -i structure.ca.std.pcz --evals -o evals.txt 

369 self.cmd = ['cd', working_dir, ';'] 

370 for pc in (range(1, num_evals_min+1)): 

371 # Evecs pcz 1 

372 self.cmd.append(self.binary_path) 

373 self.cmd.append("-i") 

374 self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path1"]).name) 

375 self.cmd.append("-o") 

376 self.cmd.append("evecs_1_pc{}".format(pc)) 

377 self.cmd.append("--evec={}".format(pc)) 

378 self.cmd.append(";") 

379 # Evals pcz 2 

380 self.cmd.append(self.binary_path) 

381 self.cmd.append("-i") 

382 self.cmd.append(PurePath(self.stage_io_dict["in"]["input_pcz_path2"]).name) 

383 self.cmd.append("-o") 

384 self.cmd.append("evecs_2_pc{}".format(pc)) 

385 self.cmd.append("--evec={}".format(pc)) 

386 self.cmd.append(";") 

387 

388 # Run Biobb block 2 

389 self.run_biobb() 

390 

391 # Parse output evecs 

392 info_dict['evecs_1'] = {} 

393 info_dict['evecs_2'] = {} 

394 eigenvectors_1 = [] 

395 eigenvectors_2 = [] 

396 for pc in (range(1, num_evals_min+1)): 

397 pc_id = "pc{}".format(pc) 

398 info_dict['evecs_1'][pc_id] = [] 

399 info_dict['evecs_2'][pc_id] = [] 

400 with open(unique_dir.joinpath("evecs_1_pc{}".format(pc)), 'r') as file: 

401 list_evecs = [] 

402 for line in file: 

403 info = line.strip().split(' ') 

404 for nums in info: 

405 if nums: 

406 list_evecs.append(float(nums)) 

407 info_dict['evecs_1'][pc_id] = list_evecs 

408 eigenvectors_1.append(list_evecs) 

409 

410 with open(unique_dir.joinpath("evecs_2_pc{}".format(pc)), 'r') as file: 

411 list_evecs = [] 

412 for line in file: 

413 info = line.strip().split(' ') 

414 for nums in info: 

415 if nums: 

416 list_evecs.append(float(nums)) 

417 info_dict['evecs_2'][pc_id] = list_evecs 

418 eigenvectors_2.append(list_evecs) 

419 

420 # simIndex = self.get_similarity_index(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor) 

421 # info_dict['similarityIndex_WCP2'] = float("{:.3f}".format(simIndex)) 

422 # dotProduct = self.get_subspace_overlap(eigenvectors_1, eigenvectors_2) 

423 # info_dict['similarityIndex_rmsip2'] = float("{:.3f}".format(dotProduct)) 

424 eigenmsip = self.eigenmsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2, self.amplifying_factor) 

425 info_dict['similarityIndex_WCP'] = float("{:.3f}".format(eigenmsip)) 

426 rmsip = self.get_rmsip(eigenvectors_1, eigenvectors_2) 

427 info_dict['similarityIndex_rmsip'] = float("{:.3f}".format(rmsip)) 

428 rwsip = self.get_rwsip(info_dict['evals_1'], eigenvectors_1, info_dict['evals_2'], eigenvectors_2) 

429 info_dict['similarityIndex_rwsip'] = float("{:.3f}".format(rwsip)) 

430 dotp = self.dot_product_accum(eigenvectors_1, eigenvectors_2) 

431 info_dict['similarityIndex_dotp'] = float("{:.3f}".format(dotp)) 

432 

433 with open(staged_output_json_path, 'w') as out_file: 

434 out_file.write(json.dumps(info_dict, indent=4)) 

435 

436 # Copy files to host 

437 self.copy_to_host() 

438 

439 # Remove temporary folder(s) 

440 self.remove_tmp_files() 

441 

442 self.check_arguments(output_files_created=True, raise_exception=False) 

443 

444 return self.return_code 

445 

446 

447def pcz_similarity(input_pcz_path1: str, input_pcz_path2: str, output_json_path: str, 

448 properties: Optional[dict] = None, **kwargs) -> int: 

449 """Create :class:`PCZsimilarity <flexserv.pcasuite.pcz_similarity>`flexserv.pcasuite.PCZsimilarity class and 

450 execute :meth:`launch() <flexserv.pcasuite.pcz_similarity.launch>` method""" 

451 return PCZsimilarity(**dict(locals())).launch() 

452 

453 

454pcz_similarity.__doc__ = PCZsimilarity.__doc__ 

455main = PCZsimilarity.get_main(pcz_similarity, "Compute PCA Similarity from a given pair of compressed PCZ files.") 

456 

457if __name__ == '__main__': 

458 main()