Coverage for biobb_analysis/gromacs/common.py: 60%

326 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-14 09:14 +0000

1"""Common functions for package biobb_analysis.gromacs""" 

2 

3import re 

4import shutil 

5from pathlib import Path, PurePath 

6from typing import Optional, Union 

7 

8from biobb_common.command_wrapper import cmd_wrapper 

9from biobb_common.tools import file_utils as fu 

10 

11 

def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files by running ``<gmx> check`` on them.

    The command output is expected in ``check_result.out`` (resolved against
    the current working directory) and is scanned line by line.

    Args:
        file_a: First file; passed with ``-s1`` when its name ends in
            ``.tpr``, otherwise with ``-f``.
        file_b: Second file; passed with ``-s2`` when its name ends in
            ``.tpr``, otherwise with ``-f2``.
        gmx: Executable placed at the front of the command.

    Returns:
        False as soon as a non-blank line is found that starts neither with
        "Both files read correctly" nor with "comparing"; True otherwise.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = "check_result.out"
    flag_a = "-s1" if file_a.endswith(".tpr") else "-f"
    flag_b = "-s2" if file_b.endswith(".tpr") else "-f2"
    # The redirection is handed over as one more command element.
    cmd = [gmx, "check", flag_a, file_a, flag_b, file_b, "> check_result.out"]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))
    harmless_prefixes = ("Both files read correctly", "comparing")
    with open(check_result) as check_file:
        for line_num, line in enumerate(check_file):
            if not line.rstrip() or line.startswith(harmless_prefixes):
                continue
            print("Discrepance found in line %d: %s" % (line_num, line))
            return False
    return True

41 

42 

def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a usable path to it.

    Logs through ``fu.log`` and raises SystemExit when the file does not exist
    or when ``is_valid_energy`` rejects its extension. A path that is not
    absolute is returned joined to the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")
    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        message = classname + ": Format %s in energy input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

61 

62 

def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a usable path to it.

    Logs through ``fu.log`` and raises SystemExit when the file does not exist
    or when ``is_valid_structure`` rejects its extension. A path that is not
    absolute is returned joined to the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")
    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in structure input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

83 

84 

def check_index_path(path, out_log, classname):
    """Validate the (optional) index input file and return a usable path.

    Returns None when ``path`` is falsy. Logs through ``fu.log`` and raises
    SystemExit when ``is_valid_index`` rejects the extension. Existence of the
    file is not checked here. A path that is not absolute is returned joined
    to the current working directory.
    """
    if not path:
        return None
    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        message = classname + ": Format %s in index input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

102 

103 

def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a usable path to it.

    Logs through ``fu.log`` and raises SystemExit when the file does not exist
    or when ``is_valid_trajectory`` rejects its extension. A path that is not
    absolute is returned joined to the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")
    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        message = classname + ": Format %s in trajectory input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

124 

125 

def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is xvg.

    Logs through ``fu.log`` and raises SystemExit on either failure; returns
    ``path`` unchanged otherwise.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if not is_valid_xvg(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

141 

142 

def check_out_log_path(path, out_log, classname):
    """Check that the output folder exists for log-like files.

    No extension check is made. Logs through ``fu.log`` and raises SystemExit
    when the parent folder is missing; returns ``path`` unchanged otherwise.
    """
    parent_dir = PurePath(path).parent
    folder_missing = parent_dir and not Path(parent_dir).exists()
    if folder_missing:
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    return path

149 

150 

def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the extension is a structure format.

    The extension is validated with ``is_valid_structure``. Logs through
    ``fu.log`` and raises SystemExit on either failure; returns ``path``
    unchanged otherwise.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

166 

167 

def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the extension is a trajectory output format.

    The extension is validated with ``is_valid_trajectory_output``. Logs
    through ``fu.log`` and raises SystemExit on either failure; returns
    ``path`` unchanged otherwise.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory_output(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

183 

184 

def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is zip.

    The extension is validated with ``is_valid_zip``. Logs through ``fu.log``
    and raises SystemExit on either failure; returns ``path`` unchanged
    otherwise.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if not is_valid_zip(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

200 

201 

def get_default_value(key):
    """Return the default value registered for ``key``.

    The table is rebuilt on every call, so mutable defaults (the ``terms``
    list) are never shared between callers. An unknown key raises KeyError.
    """
    defaults = {
        # general
        "instructions_file": "instructions.in",
        "binary_path": "gmx",
        "xvg": "none",
        # energy / selections
        "terms": ["Potential"],
        "selection": "System",
        "cluster_selection": "System",
        "fit_selection": "System",
        "center_selection": "System",
        "output_selection": "System",
        # clustering
        "dista": False,
        "method": "linkage",
        "cutoff": 0.1,
        # imaging
        "pbc": "mol",
        "center": True,
        "fit": "none",
        "ur": "compact",
        # frame range
        "skip": 1,
        "start": None,
        "end": None,
        "dt": None,
        # structure ensemble output
        "ot_str_ens": "pdb",
    }
    return defaults[key]

230 

231 

def get_binary_path(properties, type):
    """Return ``properties[type]``, or the default registered for ``type`` when absent.

    The default is looked up with ``get_default_value`` before the property
    lookup, so ``type`` must be a key that ``get_default_value`` knows.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)

235 

236 

def get_terms(properties, out_log, classname):
    """Return the ``terms`` property after validating it.

    Logs through ``fu.log`` and raises SystemExit when ``terms`` is missing,
    empty or not a list, or when ``is_valid_term`` rejects it.
    """
    terms = properties.get("terms", dict())
    if not (terms and isinstance(terms, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(terms):
        # same value a second properties.get("terms", "") would yield
        return terms
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")

247 

248 

def get_selection(properties, out_log, classname):
    """Return the ``selection`` property (or its default) after validating it.

    Logs through ``fu.log`` and raises SystemExit when the value is falsy or
    when ``is_valid_selection`` rejects it.
    """
    chosen = properties.get("selection", get_default_value("selection"))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

261 

262 

def get_image_selection(properties, key, out_log, classname):
    """Return the selection stored under ``key`` (or its default) after validating it.

    Logs through ``fu.log`` and raises SystemExit when the value is falsy or
    when ``is_valid_selection`` rejects it.
    """
    chosen = properties.get(key, get_default_value(key))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

275 

276 

def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection stored under ``key`` if the index file declares it.

    Every ``[...]`` match found in ``index`` is collected with brackets and
    spaces removed. The property value (or its default) must be one of those
    names; otherwise the error is logged through ``fu.log`` and SystemExit is
    raised.
    """
    bracketed = re.compile(r"\[.*\]")
    group_names = []
    with open(index, "r") as ndx_file:
        for line in ndx_file:
            group_names.extend(
                re.sub(r"[\[\] ]", "", found.group())
                for found in bracketed.finditer(line)
            )
    sel = properties.get(key, get_default_value(key))
    if sel in group_names:
        return sel
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

290 

291 

def get_pbc(properties, out_log, classname):
    """Return the ``pbc`` property (or its default) if ``is_valid_pbc`` accepts it.

    Otherwise logs through ``fu.log`` and raises SystemExit.
    """
    value = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(value):
        return value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")

299 

300 

def get_center(properties, out_log, classname):
    """Return the ``center`` property (or its default) if ``is_valid_boolean`` accepts it.

    Otherwise logs through ``fu.log`` and raises SystemExit.
    """
    value = properties.get("center", get_default_value("center"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")

308 

309 

def get_ur(properties, out_log, classname):
    """Return the ``ur`` property (or its default) if ``is_valid_ur`` accepts it.

    Otherwise logs through ``fu.log`` and raises SystemExit.
    """
    value = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(value):
        return value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")

317 

318 

def get_fit(properties, out_log, classname):
    """Return the ``fit`` property (or its default) if ``is_valid_fit`` accepts it.

    Otherwise logs through ``fu.log`` and raises SystemExit.
    """
    value = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(value):
        return value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")

326 

327 

def get_skip(properties, out_log, classname):
    """Gets skip

    Reads the ``skip`` property (falling back to its default), validates it
    with ``is_valid_int`` and returns it converted to ``str``.

    Raises:
        SystemExit: when ``is_valid_int`` rejects the value; the same failure
            is logged through ``fu.log`` first.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        # The exit message used to say "start" (copy-paste slip); it now names
        # the property that actually failed, matching the log line above.
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)

335 

336 

def get_start(properties, out_log, classname):
    """Return the ``start`` property as a string, or None when it is unset.

    A non-None value must pass ``is_valid_int``; otherwise the error is logged
    through ``fu.log`` and SystemExit is raised.
    """
    value = properties.get("start", get_default_value("start"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")

347 

348 

def get_end(properties, out_log, classname):
    """Return the ``end`` property as a string, or None when it is unset.

    A non-None value must pass ``is_valid_int``; otherwise the error is logged
    through ``fu.log`` and SystemExit is raised.
    """
    value = properties.get("end", get_default_value("end"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")

358 

359 

def get_dt(properties, out_log, classname):
    """Return the ``dt`` property as a string, or None when it is unset.

    A non-None value must pass ``is_valid_int``; otherwise the error is logged
    through ``fu.log`` and SystemExit is raised.
    """
    value = properties.get("dt", get_default_value("dt"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")

369 

370 

def get_ot_str_ens(properties, out_log, classname):
    """Return the ``output_type`` property as a string after validating it.

    The fallback is the default registered under ``ot_str_ens``. When
    ``is_valid_ot_str_ens`` rejects the value, the error is logged through
    ``fu.log`` and SystemExit is raised.
    """
    value = properties.get("output_type", get_default_value("ot_str_ens"))
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")

378 

379 

def get_xvg(properties, out_log, classname):
    """Return the ``xvg`` property (or its default) if ``is_valid_xvg_param`` accepts it.

    Otherwise logs through ``fu.log`` and raises SystemExit.
    """
    value = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")

387 

388 

def get_dista(properties, out_log, classname):
    """Return the ``dista`` property (or its default) if ``is_valid_boolean`` accepts it.

    Otherwise logs through ``fu.log`` and raises SystemExit.
    """
    value = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")

396 

397 

def get_method(properties, out_log, classname):
    """Return the ``method`` property (or its default) if ``is_valid_method_param`` accepts it.

    Otherwise logs through ``fu.log`` and raises SystemExit.
    """
    value = properties.get("method", get_default_value("method"))
    if is_valid_method_param(value):
        return value
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")

405 

406 

def get_cutoff(properties, out_log, classname):
    """Return the ``cutoff`` property (or its default) as a string after validating it.

    When ``is_valid_float`` rejects the value, the error is logged through
    ``fu.log`` and SystemExit is raised.
    """
    value = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")

414 

415 

def is_valid_boolean(val):
    """Tell whether ``val`` compares equal to True or False.

    Membership is equality based, so values equal to a boolean (such as 0 and
    1) are accepted as well.
    """
    accepted = (True, False)
    return val in accepted

420 

421 

def is_valid_float(val):
    """Checks if given value is float (``None`` is accepted as "not set").

    Args:
        val: Value to validate.

    Returns:
        bool: True when ``val`` is ``None`` or an ``int``/``float`` instance
        (``bool`` is an ``int`` subclass, so it still passes); False otherwise.
    """
    # The previous truthiness guard let every falsy value through, so "" or []
    # were reported as valid floats. Only None and real numbers pass now.
    return val is None or isinstance(val, (int, float))

427 

428 

def is_valid_int(val):
    """Checks if given value is int (``None`` is accepted as "not set").

    Args:
        val: Value to validate.

    Returns:
        bool: True when ``val`` is ``None`` or an ``int`` instance (``bool``
        is an ``int`` subclass, so it still passes); False otherwise.
    """
    # The previous truthiness guard let every falsy value through, so "", []
    # or 0.0 were reported as valid ints. Only None and real ints pass now.
    return val is None or isinstance(val, int)

434 

435 

def is_valid_method_param(met):
    """Tell whether ``met`` is one of the accepted clustering method names."""
    accepted = ("linkage", "jarvis-patrick", "monte-carlo", "diagonalization", "gromos")
    return met in accepted

440 

441 

def is_valid_structure(ext):
    """Tell whether ``ext`` is one of the accepted structure file extensions."""
    accepted = ("tpr", "gro", "g96", "pdb", "brk", "ent")
    return ext in accepted

446 

447 

def is_valid_index(ext):
    """Tell whether ``ext`` is the accepted index file extension (``ndx``)."""
    accepted = ("ndx",)
    return ext in accepted

452 

453 

def is_valid_trajectory(ext):
    """Tell whether ``ext`` is one of the accepted input trajectory extensions."""
    accepted = ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")
    return ext in accepted

458 

459 

def is_valid_trajectory_output(ext):
    """Tell whether ``ext`` is one of the accepted output trajectory extensions."""
    accepted = ("xtc", "trr", "gro", "g96", "pdb", "tng")
    return ext in accepted

464 

465 

def is_valid_energy(ext):
    """Tell whether ``ext`` is the accepted energy file extension (``edr``)."""
    accepted = ("edr",)
    return ext in accepted

470 

471 

def is_valid_xvg(ext):
    """Tell whether ``ext`` is the XVG file extension (``xvg``)."""
    accepted = ("xvg",)
    return ext in accepted

476 

477 

def is_valid_zip(ext):
    """Tell whether ``ext`` is the ZIP file extension (``zip``)."""
    accepted = ("zip",)
    return ext in accepted

482 

483 

def is_valid_xvg_param(ext):
    """Tell whether ``ext`` is one of the accepted values for the xvg parameter."""
    accepted = ("xmgrace", "xmgr", "none")
    return ext in accepted

488 

489 

def is_valid_ot_str_ens(ext):
    """Tell whether ``ext`` is an accepted output type for a structure ensemble."""
    accepted = ("gro", "g96", "pdb")
    return ext in accepted

494 

495 

def is_valid_pbc(pbc):
    """Tell whether ``pbc`` is one of the accepted values for the pbc parameter."""
    accepted = ("none", "mol", "res", "atom", "nojump", "cluster", "whole")
    return pbc in accepted

500 

501 

def is_valid_ur(ur):
    """Tell whether ``ur`` is one of the accepted values for the ur parameter."""
    accepted = ("rect", "tric", "compact")
    return ur in accepted

506 

507 

def is_valid_fit(fit):
    """Tell whether ``fit`` is one of the accepted values for the fit parameter."""
    accepted = (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )
    return fit in accepted

519 

520 

def is_valid_term(iterms):
    """Checks if term is correct

    Args:
        iterms: Iterable of energy term names to validate.

    Returns:
        bool: True when every element of ``iterms`` is a known term name
        (vacuously True for an empty iterable); False otherwise.
    """
    cterms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
    )
    # These two names used to be listed only with a stray leading space, which
    # made the real names "Constr.-rmsd" and "Box-Z" fail validation. The
    # misspelt forms stay accepted so existing configurations keep working.
    legacy_spellings = (" Constr.-rmsd", " Box-Z")
    return all(elem in cterms or elem in legacy_spellings for elem in iterms)

577 

578 

def is_valid_selection(ext):
    """Tell whether ``ext`` is one of the accepted selection group names."""
    accepted = (
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
        "DPPC",
        "DMPC",
        "POPG",
        "POPA",
        "POPC",
        "POPE",
        "DMTAP",
        "POPS",
    )
    return ext in accepted

617 

618 

def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy ``instructions_file`` to ``unique_dir`` using ``shutil.copy2``."""
    source, destination = instructions_file, unique_dir
    shutil.copy2(source, destination)

621 

622 

def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files generated by the wrapper.

    Does nothing when ``remove_tmp`` is falsy. Otherwise ``fu.rm`` is called
    on every entry of ``list`` and the entries for which it returned a truthy
    value are reported through ``fu.log``.
    """
    if not remove_tmp:
        return
    removed_files = [tmp_entry for tmp_entry in list if fu.rm(tmp_entry)]
    fu.log("Removed: %s" % str(removed_files), out_log)

629 

630 

def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the frame files found in ``tmp_folder`` and copy the archive.

    Files matching ``glob_pattern`` are collected; when there are none, files
    matching ``frame*.pdb`` are used instead. The collected paths are handed
    to ``fu.zip_list`` together with ``output_file``, which is then copied to
    ``output_dir`` with ``shutil.copy2``.
    """
    folder = Path(tmp_folder)
    frame_files = list(folder.glob(glob_pattern)) or list(folder.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, frame_files, out_log)

    shutil.copy2(output_file, output_dir)

646 

647 

648def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]: 

649 """ 

650 Converts a string to a list, splitting by commas or spaces. If the input is already a list, returns it as is. 

651 Returns an empty list if input_data is None. 

652 

653 Parameters: 

654 input_data (str, list, or None): The string, list, or None value to convert. 

655 

656 Returns: 

657 list: A list of string elements or an empty list if input_data is None. 

658 """ 

659 if input_data is None: 

660 return [] 

661 

662 if isinstance(input_data, list): 

663 # If input is already a list, return it 

664 return input_data 

665 

666 # If input is a string, determine the delimiter based on presence of commas 

667 delimiter = "," if "," in input_data else " " 

668 items = input_data.split(delimiter) 

669 

670 # Remove whitespace from each item and ignore empty strings 

671 processed_items = [item.strip() for item in items if item.strip()] 

672 

673 return processed_items