Coverage for biobb_analysis/gromacs/common.py: 60%

321 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-08 08:07 +0000

1"""Common functions for package biobb_analysis.gromacs""" 

2 

3import re 

4import shutil 

5from pathlib import Path, PurePath 

6from typing import Optional, Union 

7 

8from biobb_common.command_wrapper import cmd_wrapper 

9from biobb_common.tools import file_utils as fu 

10 

11 

def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compares two GROMACS files with ``gmx check`` and reports whether they match.

    The command output is redirected to ``check_result.out`` in the current
    working directory and then scanned line by line.

    Parameters:
        file_a (str): First file; passed with ``-s1`` when it ends in ``.tpr``, otherwise ``-f``.
        file_b (str): Second file; passed with ``-s2`` when it ends in ``.tpr``, otherwise ``-f2``.
        gmx (str): Name or path of the GROMACS executable.

    Returns:
        bool: False at the first non-empty line that starts neither with
        "Both files read correctly" nor with "comparing"; True otherwise.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = "check_result.out"
    cmd = [
        gmx,
        "check",
        "-s1" if file_a.endswith(".tpr") else "-f",
        file_a,
        "-s2" if file_b.endswith(".tpr") else "-f2",
        file_b,
        # NOTE(review): the redirection travels as a command element, so this
        # presumably relies on CmdWrapper running the joined command in a shell - confirm.
        "> " + check_result,
    ]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        # Count from 1 so the reported number matches what an editor shows.
        for line_num, line in enumerate(check_file, start=1):
            if not line.rstrip():
                continue
            if line.startswith("Both files read correctly"):
                continue
            if not line.startswith("comparing"):
                print("Discrepance found in line %d: %s" % (line_num, line))
                return False
    return True

41 

42 

def check_energy_path(path, out_log, classname):
    """Validates the energy input file and anchors relative paths to the cwd.

    Parameters:
        path: Location of the energy file; it must exist and its extension
            must be accepted by is_valid_energy.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The path unchanged when it is absolute, otherwise a string joining the
        current working directory and the path.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    pure_path = PurePath(path)
    extension = pure_path.suffix[1:]
    if not is_valid_energy(extension):
        message = classname + ": Format %s in energy input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()) / path)
    return path

61 

62 

def check_input_path(path, out_log, classname):
    """Validates the structure input file and anchors relative paths to the cwd.

    Parameters:
        path: Location of the structure file; it must exist and its extension
            must be accepted by is_valid_structure.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The path unchanged when it is absolute, otherwise a string joining the
        current working directory and the path.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    pure_path = PurePath(path)
    extension = pure_path.suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in structure input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()) / path)
    return path

83 

84 

def check_index_path(path, out_log, classname):
    """Validates the optional index input file and anchors relative paths to the cwd.

    Unlike the other input checks, this one does not test that the file exists.

    Parameters:
        path: Location of the index file; any falsy value means "no index".
        out_log: Log handle forwarded to fu.log when the check fails.
        classname: Caller name prepended to every message.

    Returns:
        None when no path is given; the path unchanged when it is absolute;
        otherwise a string joining the current working directory and the path.

    Raises:
        SystemExit: If the extension is rejected by is_valid_index.
    """
    if not path:
        return None

    pure_path = PurePath(path)
    extension = pure_path.suffix[1:]
    if not is_valid_index(extension):
        message = classname + ": Format %s in index input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()) / path)
    return path

102 

103 

def check_traj_path(path, out_log, classname):
    """Validates the trajectory input file and anchors relative paths to the cwd.

    Parameters:
        path: Location of the trajectory file; it must exist and its extension
            must be accepted by is_valid_trajectory.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The path unchanged when it is absolute, otherwise a string joining the
        current working directory and the path.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    pure_path = PurePath(path)
    extension = pure_path.suffix[1:]
    if not is_valid_trajectory(extension):
        message = classname + ": Format %s in trajectory input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()) / path)
    return path

124 

125 

def check_out_xvg_path(path, out_log, classname):
    """Validates an XVG output path: the parent folder must exist and the extension must be xvg.

    Parameters:
        path: Destination of the output file.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or is_valid_xvg rejects the extension.
    """
    target = PurePath(path)
    if target.parent and not Path(target.parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_xvg(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

141 

142 

def check_out_pdb_path(path, out_log, classname):
    """Validates a structure output path: the parent folder must exist and the format must be a structure format.

    Parameters:
        path: Destination of the output file.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or is_valid_structure rejects the extension.
    """
    target = PurePath(path)
    if target.parent and not Path(target.parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

158 

159 

def check_out_traj_path(path, out_log, classname):
    """Validates a trajectory output path: the parent folder must exist and the format must be writable.

    Parameters:
        path: Destination of the output file.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or is_valid_trajectory_output
            rejects the extension.
    """
    target = PurePath(path)
    if target.parent and not Path(target.parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_trajectory_output(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

175 

176 

def check_out_str_ens_path(path, out_log, classname):
    """Validates a structure-ensemble output path: the parent folder must exist and the extension must be zip.

    Parameters:
        path: Destination of the output file.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or is_valid_zip rejects the extension.
    """
    target = PurePath(path)
    if target.parent and not Path(target.parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_zip(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

192 

193 

def get_default_value(key):
    """Returns the module-wide default associated with a property name.

    Parameters:
        key (str): Name of the property whose default is wanted.

    Returns:
        The default stored for that key; the table is rebuilt on every call,
        so mutable defaults are never shared between callers.

    Raises:
        KeyError: If no default is registered under the key.
    """
    defaults = dict(
        instructions_file="instructions.in",
        binary_path="gmx",
        terms=["Potential"],
        selection="System",
        xvg="none",
        dista=False,
        method="linkage",
        cutoff=0.1,
        cluster_selection="System",
        fit_selection="System",
        center_selection="System",
        output_selection="System",
        pbc="mol",
        center=True,
        fit="none",
        ur="compact",
        skip=1,
        start=None,
        end=None,
        dt=None,
        ot_str_ens="pdb",
    )
    return defaults[key]

222 

223 

def get_binary_path(properties, type):
    """Looks up a binary path in the properties, falling back to the module default.

    Parameters:
        properties (dict): User-supplied properties.
        type (str): Property name to read; it is also the key of the default.

    Returns:
        The value stored under that name, or the default when it is absent.
    """
    # The default is resolved first, exactly as an eagerly evaluated .get() fallback would be.
    fallback = get_default_value(type)
    return properties.get(type, fallback)

227 

228 

def get_terms(properties, out_log, classname):
    """Reads and validates the ``terms`` property.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The value stored under ``terms``.

    Raises:
        SystemExit: If ``terms`` is missing, empty, not a list, or is_valid_term rejects it.
    """
    requested = properties.get("terms", dict())
    if not (requested and isinstance(requested, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(requested):
        return properties.get("terms", "")
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")

239 

240 

def get_selection(properties, out_log, classname):
    """Reads and validates the ``selection`` property.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The selection, or the module default when the property is absent.

    Raises:
        SystemExit: If the selection is falsy or is_valid_selection rejects it.
    """
    chosen = properties.get("selection", get_default_value("selection"))
    if not chosen:
        fu.log(classname + ": No selection provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

253 

254 

def get_image_selection(properties, key, out_log, classname):
    """Reads and validates a selection stored under an arbitrary property name.

    Parameters:
        properties (dict): User-supplied properties.
        key (str): Property name to read; it is also the key of the default.
        out_log: Log handle forwarded to fu.log when a check fails.
        classname: Caller name prepended to every message.

    Returns:
        The selection, or the module default for the key when the property is absent.

    Raises:
        SystemExit: If the selection is falsy or is_valid_selection rejects it.
    """
    chosen = properties.get(key, get_default_value(key))
    if not chosen:
        fu.log(classname + ": No selection provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

267 

268 

def get_selection_index_file(properties, index, key, out_log, classname):
    """Reads a selection and checks it against the group names found in an index file.

    Every ``[...]`` match in the file contributes one name, obtained by
    removing the brackets and all spaces from the matched text.

    Parameters:
        properties (dict): User-supplied properties.
        index: Path of the index file to scan.
        key (str): Property name to read; it is also the key of the default.
        out_log: Log handle forwarded to fu.log when the check fails.
        classname: Caller name prepended to every message.

    Returns:
        The selection, or the module default for the key when the property is absent.

    Raises:
        SystemExit: If the selection is not among the names collected from the file.
    """
    group_header = re.compile(r"\[.*\]")
    with open(index, "r") as ndx_file:
        group_names = [
            re.sub(r"[\[\] ]", "", found.group())
            for ndx_line in ndx_file
            for found in group_header.finditer(ndx_line)
        ]
    requested = properties.get(key, get_default_value(key))
    if requested in group_names:
        return requested
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

282 

283 

def get_pbc(properties, out_log, classname):
    """Reads the ``pbc`` property (module default when absent) and validates it.

    Returns:
        The accepted value.

    Raises:
        SystemExit: If is_valid_pbc rejects the value; the error is logged first.
    """
    value = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(value):
        return value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")

291 

292 

def get_center(properties, out_log, classname):
    """Reads the ``center`` property (module default when absent) and validates it.

    Returns:
        The accepted value.

    Raises:
        SystemExit: If is_valid_boolean rejects the value; the error is logged first.
    """
    value = properties.get("center", get_default_value("center"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")

300 

301 

def get_ur(properties, out_log, classname):
    """Reads the ``ur`` property (module default when absent) and validates it.

    Returns:
        The accepted value.

    Raises:
        SystemExit: If is_valid_ur rejects the value; the error is logged first.
    """
    value = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(value):
        return value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")

309 

310 

def get_fit(properties, out_log, classname):
    """Reads the ``fit`` property (module default when absent) and validates it.

    Returns:
        The accepted value.

    Raises:
        SystemExit: If is_valid_fit rejects the value; the error is logged first.
    """
    value = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(value):
        return value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")

318 

319 

def get_skip(properties, out_log, classname):
    """Reads the ``skip`` property (module default when absent) and validates it.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Log handle forwarded to fu.log when the check fails.
        classname: Caller name prepended to every message.

    Returns:
        str: The accepted value converted to a string.

    Raises:
        SystemExit: If is_valid_int rejects the value; the error is logged first.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        # The exit message names "skip", matching the log line above.
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)

327 

328 

def get_start(properties, out_log, classname):
    """Reads the optional ``start`` property (module default when absent) and validates it.

    Returns:
        None when the value is None, otherwise the accepted value as a string.

    Raises:
        SystemExit: If is_valid_int rejects the value; the error is logged first.
    """
    value = properties.get("start", get_default_value("start"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")

339 

340 

def get_end(properties, out_log, classname):
    """Reads the optional ``end`` property (module default when absent) and validates it.

    Returns:
        None when the value is None, otherwise the accepted value as a string.

    Raises:
        SystemExit: If is_valid_int rejects the value; the error is logged first.
    """
    value = properties.get("end", get_default_value("end"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")

350 

351 

def get_dt(properties, out_log, classname):
    """Reads the optional ``dt`` property (module default when absent) and validates it.

    Returns:
        None when the value is None, otherwise the accepted value as a string.

    Raises:
        SystemExit: If is_valid_int rejects the value; the error is logged first.
    """
    value = properties.get("dt", get_default_value("dt"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")

361 

362 

def get_ot_str_ens(properties, out_log, classname):
    """Reads the ``output_type`` property and validates it for structure ensembles.

    The fallback is the module default registered under ``ot_str_ens``.

    Returns:
        str: The accepted value converted to a string.

    Raises:
        SystemExit: If is_valid_ot_str_ens rejects the value; the error is logged first.
    """
    value = properties.get("output_type", get_default_value("ot_str_ens"))
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")

370 

371 

def get_xvg(properties, out_log, classname):
    """Reads the ``xvg`` property (module default when absent) and validates it.

    Returns:
        The accepted value.

    Raises:
        SystemExit: If is_valid_xvg_param rejects the value; the error is logged first.
    """
    value = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")

379 

380 

def get_dista(properties, out_log, classname):
    """Reads the ``dista`` property (module default when absent) and validates it.

    Returns:
        The accepted value.

    Raises:
        SystemExit: If is_valid_boolean rejects the value; the error is logged first.
    """
    value = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")

388 

389 

def get_method(properties, out_log, classname):
    """Reads the ``method`` property (module default when absent) and validates it.

    Returns:
        The accepted value.

    Raises:
        SystemExit: If is_valid_method_param rejects the value; the error is logged first.
    """
    value = properties.get("method", get_default_value("method"))
    if is_valid_method_param(value):
        return value
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")

397 

398 

def get_cutoff(properties, out_log, classname):
    """Reads the ``cutoff`` property (module default when absent) and validates it.

    Returns:
        str: The accepted value converted to a string.

    Raises:
        SystemExit: If is_valid_float rejects the value; the error is logged first.
    """
    value = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")

406 

407 

def is_valid_boolean(val):
    """Tells whether the value compares equal to True or False.

    The test is by membership, so values equal to a boolean (for example
    1 or 0) are accepted as well.
    """
    return val in (True, False)

412 

413 

def is_valid_float(val):
    """Tells whether the value is acceptable as a float.

    Any falsy value (None, 0, an empty string...) is accepted; a truthy value
    is accepted only when it is a float or an int.
    """
    return not val or isinstance(val, (float, int))

419 

420 

def is_valid_int(val):
    """Tells whether the value is acceptable as an int.

    Any falsy value (None, 0, 0.0, an empty string...) is accepted; a truthy
    value is accepted only when it is an int.
    """
    return not val or isinstance(val, int)

426 

427 

def is_valid_method_param(met):
    """Tells whether the method name is one of those accepted for GROMACS."""
    return met in ("linkage", "jarvis-patrick", "monte-carlo", "diagonalization", "gromos")

432 

433 

def is_valid_structure(ext):
    """Tells whether the extension (without the dot) is an accepted structure format."""
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")

438 

439 

def is_valid_index(ext):
    """Tells whether the extension (without the dot) is an accepted index format."""
    return ext in ("ndx",)

444 

445 

def is_valid_trajectory(ext):
    """Tells whether the extension (without the dot) is an accepted input trajectory format."""
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")

450 

451 

def is_valid_trajectory_output(ext):
    """Tells whether the extension (without the dot) is an accepted output trajectory format."""
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")

456 

457 

def is_valid_energy(ext):
    """Tells whether the extension (without the dot) is an accepted energy format."""
    return ext in ("edr",)

462 

463 

def is_valid_xvg(ext):
    """Tells whether the extension (without the dot) is XVG."""
    return ext in ("xvg",)

468 

469 

def is_valid_zip(ext):
    """Tells whether the extension (without the dot) is ZIP."""
    return ext in ("zip",)

474 

475 

def is_valid_xvg_param(ext):
    """Tells whether the value is an accepted setting for the xvg parameter."""
    return ext in ("xmgrace", "xmgr", "none")

480 

481 

def is_valid_ot_str_ens(ext):
    """Tells whether the value is an accepted output type for a structure ensemble."""
    return ext in ("gro", "g96", "pdb")

486 

487 

def is_valid_pbc(pbc):
    """Tells whether the value is an accepted setting for the pbc parameter."""
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")

492 

493 

def is_valid_ur(ur):
    """Tells whether the value is an accepted setting for the ur parameter."""
    return ur in ("rect", "tric", "compact")

498 

499 

def is_valid_fit(fit):
    """Tells whether the value is an accepted setting for the fit parameter."""
    accepted = ("none", "rot+trans", "rotxy+transxy", "translation", "transxy", "progressive")
    return fit in accepted

511 

512 

def is_valid_term(iterms):
    """Tells whether every requested energy term is a known one.

    String terms are compared after stripping surrounding whitespace. The
    known names are stored without padding, so "Constr.-rmsd" and "Box-Z"
    are accepted, and so are the padded spellings accepted previously.

    Parameters:
        iterms (list): Terms to check.

    Returns:
        bool: True when all terms are known (also for an empty list),
        False as soon as one is not.
    """
    known_terms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
    )
    # Only strings are normalised; anything else is compared as given and simply fails.
    return all(
        (term.strip() if isinstance(term, str) else term) in known_terms
        for term in iterms
    )

569 

570 

def is_valid_selection(ext):
    """Tells whether the value is one of the selection group names this module accepts."""
    accepted = (
        "System", "Protein", "Protein-H", "C-alpha", "Backbone",
        "MainChain", "MainChain+Cb", "MainChain+H",
        "SideChain", "SideChain-H", "Prot-Masses", "non-Protein",
        "Water", "SOL", "non-Water", "Ion", "NA", "CL", "Water_and_ions",
        "DNA", "RNA", "Protein_DNA", "Protein_RNA", "Protein_DNA_RNA", "DNA_RNA",
        "DPPC", "DMPC", "POPG", "POPA", "POPC", "POPE", "DMTAP", "POPS",
    )
    return ext in accepted

609 

610 

def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copies the instructions file into the given directory with shutil.copy2.

    Parameters:
        instructions_file: Path of the file to copy.
        unique_dir: Destination passed to shutil.copy2.
    """
    source, destination = instructions_file, unique_dir
    shutil.copy2(source, destination)

613 

614 

def remove_tmp_files(list, remove_tmp, out_log):
    """Removes the temporary files generated by the wrapper, when asked to.

    Parameters:
        list: Paths to remove; each one is handed to fu.rm.
        remove_tmp: Nothing happens unless this is truthy.
        out_log: Log handle that receives the list of removed paths.
    """
    if not remove_tmp:
        return
    # Only the paths for which fu.rm returns a truthy value are reported.
    deleted = [tmp_file for tmp_file in list if fu.rm(tmp_file)]
    fu.log("Removed: %s" % str(deleted), out_log)

621 

622 

def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zips the files found in the temporary folder and copies the archive to its destination.

    Parameters:
        tmp_folder: Folder searched for the files to pack.
        output_file: Path of the zip written by fu.zip_list.
        output_dir: Destination passed to shutil.copy2.
        glob_pattern: Pattern used to find the files; when it matches nothing,
            "frame*.pdb" is tried instead.
        out_log: Log handle forwarded to fu.zip_list.
    """
    workdir = Path(tmp_folder)
    frames = list(workdir.glob(glob_pattern)) or list(workdir.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, frames, out_log)

    shutil.copy2(output_file, output_dir)

638 

639 

640def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]: 

641 """ 

642 Converts a string to a list, splitting by commas or spaces. If the input is already a list, returns it as is. 

643 Returns an empty list if input_data is None. 

644 

645 Parameters: 

646 input_data (str, list, or None): The string, list, or None value to convert. 

647 

648 Returns: 

649 list: A list of string elements or an empty list if input_data is None. 

650 """ 

651 if input_data is None: 

652 return [] 

653 

654 if isinstance(input_data, list): 

655 # If input is already a list, return it 

656 return input_data 

657 

658 # If input is a string, determine the delimiter based on presence of commas 

659 delimiter = "," if "," in input_data else " " 

660 items = input_data.split(delimiter) 

661 

662 # Remove whitespace from each item and ignore empty strings 

663 processed_items = [item.strip() for item in items if item.strip()] 

664 

665 return processed_items