Coverage for biobb_analysis/gromacs/common.py: 61%

315 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-28 12:11 +0000

1"""Common functions for package biobb_analysis.gromacs""" 

2 

3import re 

4import shutil 

5from pathlib import Path, PurePath 

6from typing import Optional, Union 

7 

8from biobb_common.command_wrapper import cmd_wrapper 

9from biobb_common.tools import file_utils as fu 

10 

11 

def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files with ``gmx check`` and report whether they match.

    Args:
        file_a: First file; passed with ``-s1`` when it ends in ``.tpr``,
            otherwise with ``-f``.
        file_b: Second file; passed with ``-s2`` when it ends in ``.tpr``,
            otherwise with ``-f2``.
        gmx: GROMACS executable to invoke.

    Returns:
        True when every non-blank line of the captured output either starts
        with "Both files read correctly" or with "comparing"; False as soon as
        any other line is found.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = "check_result.out"
    cmd = [
        gmx,
        "check",
        "-s1" if file_a.endswith(".tpr") else "-f",
        file_a,
        "-s2" if file_b.endswith(".tpr") else "-f2",
        file_b,
        # Build the redirection from check_result so the file written here is
        # always the file read below.
        # NOTE(review): this relies on CmdWrapper interpreting ">" as a shell
        # redirection - confirm.
        "> " + check_result,
    ]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        # Number lines from 1 so the reported position matches what an editor shows.
        for line_num, line in enumerate(check_file, start=1):
            if not line.rstrip():
                continue
            if line.startswith("Both files read correctly"):
                continue
            if not line.startswith("comparing"):
                print("Discrepance found in line %d: %s" % (line_num, line))
                return False
    return True

41 

42 

def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a usable path to it.

    Logs to *out_log* and raises SystemExit when the file does not exist or
    when ``is_valid_energy`` rejects its extension. A relative *path* is
    joined onto the current working directory; otherwise it is returned as is.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        error = classname + ": Format %s in energy input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # Execution is launched on a tmp folder, so inputs given without an
    # absolute location are anchored to the current working directory.
    pure = PurePath(path)
    if not pure.is_absolute() or pure.name == path:
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

61 

62 

def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a usable path to it.

    Logs to *out_log* and raises SystemExit when the file does not exist or
    when ``is_valid_structure`` rejects its extension. A relative *path* is
    joined onto the current working directory; otherwise it is returned as is.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        error = classname + ": Format %s in structure input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # Execution is launched on a tmp folder, so inputs given without an
    # absolute location are anchored to the current working directory.
    pure = PurePath(path)
    if not pure.is_absolute() or pure.name == path:
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

83 

84 

def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return a usable path to it.

    Returns None when *path* is falsy. Logs to *out_log* and raises SystemExit
    when ``is_valid_index`` rejects the extension. File existence is not
    checked here. A relative *path* is joined onto the current working
    directory; otherwise it is returned as is.
    """
    if not path:
        return None

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        error = classname + ": Format %s in index input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # Execution is launched on a tmp folder, so inputs given without an
    # absolute location are anchored to the current working directory.
    pure = PurePath(path)
    if not pure.is_absolute() or pure.name == path:
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

102 

103 

def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a usable path to it.

    Logs to *out_log* and raises SystemExit when the file does not exist or
    when ``is_valid_trajectory`` rejects its extension. A relative *path* is
    joined onto the current working directory; otherwise it is returned as is.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        error = classname + ": Format %s in trajectory input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # Execution is launched on a tmp folder, so inputs given without an
    # absolute location are anchored to the current working directory.
    pure = PurePath(path)
    if not pure.is_absolute() or pure.name == path:
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

124 

125 

def check_out_xvg_path(path, out_log, classname):
    """Validate an XVG output path and return it unchanged.

    Logs to *out_log* and raises SystemExit when the parent folder of *path*
    does not exist or when ``is_valid_xvg`` rejects the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if is_valid_xvg(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)

141 

142 

def check_out_pdb_path(path, out_log, classname):
    """Validate a structure output path and return it unchanged.

    Logs to *out_log* and raises SystemExit when the parent folder of *path*
    does not exist or when ``is_valid_structure`` rejects the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if is_valid_structure(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)

158 

159 

def check_out_traj_path(path, out_log, classname):
    """Validate a trajectory output path and return it unchanged.

    Logs to *out_log* and raises SystemExit when the parent folder of *path*
    does not exist or when ``is_valid_trajectory_output`` rejects the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if is_valid_trajectory_output(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)

175 

176 

def check_out_str_ens_path(path, out_log, classname):
    """Validate a ZIP output path and return it unchanged.

    Logs to *out_log* and raises SystemExit when the parent folder of *path*
    does not exist or when ``is_valid_zip`` rejects the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without its leading dot
    extension = PurePath(path).suffix[1:]
    if is_valid_zip(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)

192 

193 

def get_default_value(key):
    """Return the built-in default for the property named *key*.

    Raises KeyError when *key* has no default. The table is rebuilt on every
    call, so mutable defaults such as ``terms`` are never shared between callers.
    """
    defaults = dict(
        instructions_file="instructions.in",
        binary_path="gmx",
        terms=["Potential"],
        selection="System",
        xvg="none",
        dista=False,
        method="linkage",
        cutoff=0.1,
        cluster_selection="System",
        fit_selection="System",
        center_selection="System",
        output_selection="System",
        pbc="mol",
        center=True,
        fit="none",
        ur="compact",
        skip=1,
        start=0,
        end=0,
        dt=0,
        ot_str_ens="pdb",
    )
    return defaults[key]

222 

223 

def get_binary_path(properties, type):
    """Return ``properties[type]``, or the built-in default for *type* if unset.

    The default is looked up before the properties are consulted, so *type*
    must be a key known to ``get_default_value``.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)

227 

228 

def get_terms(properties, out_log, classname):
    """Return the ``terms`` property after validating it.

    Logs to *out_log* and raises SystemExit when ``terms`` is missing, empty,
    not a list, or contains an entry that ``is_valid_term`` rejects.
    """
    requested = properties.get("terms", dict())
    if not (requested and isinstance(requested, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(requested):
        # The validated value is read back from the properties
        return properties.get("terms", "")
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")

239 

240 

def get_selection(properties, out_log, classname):
    """Return the ``selection`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when the value is empty or when
    ``is_valid_selection`` rejects it.
    """
    fallback = get_default_value("selection")
    chosen = properties.get("selection", fallback)
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

253 

254 

def get_image_selection(properties, key, out_log, classname):
    """Return the selection stored under *key*, falling back to its default.

    Logs to *out_log* and raises SystemExit when the value is empty or when
    ``is_valid_selection`` rejects it.
    """
    fallback = get_default_value(key)
    chosen = properties.get(key, fallback)
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

267 

268 

def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection stored under *key* if the index file declares it.

    Every ``[...]`` span found in the file *index* is collected with its
    brackets and spaces removed. The selection (or the built-in default for
    *key*) must equal one of those names; otherwise the error is logged to
    *out_log* and SystemExit is raised.
    """
    header = re.compile(r"\[.*\]")
    with open(index, "r") as ndx_file:
        groups = [
            re.sub(r"[\[\] ]", "", found.group())
            for row in ndx_file
            for found in header.finditer(row)
        ]
    chosen = properties.get(key, get_default_value(key))
    if chosen in groups:
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

282 

283 

def get_pbc(properties, out_log, classname):
    """Return the ``pbc`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when ``is_valid_pbc`` rejects it.
    """
    fallback = get_default_value("pbc")
    value = properties.get("pbc", fallback)
    if is_valid_pbc(value):
        return value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")

291 

292 

def get_center(properties, out_log, classname):
    """Return the ``center`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when ``is_valid_boolean`` rejects it.
    """
    fallback = get_default_value("center")
    value = properties.get("center", fallback)
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")

300 

301 

def get_ur(properties, out_log, classname):
    """Return the ``ur`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when ``is_valid_ur`` rejects it.
    """
    fallback = get_default_value("ur")
    value = properties.get("ur", fallback)
    if is_valid_ur(value):
        return value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")

309 

310 

def get_fit(properties, out_log, classname):
    """Return the ``fit`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when ``is_valid_fit`` rejects it.
    """
    fallback = get_default_value("fit")
    value = properties.get("fit", fallback)
    if is_valid_fit(value):
        return value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")

318 

319 

def get_skip(properties, out_log, classname):
    """Return the ``skip`` property as a string, falling back to the default.

    Logs to *out_log* and raises SystemExit when ``is_valid_int`` rejects the
    value. The exit message names ``skip`` so that it agrees with the logged
    message (it previously named ``start``).
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)

327 

328 

def get_start(properties, out_log, classname):
    """Return the ``start`` property as a string, falling back to the default.

    Logs to *out_log* and raises SystemExit when ``is_valid_int`` rejects it.
    """
    fallback = get_default_value("start")
    value = properties.get("start", fallback)
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")

336 

337 

def get_end(properties, out_log, classname):
    """Return the ``end`` property as a string, falling back to the default.

    Logs to *out_log* and raises SystemExit when ``is_valid_int`` rejects it.
    """
    fallback = get_default_value("end")
    value = properties.get("end", fallback)
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")

345 

346 

def get_dt(properties, out_log, classname):
    """Return the ``dt`` property as a string, falling back to the default.

    Logs to *out_log* and raises SystemExit when ``is_valid_int`` rejects it.
    """
    fallback = get_default_value("dt")
    value = properties.get("dt", fallback)
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")

354 

355 

def get_ot_str_ens(properties, out_log, classname):
    """Return the ``output_type`` property as a string.

    Falls back to the built-in ``ot_str_ens`` default. Logs to *out_log* and
    raises SystemExit when ``is_valid_ot_str_ens`` rejects the value.
    """
    fallback = get_default_value("ot_str_ens")
    value = properties.get("output_type", fallback)
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")

363 

364 

def get_xvg(properties, out_log, classname):
    """Return the ``xvg`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when ``is_valid_xvg_param`` rejects it.
    """
    fallback = get_default_value("xvg")
    value = properties.get("xvg", fallback)
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")

372 

373 

def get_dista(properties, out_log, classname):
    """Return the ``dista`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when ``is_valid_boolean`` rejects it.
    """
    fallback = get_default_value("dista")
    value = properties.get("dista", fallback)
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")

381 

382 

def get_method(properties, out_log, classname):
    """Return the ``method`` property, falling back to the built-in default.

    Logs to *out_log* and raises SystemExit when ``is_valid_method_param``
    rejects it.
    """
    fallback = get_default_value("method")
    value = properties.get("method", fallback)
    if is_valid_method_param(value):
        return value
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")

390 

391 

def get_cutoff(properties, out_log, classname):
    """Return the ``cutoff`` property as a string, falling back to the default.

    Logs to *out_log* and raises SystemExit when ``is_valid_float`` rejects it.
    """
    fallback = get_default_value("cutoff")
    value = properties.get("cutoff", fallback)
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")

399 

400 

def is_valid_boolean(val):
    """Return True when *val* compares equal to ``True`` or ``False``.

    The check is by equality, so ``1`` and ``0`` are accepted as well.
    """
    return val in (True, False)

405 

406 

def is_valid_float(val):
    """Return True when *val* is falsy or is a float or an int.

    Falsy values (``None``, ``0``, ``""``, ...) are accepted without a type check.
    """
    return not val or isinstance(val, (float, int))

412 

413 

def is_valid_int(val):
    """Return True when *val* is falsy or is an int.

    Falsy values (``None``, ``0``, ``""``, ...) are accepted without a type check.
    """
    return not val or isinstance(val, int)

419 

420 

def is_valid_method_param(met):
    """Return True when *met* is one of the accepted method names."""
    return met in (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )

425 

426 

def is_valid_structure(ext):
    """Return True when *ext* is an accepted structure file extension."""
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")

431 

432 

def is_valid_index(ext):
    """Return True when *ext* is an accepted index file extension."""
    return ext in ("ndx",)

437 

438 

def is_valid_trajectory(ext):
    """Return True when *ext* is an accepted trajectory input extension."""
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")

443 

444 

def is_valid_trajectory_output(ext):
    """Return True when *ext* is an accepted trajectory output extension."""
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")

449 

450 

def is_valid_energy(ext):
    """Return True when *ext* is an accepted energy file extension."""
    return ext in ("edr",)

455 

456 

def is_valid_xvg(ext):
    """Return True when *ext* is the XVG file extension."""
    return ext in ("xvg",)

461 

462 

def is_valid_zip(ext):
    """Return True when *ext* is the ZIP file extension."""
    return ext in ("zip",)

467 

468 

def is_valid_xvg_param(ext):
    """Return True when *ext* is an accepted value for the ``xvg`` property."""
    return ext in ("xmgrace", "xmgr", "none")

473 

474 

def is_valid_ot_str_ens(ext):
    """Return True when *ext* is an accepted structure-ensemble output type."""
    return ext in ("gro", "g96", "pdb")

479 

480 

def is_valid_pbc(pbc):
    """Return True when *pbc* is an accepted value for the ``pbc`` property."""
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")

485 

486 

def is_valid_ur(ur):
    """Return True when *ur* is an accepted value for the ``ur`` property."""
    return ur in ("rect", "tric", "compact")

491 

492 

def is_valid_fit(fit):
    """Return True when *fit* is an accepted value for the ``fit`` property."""
    accepted = (
        "none", "rot+trans", "rotxy+transxy",
        "translation", "transxy", "progressive",
    )
    return fit in accepted

504 

505 

def is_valid_term(iterms):
    """Return True when every entry of *iterms* is a known energy term.

    Args:
        iterms: Iterable of term names to check.

    Returns:
        True when all entries are in the accepted list (an empty iterable is
        therefore accepted), False otherwise.
    """
    cterms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
        # Legacy spellings: these two names used to be listed only with a
        # leading space, unlike their siblings (e.g. "Box-X", "Box-Y"). They
        # are kept so that callers relying on the old spelling still pass.
        " Constr.-rmsd",
        " Box-Z",
    )
    return all(elem in cterms for elem in iterms)

562 

563 

def is_valid_selection(ext):
    """Return True when *ext* is one of the accepted selection group names."""
    accepted = (
        "System", "Protein", "Protein-H", "C-alpha", "Backbone",
        "MainChain", "MainChain+Cb", "MainChain+H",
        "SideChain", "SideChain-H", "Prot-Masses", "non-Protein",
        "Water", "SOL", "non-Water", "Ion", "NA", "CL", "Water_and_ions",
        "DNA", "RNA", "Protein_DNA", "Protein_RNA", "Protein_DNA_RNA", "DNA_RNA",
        "DPPC", "DMPC", "POPG", "POPA", "POPC", "POPE", "DMTAP", "POPS",
    )
    return ext in accepted

602 

603 

def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy *instructions_file* into *unique_dir* using ``shutil.copy2``."""
    shutil.copy2(src=instructions_file, dst=unique_dir)

606 

607 

def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files in *list* when *remove_tmp* is truthy.

    Each entry is passed to ``fu.rm``; the entries for which it returns a
    truthy value are reported to *out_log*. Does nothing when *remove_tmp*
    is falsy.
    """
    if not remove_tmp:
        return
    removed_files = [tmp_file for tmp_file in list if fu.rm(tmp_file)]
    fu.log("Removed: %s" % str(removed_files), out_log)

614 

615 

def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the frame files found in *tmp_folder* and copy the archive out.

    Files matching *glob_pattern* are collected; when none match, files
    matching ``frame*.pdb`` are used instead. They are zipped into
    *output_file* through ``fu.zip_list`` and the archive is then copied to
    *output_dir*.
    """
    folder = Path(tmp_folder)
    # Fall back to the PDB frame pattern when the requested one matches nothing
    files_list = list(folder.glob(glob_pattern)) or list(folder.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, files_list, out_log)

    shutil.copy2(output_file, output_dir)

631 

632 

def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]:
    """
    Normalizes a string, list or None into a list of strings.

    A string is split on commas when it contains at least one comma, otherwise
    on single spaces; each piece is stripped and empty pieces are dropped.

    Parameters:
        input_data (str, list, or None): The value to normalize.

    Returns:
        list: The input list itself when a list is given, an empty list for
        None, otherwise the non-empty stripped pieces of the string.
    """
    if input_data is None:
        return []
    if isinstance(input_data, list):
        # Lists are handed back untouched (same object, no copy)
        return input_data

    # A single comma anywhere switches the separator from space to comma
    separator = " "
    if "," in input_data:
        separator = ","

    stripped = (piece.strip() for piece in input_data.split(separator))
    return [piece for piece in stripped if piece]