Coverage for biobb_analysis / gromacs / common.py: 60%

334 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 10:53 +0000

1"""Common functions for package biobb_analysis.gromacs""" 

2 

3import re 

4import shutil 

5from pathlib import Path, PurePath 

6from typing import Optional, Union 

7 

8from biobb_common.command_wrapper import cmd_wrapper 

9from biobb_common.tools import file_utils as fu 

10 

11 

def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files with ``gmx check`` and report whether they match.

    The command output is redirected to ``check_result.out`` (relative to the
    current working directory) and that file is then scanned line by line.

    Args:
        file_a: First file; passed with ``-s1`` when it ends in ``.tpr``,
            otherwise with ``-f``.
        file_b: Second file; passed with ``-s2`` when it ends in ``.tpr``,
            otherwise with ``-f2``.
        gmx: GROMACS executable name or path.

    Returns:
        ``False`` at the first non-blank line that starts neither with
        "Both files read correctly" nor with "comparing"; ``True`` otherwise.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = "check_result.out"

    flag_a = "-s1" if file_a.endswith(".tpr") else "-f"
    flag_b = "-s2" if file_b.endswith(".tpr") else "-f2"
    # The redirection token is handed to the wrapper as one more argument.
    cmd = [gmx, "check", flag_a, file_a, flag_b, file_b, "> check_result.out"]
    cmd_wrapper.CmdWrapper(cmd).launch()

    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        for line_num, line in enumerate(check_file):
            if not line.rstrip() or line.startswith("Both files read correctly"):
                continue
            if line.startswith("comparing"):
                continue
            # line_num is the zero-based index produced by enumerate.
            print("Discrepance found in line %d: %s" % (line_num, line))
            return False
    return True

41 

42 

def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a usable path to it.

    Logs through ``fu.log`` and raises ``SystemExit`` when the file does not
    exist or when its extension is rejected by ``is_valid_energy``.
    A non-absolute path is returned joined onto the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        message = classname + ": Format %s in energy input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs get the cwd prepended because execution is launched on a tmp folder.
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))

61 

62 

def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a usable path to it.

    Logs through ``fu.log`` and raises ``SystemExit`` when the file does not
    exist or when its extension is rejected by ``is_valid_structure``.
    A non-absolute path is returned joined onto the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in structure input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs get the cwd prepended because execution is launched on a tmp folder.
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))

83 

84 

def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return a usable path to it.

    Returns ``None`` when ``path`` is falsy. Otherwise logs and raises
    ``SystemExit`` if the extension is rejected by ``is_valid_index``.
    The existence of the file is not checked here.
    A non-absolute path is returned joined onto the current working directory.
    """
    if not path:
        return None

    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        message = classname + ": Format %s in index input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs get the cwd prepended because execution is launched on a tmp folder.
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))

102 

103 

def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a usable path to it.

    Logs through ``fu.log`` and raises ``SystemExit`` when the file does not
    exist or when its extension is rejected by ``is_valid_trajectory``.
    A non-absolute path is returned joined onto the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        message = classname + ": Format %s in trajectory input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs get the cwd prepended because execution is launched on a tmp folder.
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))

124 

125 

def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is xvg.

    Logs through ``fu.log`` and raises ``SystemExit`` when the parent folder
    of ``path`` does not exist or when ``is_valid_xvg`` rejects the extension.
    Returns ``path`` unchanged on success.
    """
    out_folder = PurePath(path).parent
    if not Path(out_folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if not is_valid_xvg(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

141 

142 

def check_out_log_path(path, out_log, classname):
    """Check that the output folder of a log-like file exists.

    Logs through ``fu.log`` and raises ``SystemExit`` when the parent folder
    of ``path`` does not exist; no extension check is performed.
    Returns ``path`` unchanged on success.
    """
    out_folder = PurePath(path).parent
    if Path(out_folder).exists():
        return path
    fu.log(classname + ": Unexisting output folder, exiting", out_log)
    raise SystemExit(classname + ": Unexisting output folder")

149 

150 

def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the extension is a structure format.

    Logs through ``fu.log`` and raises ``SystemExit`` when the parent folder
    of ``path`` does not exist or when ``is_valid_structure`` rejects the
    extension. Returns ``path`` unchanged on success.
    """
    out_folder = PurePath(path).parent
    if not Path(out_folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

166 

167 

def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the extension is a trajectory output format.

    Logs through ``fu.log`` and raises ``SystemExit`` when the parent folder
    of ``path`` does not exist or when ``is_valid_trajectory_output`` rejects
    the extension. Returns ``path`` unchanged on success.
    """
    out_folder = PurePath(path).parent
    if not Path(out_folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory_output(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

183 

184 

def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is zip.

    Logs through ``fu.log`` and raises ``SystemExit`` when the parent folder
    of ``path`` does not exist or when ``is_valid_zip`` rejects the extension.
    Returns ``path`` unchanged on success.
    """
    out_folder = PurePath(path).parent
    if not Path(out_folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if not is_valid_zip(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

200 

201 

def get_default_value(key):
    """Return the default value registered for ``key``.

    The table is rebuilt on every call, so mutable defaults (the ``terms``
    list) are never shared between callers. An unknown key raises ``KeyError``.
    """
    defaults = {
        # general
        "instructions_file": "instructions.in",
        "binary_path": "gmx",
        # energy / selections / plotting
        "terms": ["Potential"],
        "selection": "System",
        "xvg": "none",
        # clustering
        "dista": False,
        "method": "linkage",
        "cutoff": 0.1,
        "cluster_selection": "System",
        # imaging and trajectory conversion
        "fit_selection": "System",
        "center_selection": "System",
        "output_selection": "System",
        "pbc": "mol",
        "center": True,
        "fit": "none",
        "ur": "compact",
        "skip": 1,
        "start": None,
        "end": None,
        "dt": None,
        "dump": None,
        "ot_str_ens": "pdb",
    }
    return defaults[key]

231 

232 

def get_binary_path(properties, type):
    """Return ``properties[type]``, falling back to the default registered for ``type``.

    The default is looked up before ``properties`` is consulted, so ``type``
    must be a key known to ``get_default_value``.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)

236 

237 

def get_terms(properties, out_log, classname):
    """Return the ``terms`` property after validating it.

    Logs through ``fu.log`` and raises ``SystemExit`` when ``terms`` is
    missing, empty, not a list, or rejected by ``is_valid_term``.
    """
    terms = properties.get("terms", dict())
    if not (terms and isinstance(terms, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(terms):
        return terms
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")

248 

249 

def get_selection(properties, out_log, classname):
    """Return the ``selection`` property (or its default) after validating it.

    Logs through ``fu.log`` and raises ``SystemExit`` when the selection is
    falsy or rejected by ``is_valid_selection``.
    """
    selection = properties.get("selection", get_default_value("selection"))
    if not selection:
        fu.log(classname + ": No selection provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(selection):
        return selection
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

262 

263 

def get_image_selection(properties, key, out_log, classname):
    """Return the selection stored under ``key`` (or its default) after validating it.

    Logs through ``fu.log`` and raises ``SystemExit`` when the selection is
    falsy or rejected by ``is_valid_selection``.
    """
    selection = properties.get(key, get_default_value(key))
    if not selection:
        fu.log(classname + ": No selection provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(selection):
        return selection
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

276 

277 

def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection stored under ``key`` if it names a group of the index file.

    Every ``[...]`` match found in ``index`` is collected with brackets and
    spaces removed. The selection (or the default for ``key``) must be one of
    those names; otherwise the error is logged and ``SystemExit`` is raised.
    """
    bracketed = re.compile(r"\[.*\]")
    with open(index, "r") as ndx_file:
        groups = [
            re.sub(r"[\[\] ]", "", found.group())
            for line in ndx_file
            for found in bracketed.finditer(line)
        ]

    sel = properties.get(key, get_default_value(key))
    if sel in groups:
        return sel
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

291 

292 

def get_pbc(properties, out_log, classname):
    """Return the ``pbc`` property (or its default) after validating it.

    Logs and raises ``SystemExit`` when ``is_valid_pbc`` rejects the value.
    """
    pbc = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(pbc):
        return pbc
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")

300 

301 

def get_center(properties, out_log, classname):
    """Return the ``center`` property (or its default) after validating it.

    Logs and raises ``SystemExit`` when ``is_valid_boolean`` rejects the value.
    """
    center = properties.get("center", get_default_value("center"))
    if is_valid_boolean(center):
        return center
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")

309 

310 

def get_ur(properties, out_log, classname):
    """Return the ``ur`` property (or its default) after validating it.

    Logs and raises ``SystemExit`` when ``is_valid_ur`` rejects the value.
    """
    ur = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(ur):
        return ur
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")

318 

319 

def get_fit(properties, out_log, classname):
    """Return the ``fit`` property (or its default) after validating it.

    Logs and raises ``SystemExit`` when ``is_valid_fit`` rejects the value.
    """
    fit = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(fit):
        return fit
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")

327 

328 

def get_skip(properties, out_log, classname):
    """Return the ``skip`` property (or its default) as a string.

    Logs through ``fu.log`` and raises ``SystemExit`` when ``is_valid_int``
    rejects the value. The exit message names ``skip``, matching the log line
    (it previously reported "start" by mistake).
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)

336 

337 

def get_start(properties, out_log, classname):
    """Return the ``start`` property as a string, or ``None`` when it is unset.

    Logs and raises ``SystemExit`` when ``is_valid_int`` rejects the value.
    """
    start = properties.get("start", get_default_value("start"))
    if start is None:
        return None
    if is_valid_int(start):
        return str(start)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")

348 

349 

def get_end(properties, out_log, classname):
    """Return the ``end`` property as a string, or ``None`` when it is unset.

    Logs and raises ``SystemExit`` when ``is_valid_int`` rejects the value.
    """
    end = properties.get("end", get_default_value("end"))
    if end is None:
        return None
    if is_valid_int(end):
        return str(end)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")

359 

360 

def get_dt(properties, out_log, classname):
    """Return the ``dt`` property as a string, or ``None`` when it is unset.

    Logs and raises ``SystemExit`` when ``is_valid_int`` rejects the value.
    """
    dt = properties.get("dt", get_default_value("dt"))
    if dt is None:
        return None
    if is_valid_int(dt):
        return str(dt)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")

370 

def get_dump(properties, out_log, classname):
    """Return the ``dump`` property as a string, or ``None`` when it is unset.

    Logs and raises ``SystemExit`` when ``is_valid_int`` rejects the value.
    """
    dump = properties.get("dump", get_default_value("dump"))
    if dump is None:
        return None
    if is_valid_int(dump):
        return str(dump)
    fu.log(classname + ": Incorrect dump provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dump provided")

380 

381 

def get_ot_str_ens(properties, out_log, classname):
    """Return the ``output_type`` property as a string after validating it.

    The fallback is the default registered under ``ot_str_ens``. Logs and
    raises ``SystemExit`` when ``is_valid_ot_str_ens`` rejects the value.
    """
    output_type = properties.get("output_type", get_default_value("ot_str_ens"))
    if is_valid_ot_str_ens(output_type):
        return str(output_type)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")

389 

390 

def get_xvg(properties, out_log, classname):
    """Return the ``xvg`` property (or its default) after validating it.

    Logs and raises ``SystemExit`` when ``is_valid_xvg_param`` rejects the value.
    """
    xvg = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(xvg):
        return xvg
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")

398 

399 

def get_dista(properties, out_log, classname):
    """Return the ``dista`` property (or its default) after validating it.

    Logs and raises ``SystemExit`` when ``is_valid_boolean`` rejects the value.
    """
    dista = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(dista):
        return dista
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")

407 

408 

def get_method(properties, out_log, classname):
    """Return the ``method`` property (or its default) after validating it.

    Logs and raises ``SystemExit`` when ``is_valid_method_param`` rejects the value.
    """
    method = properties.get("method", get_default_value("method"))
    if is_valid_method_param(method):
        return method
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")

416 

417 

def get_cutoff(properties, out_log, classname):
    """Return the ``cutoff`` property (or its default) as a string.

    Logs and raises ``SystemExit`` when ``is_valid_float`` rejects the value.
    """
    cutoff = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(cutoff):
        return str(cutoff)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")

425 

426 

def is_valid_boolean(val):
    """Return True when ``val`` compares equal to ``True`` or ``False``.

    The test is by equality, so ``1`` and ``0`` are accepted as well.
    """
    return val in (True, False)

431 

432 

def is_valid_float(val):
    """Return False only for a truthy value that is neither a float nor an int.

    Falsy values (``None``, ``0``, ``""``, ...) are accepted without a type check.
    """
    return not val or isinstance(val, (float, int))

438 

439 

def is_valid_int(val):
    """Return False only for a truthy value that is not an int.

    Falsy values (``None``, ``0``, ``""``, ...) are accepted without a type check.
    """
    return not val or isinstance(val, int)

445 

446 

def is_valid_method_param(met):
    """Return True when ``met`` is one of the accepted clustering method names."""
    return met in (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )

451 

452 

def is_valid_structure(ext):
    """Return True when ``ext`` is an accepted structure file extension."""
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")

457 

458 

def is_valid_index(ext):
    """Return True when ``ext`` is an accepted index file extension (``ndx``)."""
    return ext in ("ndx",)

463 

464 

def is_valid_trajectory(ext):
    """Return True when ``ext`` is an accepted trajectory input extension."""
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")

469 

470 

def is_valid_trajectory_output(ext):
    """Return True when ``ext`` is an accepted trajectory output extension."""
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")

475 

476 

def is_valid_energy(ext):
    """Return True when ``ext`` is an accepted energy file extension (``edr``)."""
    return ext in ("edr",)

481 

482 

def is_valid_xvg(ext):
    """Return True when ``ext`` is the XVG file extension (``xvg``)."""
    return ext in ("xvg",)

487 

488 

def is_valid_zip(ext):
    """Return True when ``ext`` is the ZIP file extension (``zip``)."""
    return ext in ("zip",)

493 

494 

def is_valid_xvg_param(ext):
    """Return True when ``ext`` is an accepted value for the xvg parameter."""
    return ext in ("xmgrace", "xmgr", "none")

499 

500 

def is_valid_ot_str_ens(ext):
    """Return True when ``ext`` is an accepted structure-ensemble output type."""
    return ext in ("gro", "g96", "pdb")

505 

506 

def is_valid_pbc(pbc):
    """Return True when ``pbc`` is an accepted value for the pbc parameter."""
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")

511 

512 

def is_valid_ur(ur):
    """Return True when ``ur`` is an accepted value for the ur parameter."""
    return ur in ("rect", "tric", "compact")

517 

518 

def is_valid_fit(fit):
    """Return True when ``fit`` is an accepted value for the fit parameter."""
    return fit in (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )

530 

531 

def is_valid_term(iterms):
    """Return True when every element of ``iterms`` is a known energy term.

    ``Constr.-rmsd`` and ``Box-Z`` used to be listed only with a stray leading
    space, so the real term names were rejected. Both spellings are accepted
    now; the padded ones are kept so previously valid input stays valid.

    Args:
        iterms: Iterable of term names to check. An empty iterable is valid.

    Returns:
        bool: ``True`` if all elements are in the accepted list.
    """
    cterms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        " Constr.-rmsd",  # legacy spelling, kept for backward compatibility
        "Box-X",
        "Box-Y",
        "Box-Z",
        " Box-Z",  # legacy spelling, kept for backward compatibility
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
    )
    return all(elem in cterms for elem in iterms)

588 

589 

def is_valid_selection(ext):
    """Return True when ``ext`` is one of the accepted selection group names."""
    known_groups = (
        # whole system and protein subsets
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        # solvent and ions
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        # nucleic acids and combinations
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
        # remaining group names
        "DPPC",
        "DMPC",
        "POPG",
        "POPA",
        "POPC",
        "POPE",
        "DMTAP",
        "POPS",
    )
    return ext in known_groups

628 

629 

def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy ``instructions_file`` into ``unique_dir`` using ``shutil.copy2``."""
    source, destination = instructions_file, unique_dir
    shutil.copy2(source, destination)

632 

633 

def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files generated by the wrapper.

    Does nothing when ``remove_tmp`` is falsy. Otherwise every entry of
    ``list`` is passed to ``fu.rm`` and the entries for which it returned a
    truthy value are reported through ``fu.log``.
    """
    if not remove_tmp:
        return
    removed_files = []
    for tmp_file in list:
        if fu.rm(tmp_file):
            removed_files.append(tmp_file)
    fu.log("Removed: %s" % str(removed_files), out_log)

640 

641 

def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the files produced in ``tmp_folder`` and copy the archive to ``output_dir``.

    Files are collected with ``glob_pattern``; when nothing matches, the
    pattern ``frame*.pdb`` is tried instead. The resulting list is handed to
    ``fu.zip_list`` together with ``output_file``, which is then copied with
    ``shutil.copy2``.
    """
    tmp_dir = Path(tmp_folder)
    frames = list(tmp_dir.glob(glob_pattern)) or list(tmp_dir.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, frames, out_log)

    shutil.copy2(output_file, output_dir)

657 

658 

659def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]: 

660 """ 

661 Converts a string to a list, splitting by commas or spaces. If the input is already a list, returns it as is. 

662 Returns an empty list if input_data is None. 

663 

664 Parameters: 

665 input_data (str, list, or None): The string, list, or None value to convert. 

666 

667 Returns: 

668 list: A list of string elements or an empty list if input_data is None. 

669 """ 

670 if input_data is None: 

671 return [] 

672 

673 if isinstance(input_data, list): 

674 # If input is already a list, return it 

675 return input_data 

676 

677 # If input is a string, determine the delimiter based on presence of commas 

678 delimiter = "," if "," in input_data else " " 

679 items = input_data.split(delimiter) 

680 

681 # Remove whitespace from each item and ignore empty strings 

682 processed_items = [item.strip() for item in items if item.strip()] 

683 

684 return processed_items