Coverage for biobb_analysis/gromacs/common.py: 61%

315 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-14 14:38 +0000

1"""Common functions for package biobb_analysis.gromacs""" 

2 

3import re 

4import shutil 

5from pathlib import Path, PurePath 

6from typing import Optional, Union 

7 

8from biobb_common.command_wrapper import cmd_wrapper 

9from biobb_common.tools import file_utils as fu 

10 

11 

def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files by running ``gmx check`` and scanning its output.

    The command uses ``-s1``/``-s2`` for inputs whose name ends in ``.tpr``
    and ``-f``/``-f2`` otherwise. Its output is redirected to
    ``check_result.out`` (relative to the current working directory), which
    is then read line by line.

    Args:
        file_a: Path of the first file to compare.
        file_b: Path of the second file to compare.
        gmx: Name or path of the GROMACS executable.

    Returns:
        False as soon as a line is found that is neither blank, nor starts
        with "Both files read correctly", nor starts with "comparing";
        True if no such line exists.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = "check_result.out"
    cmd = [
        gmx,
        "check",
        "-s1" if file_a.endswith(".tpr") else "-f",
        file_a,
        "-s2" if file_b.endswith(".tpr") else "-f2",
        file_b,
        # The redirection is passed as a plain command token; presumably the
        # wrapper hands the joined command to a shell -- confirm against
        # biobb_common. Built from check_result so the name lives in one place.
        "> " + check_result,
    ]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        # Count from 1 so the reported number matches what an editor shows.
        for line_num, line in enumerate(check_file, start=1):
            if not line.rstrip():
                continue
            if line.startswith("Both files read correctly"):
                continue
            if not line.startswith("comparing"):
                print("Discrepance found in line %d: %s" % (line_num, line))
                return False
    return True

41 

42 

def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return its path.

    Logs and raises SystemExit when ``path`` does not exist or when its
    extension is rejected by ``is_valid_energy``. A non-absolute path is
    returned joined onto the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        reason = ": Format %s in energy input file is not compatible" % extension
        fu.log(classname + reason, out_log)
        raise SystemExit(classname + reason)

    # A bare file name is never absolute, so one test covers both cases.
    # Anchor to cwd because execution is launched on a tmp folder.
    if not PurePath(path).is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path

63 

64 

def check_input_path(path, out_log, classname):
    """Validate the structure input file and return its path.

    Logs and raises SystemExit when ``path`` does not exist or when its
    extension is rejected by ``is_valid_structure``. A non-absolute path is
    returned joined onto the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        reason = ": Format %s in structure input file is not compatible" % extension
        fu.log(classname + reason, out_log)
        raise SystemExit(classname + reason)

    # A bare file name is never absolute, so one test covers both cases.
    # Anchor to cwd because execution is launched on a tmp folder.
    if not PurePath(path).is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path

87 

88 

def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return its path.

    Returns None when ``path`` is falsy. Otherwise logs and raises SystemExit
    if the extension is rejected by ``is_valid_index``. A non-absolute path
    is returned joined onto the current working directory. The file's
    existence is not checked here.
    """
    if not path:
        return None

    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        reason = ": Format %s in index input file is not compatible" % extension
        fu.log(classname + reason, out_log)
        raise SystemExit(classname + reason)

    # A bare file name is never absolute, so one test covers both cases.
    # Anchor to cwd because execution is launched on a tmp folder.
    if not PurePath(path).is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path

108 

109 

def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return its path.

    Logs and raises SystemExit when ``path`` does not exist or when its
    extension is rejected by ``is_valid_trajectory``. A non-absolute path is
    returned joined onto the current working directory.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        reason = ": Format %s in trajectory input file is not compatible" % extension
        fu.log(classname + reason, out_log)
        raise SystemExit(classname + reason)

    # A bare file name is never absolute, so one test covers both cases.
    # Anchor to cwd because execution is launched on a tmp folder.
    if not PurePath(path).is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path

132 

133 

def check_out_xvg_path(path, out_log, classname):
    """Validate an XVG output path and return it unchanged.

    Logs and raises SystemExit when the parent folder of ``path`` does not
    exist or when the extension is rejected by ``is_valid_xvg``.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_xvg(extension):
        return path

    reason = ": Format %s in output file is not compatible" % extension
    fu.log(classname + reason, out_log)
    raise SystemExit(classname + reason)

151 

152 

def check_out_pdb_path(path, out_log, classname):
    """Validate a structure output path and return it unchanged.

    Logs and raises SystemExit when the parent folder of ``path`` does not
    exist or when the extension is rejected by ``is_valid_structure``.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_structure(extension):
        return path

    reason = ": Format %s in output file is not compatible" % extension
    fu.log(classname + reason, out_log)
    raise SystemExit(classname + reason)

170 

171 

def check_out_traj_path(path, out_log, classname):
    """Validate a trajectory output path and return it unchanged.

    Logs and raises SystemExit when the parent folder of ``path`` does not
    exist or when the extension is rejected by
    ``is_valid_trajectory_output``.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_trajectory_output(extension):
        return path

    reason = ": Format %s in output file is not compatible" % extension
    fu.log(classname + reason, out_log)
    raise SystemExit(classname + reason)

189 

190 

def check_out_str_ens_path(path, out_log, classname):
    """Validate a ZIP output path and return it unchanged.

    Logs and raises SystemExit when the parent folder of ``path`` does not
    exist or when the extension is rejected by ``is_valid_zip``.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_zip(extension):
        return path

    reason = ": Format %s in output file is not compatible" % extension
    fu.log(classname + reason, out_log)
    raise SystemExit(classname + reason)

208 

209 

def get_default_value(key):
    """Return the built-in default for the property named ``key``.

    The table is rebuilt on every call, so mutable defaults such as the
    ``terms`` list are never shared between callers. An unknown ``key``
    raises KeyError.
    """
    defaults = dict(
        instructions_file="instructions.in",
        binary_path="gmx",
        terms=["Potential"],
        selection="System",
        xvg="none",
        dista=False,
        method="linkage",
        cutoff=0.1,
        cluster_selection="System",
        fit_selection="System",
        center_selection="System",
        output_selection="System",
        pbc="mol",
        center=True,
        fit="none",
        ur="compact",
        skip=1,
        start=0,
        end=0,
        dt=0,
        ot_str_ens="pdb",
    )
    return defaults[key]

238 

239 

def get_binary_path(properties, type):
    """Return ``properties[type]``, or the built-in default for ``type`` if absent.

    The default is looked up before ``properties`` is consulted, so a
    ``type`` with no built-in default raises KeyError even when
    ``properties`` contains it.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)

243 

244 

def get_terms(properties, out_log, classname):
    """Return the ``terms`` property after validating it.

    Logs and raises SystemExit when ``terms`` is missing, empty or not a
    list, or when ``is_valid_term`` rejects it.
    """
    requested = properties.get("terms", {})
    if not (requested and isinstance(requested, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if not is_valid_term(requested):
        fu.log(classname + ": Incorrect terms provided, exiting", out_log)
        raise SystemExit(classname + ": Incorrect terms provided")
    return properties.get("terms", "")

255 

256 

def get_selection(properties, out_log, classname):
    """Return the ``selection`` property (or its default) after validating it.

    Logs and raises SystemExit when the value is falsy or when
    ``is_valid_selection`` rejects it.
    """
    chosen = properties.get("selection", get_default_value("selection"))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

269 

270 

def get_image_selection(properties, key, out_log, classname):
    """Return the selection stored under ``key`` (or its default) after validating it.

    Logs and raises SystemExit when the value is falsy or when
    ``is_valid_selection`` rejects it.
    """
    chosen = properties.get(key, get_default_value(key))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

283 

284 

def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection under ``key`` if it names a group in the index file.

    Every ``[...]`` span found in the lines of ``index`` is collected with
    its brackets and spaces removed. The property value (or its default)
    must be one of those names, otherwise the error is logged and SystemExit
    is raised.
    """
    bracketed = re.compile(r"\[.*\]")
    group_names = []
    with open(index, "r") as ndx_file:
        for row in ndx_file:
            group_names.extend(
                re.sub(r"[\[\] ]", "", found.group())
                for found in bracketed.finditer(row)
            )
    chosen = properties.get(key, get_default_value(key))
    if chosen in group_names:
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")

298 

299 

def get_pbc(properties, out_log, classname):
    """Return the ``pbc`` property (or its default); exit if ``is_valid_pbc`` rejects it."""
    value = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(value):
        return value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")

307 

308 

def get_center(properties, out_log, classname):
    """Return the ``center`` property (or its default); exit if ``is_valid_boolean`` rejects it."""
    value = properties.get("center", get_default_value("center"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")

316 

317 

def get_ur(properties, out_log, classname):
    """Return the ``ur`` property (or its default); exit if ``is_valid_ur`` rejects it."""
    value = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(value):
        return value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")

325 

326 

def get_fit(properties, out_log, classname):
    """Return the ``fit`` property (or its default); exit if ``is_valid_fit`` rejects it."""
    value = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(value):
        return value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")

334 

335 

def get_skip(properties, out_log, classname):
    """Return the ``skip`` property (or its default) as a string.

    Logs and raises SystemExit when ``is_valid_int`` rejects the value. The
    exit message now names ``skip``; it previously said "start", which
    disagreed with the logged message.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)

343 

344 

def get_start(properties, out_log, classname):
    """Return the ``start`` property (or its default) as a string; exit if ``is_valid_int`` rejects it."""
    value = properties.get("start", get_default_value("start"))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")

352 

353 

def get_end(properties, out_log, classname):
    """Return the ``end`` property (or its default) as a string; exit if ``is_valid_int`` rejects it."""
    value = properties.get("end", get_default_value("end"))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")

361 

362 

def get_dt(properties, out_log, classname):
    """Return the ``dt`` property (or its default) as a string; exit if ``is_valid_int`` rejects it."""
    value = properties.get("dt", get_default_value("dt"))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")

370 

371 

def get_ot_str_ens(properties, out_log, classname):
    """Return the ``output_type`` property as a string.

    Falls back to the ``ot_str_ens`` default when the property is absent.
    Logs and raises SystemExit when ``is_valid_ot_str_ens`` rejects the value.
    """
    value = properties.get("output_type", get_default_value("ot_str_ens"))
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")

379 

380 

def get_xvg(properties, out_log, classname):
    """Return the ``xvg`` property (or its default); exit if ``is_valid_xvg_param`` rejects it."""
    value = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")

388 

389 

def get_dista(properties, out_log, classname):
    """Return the ``dista`` property (or its default); exit if ``is_valid_boolean`` rejects it."""
    value = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")

397 

398 

def get_method(properties, out_log, classname):
    """Return the ``method`` property (or its default); exit if ``is_valid_method_param`` rejects it."""
    value = properties.get("method", get_default_value("method"))
    if is_valid_method_param(value):
        return value
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")

406 

407 

def get_cutoff(properties, out_log, classname):
    """Return the ``cutoff`` property (or its default) as a string; exit if ``is_valid_float`` rejects it."""
    value = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")

415 

416 

def is_valid_boolean(val):
    """Tell whether ``val`` compares equal to True or False.

    Membership uses equality, so values such as 1 and 0 are accepted too.
    """
    return val in (True, False)

421 

422 

def is_valid_float(val):
    """Tell whether ``val`` is acceptable as a float.

    Any falsy value is accepted; a truthy value must be an instance of
    float or int.
    """
    return not val or isinstance(val, (float, int))

428 

429 

def is_valid_int(val):
    """Tell whether ``val`` is acceptable as an int.

    Any falsy value is accepted; a truthy value must be an instance of int.
    """
    return not val or isinstance(val, int)

435 

436 

def is_valid_method_param(met):
    """Tell whether ``met`` is one of the method names accepted by this module."""
    return met in (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )

441 

442 

def is_valid_structure(ext):
    """Tell whether ``ext`` is an accepted structure file extension (no leading dot)."""
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")

447 

448 

def is_valid_index(ext):
    """Tell whether ``ext`` is an accepted index file extension (no leading dot)."""
    return ext in ("ndx",)

453 

454 

def is_valid_trajectory(ext):
    """Tell whether ``ext`` is an accepted input trajectory extension (no leading dot)."""
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")

459 

460 

def is_valid_trajectory_output(ext):
    """Tell whether ``ext`` is an accepted output trajectory extension (no leading dot)."""
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")

465 

466 

def is_valid_energy(ext):
    """Tell whether ``ext`` is an accepted energy file extension (no leading dot)."""
    return ext in ("edr",)

471 

472 

def is_valid_xvg(ext):
    """Tell whether ``ext`` is the XVG file extension (no leading dot)."""
    return ext in ("xvg",)

477 

478 

def is_valid_zip(ext):
    """Tell whether ``ext`` is the ZIP file extension (no leading dot)."""
    return ext in ("zip",)

483 

484 

def is_valid_xvg_param(ext):
    """Tell whether ``ext`` is an accepted value for the ``xvg`` parameter."""
    return ext in ("xmgrace", "xmgr", "none")

489 

490 

def is_valid_ot_str_ens(ext):
    """Tell whether ``ext`` is an accepted output type for a structure ensemble."""
    return ext in ("gro", "g96", "pdb")

495 

496 

def is_valid_pbc(pbc):
    """Tell whether ``pbc`` is an accepted value for the ``pbc`` parameter."""
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")

501 

502 

def is_valid_ur(ur):
    """Tell whether ``ur`` is an accepted value for the ``ur`` parameter."""
    return ur in ("rect", "tric", "compact")

507 

508 

def is_valid_fit(fit):
    """Tell whether ``fit`` is an accepted value for the ``fit`` parameter."""
    return fit in (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )

520 

521 

def is_valid_term(iterms):
    """Tell whether every element of ``iterms`` is a known energy term name.

    Args:
        iterms: Iterable of term names to check.

    Returns:
        True when all elements are in the accepted list (so an empty
        iterable yields True), False otherwise.
    """
    cterms = [
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
        # These two entries originally carried a stray leading space, which
        # made the real names above unreachable. The old spellings are kept
        # so existing callers that relied on them keep working.
        " Constr.-rmsd",
        " Box-Z",
    ]
    return all(elem in cterms for elem in iterms)

578 

579 

def is_valid_selection(ext):
    """Tell whether ``ext`` is one of the group names accepted as a selection."""
    return ext in (
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
    )

610 

611 

def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy ``instructions_file`` into ``unique_dir`` with ``shutil.copy2``."""
    source, destination = instructions_file, unique_dir
    shutil.copy2(source, destination)

614 

615 

def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the given temporary files when ``remove_tmp`` is truthy.

    Each entry of ``list`` is passed to ``fu.rm``; the entries for which it
    returns a truthy value are reported through ``fu.log``. Nothing happens
    when ``remove_tmp`` is falsy.
    """
    if not remove_tmp:
        return
    deleted = []
    for tmp_file in list:
        if fu.rm(tmp_file):
            deleted.append(tmp_file)
    fu.log("Removed: %s" % str(deleted), out_log)

622 

623 

def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the files generated in ``tmp_folder`` and copy the archive to ``output_dir``.

    Files are collected with ``glob_pattern``; when that matches nothing the
    pattern ``frame*.pdb`` is tried instead. The matches are zipped into
    ``output_file`` through ``fu.zip_list`` and the archive is then copied
    with ``shutil.copy2``.
    """
    matches = list(Path(tmp_folder).glob(glob_pattern))
    if not matches:
        matches = list(Path(tmp_folder).glob("frame*.pdb"))

    # Pack every collected file from the temporary folder into the archive.
    fu.zip_list(output_file, list(matches), out_log)

    shutil.copy2(output_file, output_dir)

639 

640 

def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]:
    """
    Turn a comma- or space-separated string into a list of its non-empty items.

    Parameters:
        input_data (str, list, or None): The value to convert. A list is
            returned as is (same object); None yields an empty list.

    Returns:
        list: The stripped, non-empty items. Commas are the separator when
        the string contains at least one comma; otherwise single spaces are.
    """
    if input_data is None:
        return []
    if isinstance(input_data, list):
        return input_data

    separator = " " if "," not in input_data else ","
    trimmed = (chunk.strip() for chunk in input_data.split(separator))
    return [chunk for chunk in trimmed if chunk]

665 

666 return processed_items