Coverage for biobb_analysis/gromacs/common.py: 61%

303 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-06 15:22 +0000

1""" Common functions for package biobb_analysis.gromacs """ 

2from pathlib import Path, PurePath 

3import re 

4import shutil 

5from biobb_common.tools import file_utils as fu 

6from biobb_common.command_wrapper import cmd_wrapper 

7 

8 

def gmx_check(file_a: str, file_b: str, gmx: str = 'gmx') -> bool:
    """Compare two GROMACS files by running ``<gmx> check`` on them.

    The command output is redirected to ``check_result.out`` (relative to the
    current working directory) and that file is then scanned line by line.

    Args:
        file_a: First file; passed with ``-s1`` when it ends in ``.tpr``,
            otherwise with ``-f``.
        file_b: Second file; passed with ``-s2`` when it ends in ``.tpr``,
            otherwise with ``-f2``.
        gmx: Name or path of the GROMACS executable.

    Returns:
        False as soon as a non-blank line is found that starts neither with
        "Both files read correctly" nor with "comparing"; True otherwise.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = 'check_result.out'
    cmd = [
        gmx, 'check',
        '-s1' if file_a.endswith(".tpr") else '-f', file_a,
        '-s2' if file_b.endswith(".tpr") else '-f2', file_b,
        # The redirection travels as a plain command token and reuses the
        # same file name that is read back below.
        # NOTE(review): presumably CmdWrapper runs the command through a
        # shell, otherwise the redirection would not apply - confirm.
        '> ' + check_result,
    ]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        # Count from 1 so the reported number matches what an editor shows.
        for line_num, line in enumerate(check_file, start=1):
            if not line.rstrip():
                continue
            if line.startswith(("Both files read correctly", "comparing")):
                continue
            print('Discrepance found in line %d: %s' % (line_num, line))
            return False
    return True

38 

39 

def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a usable path to it.

    Args:
        path: Location of the energy input file.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` itself when it is absolute, otherwise ``path`` joined onto
        the current working directory, as a string.

    Raises:
        SystemExit: If the file does not exist or ``is_valid_energy`` rejects
            its extension.
    """
    if not Path(path).exists():
        fu.log(classname + ': Unexisting energy input file, exiting', out_log)
        raise SystemExit(classname + ': Unexisting energy input file')

    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        message = classname + ': Format %s in energy input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Anchor relative inputs to the cwd because execution is launched on a
    # tmp folder. A bare file name is never absolute, so one test covers it.
    if not PurePath(path).is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

53 

54 

def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a usable path to it.

    Args:
        path: Location of the structure input file.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` itself when it is absolute, otherwise ``path`` joined onto
        the current working directory, as a string.

    Raises:
        SystemExit: If the file does not exist or ``is_valid_structure``
            rejects its extension.
    """
    if not Path(path).exists():
        fu.log(classname + ': Unexisting structure input file, exiting', out_log)
        raise SystemExit(classname + ': Unexisting structure input file')

    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ': Format %s in structure input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Anchor relative inputs to the cwd because execution is launched on a
    # tmp folder. A bare file name is never absolute, so one test covers it.
    if not PurePath(path).is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

68 

69 

def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return a usable path to it.

    Unlike the other input checks, existence of the file is not verified here.

    Args:
        path: Location of the index file; any falsy value means "no index".
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        None when ``path`` is falsy; ``path`` itself when it is absolute;
        otherwise ``path`` joined onto the current working directory, as a
        string.

    Raises:
        SystemExit: If ``is_valid_index`` rejects the file extension.
    """
    if not path:
        return None

    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        message = classname + ': Format %s in index input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Anchor relative inputs to the cwd because execution is launched on a
    # tmp folder. A bare file name is never absolute, so one test covers it.
    if not PurePath(path).is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

82 

83 

def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a usable path to it.

    Args:
        path: Location of the trajectory input file.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` itself when it is absolute, otherwise ``path`` joined onto
        the current working directory, as a string.

    Raises:
        SystemExit: If the file does not exist or ``is_valid_trajectory``
            rejects its extension.
    """
    if not Path(path).exists():
        fu.log(classname + ': Unexisting trajectory input file, exiting', out_log)
        raise SystemExit(classname + ': Unexisting trajectory input file')

    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        message = classname + ': Format %s in trajectory input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Anchor relative inputs to the cwd because execution is launched on a
    # tmp folder. A bare file name is never absolute, so one test covers it.
    if not PurePath(path).is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path

97 

98 

def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output file is XVG.

    Args:
        path: Location of the output file.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder is missing or ``is_valid_xvg``
            rejects the file extension.
    """
    # A path object is always truthy, so only the existence test matters.
    if not Path(path).parent.exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if not is_valid_xvg(extension):
        message = classname + ': Format %s in output file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

109 

110 

def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the output is a structure file.

    Args:
        path: Location of the output file.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder is missing or ``is_valid_structure``
            rejects the file extension.
    """
    # A path object is always truthy, so only the existence test matters.
    if not Path(path).parent.exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ': Format %s in output file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

121 

122 

def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the output is a trajectory file.

    Args:
        path: Location of the output file.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder is missing or
            ``is_valid_trajectory_output`` rejects the file extension.
    """
    # A path object is always truthy, so only the existence test matters.
    if not Path(path).parent.exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory_output(extension):
        message = classname + ': Format %s in output file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

133 

134 

def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output file is a ZIP.

    Args:
        path: Location of the output file.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder is missing or ``is_valid_zip``
            rejects the file extension.
    """
    # A path object is always truthy, so only the existence test matters.
    if not Path(path).parent.exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if not is_valid_zip(extension):
        message = classname + ': Format %s in output file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path

145 

146 

def get_default_value(key):
    """Return the package default associated with a property name.

    The table is rebuilt on every call, so mutable defaults (the ``terms``
    list) are never shared between callers.

    Args:
        key: Name of the property.

    Returns:
        The default value registered for ``key``.

    Raises:
        KeyError: If ``key`` has no registered default.
    """
    defaults = dict(
        instructions_file="instructions.in",
        binary_path="gmx",
        terms=["Potential"],
        selection="System",
        xvg="none",
        dista=False,
        method="linkage",
        cutoff=0.1,
        cluster_selection="System",
        fit_selection="System",
        center_selection="System",
        output_selection="System",
        pbc="mol",
        center=True,
        fit="none",
        ur="compact",
        skip=1,
        start=0,
        end=0,
        dt=0,
        ot_str_ens="pdb",
    )
    return defaults[key]

173 

174 

def get_binary_path(properties, type):
    """Return the binary path stored in ``properties`` under ``type``.

    Args:
        properties: Mapping of user-supplied properties.
        type: Property name to read (e.g. ``'binary_path'``).

    Returns:
        The configured value, or the package default when the key is absent.
    """
    # The default is resolved up front, exactly as when it is passed inline.
    fallback = get_default_value(type)
    return properties.get(type, fallback)

178 

179 

def get_terms(properties, out_log, classname):
    """Return the validated list of energy terms from ``properties``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The ``terms`` list as provided.

    Raises:
        SystemExit: If ``terms`` is missing, empty, not a list, or rejected
            by ``is_valid_term``.
    """
    terms = properties.get('terms', dict())
    if not terms or not isinstance(terms, list):
        problem = ': No terms provided or incorrect format'
    elif not is_valid_term(terms):
        problem = ': Incorrect terms provided'
    else:
        return terms
    fu.log(classname + problem + ', exiting', out_log)
    raise SystemExit(classname + problem)

190 

191 

def get_selection(properties, out_log, classname):
    """Return the validated ``selection`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured selection, or the package default when absent.

    Raises:
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    selection = properties.get('selection', get_default_value('selection'))
    if not selection:
        problem = ': No selection provided or incorrect format'
    elif not is_valid_selection(selection):
        problem = ': Incorrect selection provided'
    else:
        return selection
    fu.log(classname + problem + ', exiting', out_log)
    raise SystemExit(classname + problem)

202 

203 

def get_image_selection(properties, key, out_log, classname):
    """Return the validated selection stored under an arbitrary property key.

    Args:
        properties: Mapping of user-supplied properties.
        key: Property name to read; also used to look up the default.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured selection, or the package default for ``key``.

    Raises:
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    selection = properties.get(key, get_default_value(key))
    if not selection:
        problem = ': No selection provided or incorrect format'
    elif not is_valid_selection(selection):
        problem = ': Incorrect selection provided'
    else:
        return selection
    fu.log(classname + problem + ', exiting', out_log)
    raise SystemExit(classname + problem)

214 

215 

def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection under ``key`` after checking it against an index file.

    Every ``[...]`` span found in the index file is collected, with brackets
    and spaces removed, and the requested selection must be one of them.

    Args:
        properties: Mapping of user-supplied properties.
        index: Path of the index file to scan.
        key: Property name to read; also used to look up the default.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured selection, or the package default for ``key``.

    Raises:
        SystemExit: If the selection is not among the names found in the file.
    """
    bracketed = re.compile(r"\[.*\]")
    available = []
    with open(index, "r") as ndx_file:
        for row in ndx_file:
            available.extend(
                re.sub(r'[\[\] ]', '', hit.group())
                for hit in bracketed.finditer(row)
            )

    sel = properties.get(key, get_default_value(key))
    if sel in available:
        return sel
    fu.log(classname + ': Incorrect selection provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect selection provided')

229 

230 

def get_pbc(properties, out_log, classname):
    """Return the validated ``pbc`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured value, or the package default when absent.

    Raises:
        SystemExit: If ``is_valid_pbc`` rejects the value.
    """
    value = properties.get('pbc', get_default_value('pbc'))
    if is_valid_pbc(value):
        return value
    fu.log(classname + ': Incorrect pbc provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect pbc provided')

238 

239 

def get_center(properties, out_log, classname):
    """Return the validated ``center`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured value, or the package default when absent.

    Raises:
        SystemExit: If ``is_valid_boolean`` rejects the value.
    """
    value = properties.get('center', get_default_value('center'))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ': Incorrect center provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect center provided')

247 

248 

def get_ur(properties, out_log, classname):
    """Return the validated ``ur`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured value, or the package default when absent.

    Raises:
        SystemExit: If ``is_valid_ur`` rejects the value.
    """
    value = properties.get('ur', get_default_value('ur'))
    if is_valid_ur(value):
        return value
    fu.log(classname + ': Incorrect ur provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect ur provided')

256 

257 

def get_fit(properties, out_log, classname):
    """Return the validated ``fit`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured value, or the package default when absent.

    Raises:
        SystemExit: If ``is_valid_fit`` rejects the value.
    """
    value = properties.get('fit', get_default_value('fit'))
    if is_valid_fit(value):
        return value
    fu.log(classname + ': Incorrect fit provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect fit provided')

265 

266 

def get_skip(properties, out_log, classname):
    """Return the validated ``skip`` property as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``str()`` of the configured value, or of the package default when
        the key is absent.

    Raises:
        SystemExit: If ``is_valid_int`` rejects the value.
    """
    skip = properties.get('skip', get_default_value('skip'))
    if not is_valid_int(skip):
        fu.log(classname + ': Incorrect skip provided, exiting', out_log)
        # The exit message names "skip", matching the log line above; it
        # previously said "start", a leftover from the sibling getter.
        raise SystemExit(classname + ': Incorrect skip provided')
    return str(skip)

274 

275 

def get_start(properties, out_log, classname):
    """Return the validated ``start`` property as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``str()`` of the configured value, or of the package default when
        the key is absent.

    Raises:
        SystemExit: If ``is_valid_int`` rejects the value.
    """
    value = properties.get('start', get_default_value('start'))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ': Incorrect start provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect start provided')

283 

284 

def get_end(properties, out_log, classname):
    """Return the validated ``end`` property as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``str()`` of the configured value, or of the package default when
        the key is absent.

    Raises:
        SystemExit: If ``is_valid_int`` rejects the value.
    """
    value = properties.get('end', get_default_value('end'))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ': Incorrect end provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect end provided')

292 

293 

def get_dt(properties, out_log, classname):
    """Return the validated ``dt`` property as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``str()`` of the configured value, or of the package default when
        the key is absent.

    Raises:
        SystemExit: If ``is_valid_int`` rejects the value.
    """
    value = properties.get('dt', get_default_value('dt'))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ': Incorrect dt provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect dt provided')

301 

302 

def get_ot_str_ens(properties, out_log, classname):
    """Return the validated ``output_type`` property as a string.

    The property is read from the ``output_type`` key, while its default is
    registered under ``ot_str_ens``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``str()`` of the configured value, or of the package default when
        the key is absent.

    Raises:
        SystemExit: If ``is_valid_ot_str_ens`` rejects the value.
    """
    value = properties.get('output_type', get_default_value('ot_str_ens'))
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ': Incorrect output_type provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect output_type provided')

310 

311 

def get_xvg(properties, out_log, classname):
    """Return the validated ``xvg`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured value, or the package default when absent.

    Raises:
        SystemExit: If ``is_valid_xvg_param`` rejects the value.
    """
    value = properties.get('xvg', get_default_value('xvg'))
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ': Incorrect xvg provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect xvg provided')

319 

320 

def get_dista(properties, out_log, classname):
    """Return the validated ``dista`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured value, or the package default when absent.

    Raises:
        SystemExit: If ``is_valid_boolean`` rejects the value.
    """
    value = properties.get('dista', get_default_value('dista'))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ': Incorrect dista provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect dista provided')

328 

329 

def get_method(properties, out_log, classname):
    """Return the validated ``method`` property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        The configured value, or the package default when absent.

    Raises:
        SystemExit: If ``is_valid_method_param`` rejects the value.
    """
    value = properties.get('method', get_default_value('method'))
    if is_valid_method_param(value):
        return value
    fu.log(classname + ': Incorrect method provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect method provided')

337 

338 

def get_cutoff(properties, out_log, classname):
    """Return the validated ``cutoff`` property as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Log handle forwarded to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``str()`` of the configured value, or of the package default when
        the key is absent.

    Raises:
        SystemExit: If ``is_valid_float`` rejects the value.
    """
    value = properties.get('cutoff', get_default_value('cutoff'))
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ': Incorrect cutoff provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect cutoff provided')

346 

347 

def is_valid_boolean(val):
    """Check whether the given value is a genuine boolean.

    A membership test against ``[True, False]`` would also accept ``0``,
    ``1``, ``0.0`` and ``1.0`` because they compare equal to the booleans,
    so the type is checked instead.

    Args:
        val: Value to check.

    Returns:
        True only when ``val`` is ``True`` or ``False``.
    """
    return isinstance(val, bool)

352 

353 

def is_valid_float(val):
    """Check whether the given value is a number (float or int).

    The type test is applied unconditionally: guarding it with the value's
    truthiness would let ``None``, ``''`` and ``[]`` pass as valid numbers.

    Args:
        val: Value to check.

    Returns:
        True when ``val`` is an instance of ``float`` or ``int`` (zero
        included), False otherwise.
    """
    return isinstance(val, (float, int))

359 

360 

def is_valid_int(val):
    """Check whether the given value is an int.

    The type test is applied unconditionally: guarding it with the value's
    truthiness would let ``None``, ``''``, ``[]`` and ``0.0`` pass as valid
    integers.

    Args:
        val: Value to check.

    Returns:
        True when ``val`` is an instance of ``int`` (zero included), False
        otherwise.
    """
    return isinstance(val, int)

366 

367 

def is_valid_method_param(met):
    """Tell whether ``met`` is one of the accepted method names.

    Args:
        met: Method name to check.

    Returns:
        True when ``met`` is in the accepted set, False otherwise.
    """
    accepted = ('linkage', 'jarvis-patrick', 'monte-carlo', 'diagonalization', 'gromos')
    return met in accepted

372 

373 

def is_valid_structure(ext):
    """Tell whether ``ext`` is an accepted structure file extension.

    Args:
        ext: Extension without the leading dot.

    Returns:
        True when ``ext`` is in the accepted set, False otherwise.
    """
    accepted = ('tpr', 'gro', 'g96', 'pdb', 'brk', 'ent')
    return ext in accepted

378 

379 

def is_valid_index(ext):
    """Tell whether ``ext`` is an accepted index file extension.

    Args:
        ext: Extension without the leading dot.

    Returns:
        True when ``ext`` is ``'ndx'``, False otherwise.
    """
    accepted = ('ndx',)
    return ext in accepted

384 

385 

def is_valid_trajectory(ext):
    """Tell whether ``ext`` is an accepted input trajectory extension.

    Args:
        ext: Extension without the leading dot.

    Returns:
        True when ``ext`` is in the accepted set, False otherwise.
    """
    accepted = ('xtc', 'trr', 'cpt', 'gro', 'g96', 'pdb', 'tng')
    return ext in accepted

390 

391 

def is_valid_trajectory_output(ext):
    """Tell whether ``ext`` is an accepted output trajectory extension.

    The accepted set matches the input trajectory one except for ``'cpt'``.

    Args:
        ext: Extension without the leading dot.

    Returns:
        True when ``ext`` is in the accepted set, False otherwise.
    """
    accepted = ('xtc', 'trr', 'gro', 'g96', 'pdb', 'tng')
    return ext in accepted

396 

397 

def is_valid_energy(ext):
    """Tell whether ``ext`` is an accepted energy file extension.

    Args:
        ext: Extension without the leading dot.

    Returns:
        True when ``ext`` is ``'edr'``, False otherwise.
    """
    accepted = ('edr',)
    return ext in accepted

402 

403 

def is_valid_xvg(ext):
    """Tell whether ``ext`` is the XVG file extension.

    Args:
        ext: Extension without the leading dot.

    Returns:
        True when ``ext`` is ``'xvg'``, False otherwise.
    """
    accepted = ('xvg',)
    return ext in accepted

408 

409 

def is_valid_zip(ext):
    """Tell whether ``ext`` is the ZIP file extension.

    Args:
        ext: Extension without the leading dot.

    Returns:
        True when ``ext`` is ``'zip'``, False otherwise.
    """
    accepted = ('zip',)
    return ext in accepted

414 

415 

def is_valid_xvg_param(ext):
    """Tell whether ``ext`` is an accepted value for the ``xvg`` parameter.

    Args:
        ext: Parameter value to check.

    Returns:
        True when ``ext`` is in the accepted set, False otherwise.
    """
    accepted = ('xmgrace', 'xmgr', 'none')
    return ext in accepted

420 

421 

def is_valid_ot_str_ens(ext):
    """Tell whether ``ext`` is an accepted structure-ensemble output type.

    Args:
        ext: Output type to check.

    Returns:
        True when ``ext`` is in the accepted set, False otherwise.
    """
    accepted = ('gro', 'g96', 'pdb')
    return ext in accepted

426 

427 

def is_valid_pbc(pbc):
    """Tell whether ``pbc`` is an accepted value for the ``pbc`` parameter.

    Args:
        pbc: Parameter value to check.

    Returns:
        True when ``pbc`` is in the accepted set, False otherwise.
    """
    accepted = ('none', 'mol', 'res', 'atom', 'nojump', 'cluster', 'whole')
    return pbc in accepted

432 

433 

def is_valid_ur(ur):
    """Tell whether ``ur`` is an accepted value for the ``ur`` parameter.

    Args:
        ur: Parameter value to check.

    Returns:
        True when ``ur`` is in the accepted set, False otherwise.
    """
    accepted = ('rect', 'tric', 'compact')
    return ur in accepted

438 

439 

def is_valid_fit(fit):
    """Tell whether ``fit`` is an accepted value for the ``fit`` parameter.

    Args:
        fit: Parameter value to check.

    Returns:
        True when ``fit`` is in the accepted set, False otherwise.
    """
    accepted = ('none', 'rot+trans', 'rotxy+transxy', 'translation', 'transxy', 'progressive')
    return fit in accepted

444 

445 

def is_valid_term(iterms):
    """Check that every requested energy term is a known one.

    Args:
        iterms: Iterable of term names to check.

    Returns:
        True when every element of ``iterms`` is a known term (vacuously
        True for an empty iterable), False otherwise.
    """
    # 'Constr.-rmsd' and 'Box-Z' are listed without a stray leading space,
    # so the real term names are accepted.
    cterms = (
        'Angle', 'Proper-Dih.', 'Improper-Dih.', 'LJ-14', 'Coulomb-14',
        'LJ-(SR)', 'Coulomb-(SR)', 'Coul.-recip.', 'Position-Rest.',
        'Potential', 'Kinetic-En.', 'Total-Energy', 'Temperature',
        'Pressure', 'Constr.-rmsd', 'Box-X', 'Box-Y', 'Box-Z', 'Volume',
        'Density', 'pV', 'Enthalpy',
        'Vir-XX', 'Vir-XY', 'Vir-XZ',
        'Vir-YX', 'Vir-YY', 'Vir-YZ',
        'Vir-ZX', 'Vir-ZY', 'Vir-ZZ',
        'Pres-XX', 'Pres-XY', 'Pres-XZ',
        'Pres-YX', 'Pres-YY', 'Pres-YZ',
        'Pres-ZX', 'Pres-ZY', 'Pres-ZZ',
        '#Surf*SurfTen', 'Box-Vel-XX', 'Box-Vel-YY', 'Box-Vel-ZZ',
        'Mu-X', 'Mu-Y', 'Mu-Z',
        'T-Protein', 'T-non-Protein', 'Lamb-Protein', 'Lamb-non-Protein',
    )
    return all(elem in cterms for elem in iterms)

450 

451 

def is_valid_selection(ext):
    """Tell whether ``ext`` is one of the accepted selection group names.

    Args:
        ext: Selection name to check.

    Returns:
        True when ``ext`` is in the accepted set, False otherwise.
    """
    accepted = (
        'System', 'Protein', 'Protein-H', 'C-alpha', 'Backbone',
        'MainChain', 'MainChain+Cb', 'MainChain+H',
        'SideChain', 'SideChain-H', 'Prot-Masses', 'non-Protein',
        'Water', 'SOL', 'non-Water', 'Ion', 'NA', 'CL', 'Water_and_ions',
        'DNA', 'RNA', 'Protein_DNA', 'Protein_RNA', 'Protein_DNA_RNA',
        'DNA_RNA',
    )
    return ext in accepted

456 

457 

def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy the instructions file into ``unique_dir`` with ``shutil.copy2``.

    Args:
        instructions_file: Path of the file to copy.
        unique_dir: Destination passed to ``shutil.copy2``.
    """
    shutil.copy2(src=instructions_file, dst=unique_dir)

460 

461 

def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files generated by the wrapper, when requested.

    Args:
        list: Paths of the temporary files to remove.
        remove_tmp: When falsy, nothing is removed and nothing is logged.
        out_log: Log handle forwarded to ``fu.log``.
    """
    if not remove_tmp:
        return

    # Keep only the entries for which ``fu.rm`` returned something truthy.
    removed_files = []
    for tmp_file in list:
        if fu.rm(tmp_file):
            removed_files.append(tmp_file)
    fu.log('Removed: %s' % str(removed_files), out_log)

468 

469 

def process_output_trjconv_str_ens(tmp_folder, output_file, output_dir, glob_pattern, out_log):
    """Zip the files matching a pattern and copy the archive to ``output_dir``.

    Args:
        tmp_folder: Folder that is searched for generated files.
        output_file: Path of the zip archive handed to ``fu.zip_list``.
        output_dir: Destination passed to ``shutil.copy2``.
        glob_pattern: Glob pattern, relative to ``tmp_folder``.
        out_log: Log handle forwarded to ``fu.zip_list``.
    """
    # Materialise the matches once; they are the archive members.
    files_list = list(Path(tmp_folder).glob(glob_pattern))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, files_list, out_log)

    shutil.copy2(output_file, output_dir)