Coverage for biobb_analysis/gromacs/common.py: 61%
303 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 15:22 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-06 15:22 +0000
1""" Common functions for package biobb_analysis.gromacs """
2from pathlib import Path, PurePath
3import re
4import shutil
5from biobb_common.tools import file_utils as fu
6from biobb_common.command_wrapper import cmd_wrapper
def gmx_check(file_a: str, file_b: str, gmx: str = 'gmx') -> bool:
    """ Compares two GROMACS files through ``<gmx> check`` and reports whether they match.

    Args:
        file_a: first file; passed with ``-s1`` when it ends in ``.tpr``, otherwise with ``-f``.
        file_b: second file; passed with ``-s2`` when it ends in ``.tpr``, otherwise with ``-f2``.
        gmx: GROMACS executable placed at the front of the command.

    Returns:
        False as soon as the result file holds a non-blank line that starts neither with
        "Both files read correctly" nor with "comparing"; True otherwise.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = 'check_result.out'

    first_flag = '-s1' if file_a.endswith(".tpr") else '-f'
    second_flag = '-s2' if file_b.endswith(".tpr") else '-f2'
    # NOTE(review): the last element presumably relies on CmdWrapper running the command
    # through a shell so that the redirection takes effect - confirm against cmd_wrapper.
    cmd = [gmx, 'check', first_flag, file_a, second_flag, file_b, '> check_result.out']
    cmd_wrapper.CmdWrapper(cmd).launch()

    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        for line_num, line in enumerate(check_file):
            # blank lines and the two expected line prefixes are not discrepancies
            if not line.rstrip() or line.startswith("Both files read correctly"):
                continue
            if line.startswith('comparing'):
                continue
            print('Discrepance found in line %d: %s' % (line_num, line))
            return False
    return True
def check_energy_path(path, out_log, classname):
    """ Validates the energy input file and returns a path usable from another folder.

    Args:
        path: location of the energy input file.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        ``path`` unchanged when it is absolute, otherwise ``path`` joined onto the
        current working directory.

    Raises:
        SystemExit: if the file does not exist or ``is_valid_energy`` rejects its extension.
    """
    if not Path(path).exists():
        fu.log(classname + ': Unexisting energy input file, exiting', out_log)
        raise SystemExit(classname + ': Unexisting energy input file')

    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        message = classname + ': Format %s in energy input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # execution is launched on a tmp folder, so relative inputs are anchored to the cwd
    candidate = PurePath(path)
    if candidate.name == path or not candidate.is_absolute():
        path = str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_input_path(path, out_log, classname):
    """ Validates the structure input file and returns a path usable from another folder.

    Args:
        path: location of the structure input file.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        ``path`` unchanged when it is absolute, otherwise ``path`` joined onto the
        current working directory.

    Raises:
        SystemExit: if the file does not exist or ``is_valid_structure`` rejects its extension.
    """
    if not Path(path).exists():
        fu.log(classname + ': Unexisting structure input file, exiting', out_log)
        raise SystemExit(classname + ': Unexisting structure input file')

    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ': Format %s in structure input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # execution is launched on a tmp folder, so relative inputs are anchored to the cwd
    candidate = PurePath(path)
    if candidate.name == path or not candidate.is_absolute():
        path = str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_index_path(path, out_log, classname):
    """ Validates the optional index input file and returns a path usable from another folder.

    Args:
        path: location of the index file; any falsy value means "no index file".
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        None when ``path`` is falsy; otherwise ``path`` unchanged when it is absolute,
        or joined onto the current working directory when it is not.

    Raises:
        SystemExit: if ``is_valid_index`` rejects the file extension.
    """
    if not path:
        return None

    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        message = classname + ': Format %s in index input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # execution is launched on a tmp folder, so relative inputs are anchored to the cwd
    candidate = PurePath(path)
    if candidate.name == path or not candidate.is_absolute():
        path = str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_traj_path(path, out_log, classname):
    """ Validates the trajectory input file and returns a path usable from another folder.

    Args:
        path: location of the trajectory input file.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        ``path`` unchanged when it is absolute, otherwise ``path`` joined onto the
        current working directory.

    Raises:
        SystemExit: if the file does not exist or ``is_valid_trajectory`` rejects its extension.
    """
    if not Path(path).exists():
        fu.log(classname + ': Unexisting trajectory input file, exiting', out_log)
        raise SystemExit(classname + ': Unexisting trajectory input file')

    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        message = classname + ': Format %s in trajectory input file is not compatible' % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # execution is launched on a tmp folder, so relative inputs are anchored to the cwd
    candidate = PurePath(path)
    if candidate.name == path or not candidate.is_absolute():
        path = str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_out_xvg_path(path, out_log, classname):
    """ Checks that the output folder exists and that the output file has an XVG extension.

    Args:
        path: location of the output file.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: if the parent folder does not exist or ``is_valid_xvg`` rejects the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if is_valid_xvg(extension):
        return path

    message = classname + ': Format %s in output file is not compatible' % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_pdb_path(path, out_log, classname):
    """ Checks that the output folder exists and that the output file has a structure extension.

    Args:
        path: location of the output file.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: if the parent folder does not exist or ``is_valid_structure`` rejects
            the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if is_valid_structure(extension):
        return path

    message = classname + ': Format %s in output file is not compatible' % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_traj_path(path, out_log, classname):
    """ Checks that the output folder exists and that the output file has a trajectory extension.

    Args:
        path: location of the output file.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: if the parent folder does not exist or ``is_valid_trajectory_output``
            rejects the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if is_valid_trajectory_output(extension):
        return path

    message = classname + ': Format %s in output file is not compatible' % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_str_ens_path(path, out_log, classname):
    """ Checks that the output folder exists and that the output file has a ZIP extension.

    Args:
        path: location of the output file.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: if the parent folder does not exist or ``is_valid_zip`` rejects the extension.
    """
    folder = PurePath(path).parent
    if folder and not Path(folder).exists():
        fu.log(classname + ': Unexisting output folder, exiting', out_log)
        raise SystemExit(classname + ': Unexisting output folder')

    extension = PurePath(path).suffix[1:]
    if is_valid_zip(extension):
        return path

    message = classname + ': Format %s in output file is not compatible' % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def get_default_value(key):
    """ Returns the default value registered for ``key``.

    The table is rebuilt on every call, so mutable defaults (the ``terms`` list)
    are never shared between callers.

    Raises:
        KeyError: if ``key`` has no registered default.
    """
    return {
        "instructions_file": "instructions.in",
        "binary_path": "gmx",
        "terms": ["Potential"],
        "selection": "System",
        "xvg": "none",
        "dista": False,
        "method": "linkage",
        "cutoff": 0.1,
        "cluster_selection": "System",
        "fit_selection": "System",
        "center_selection": "System",
        "output_selection": "System",
        "pbc": "mol",
        "center": True,
        "fit": "none",
        "ur": "compact",
        "skip": 1,
        "start": 0,
        "end": 0,
        "dt": 0,
        "ot_str_ens": "pdb",
    }[key]
def get_binary_path(properties, type):
    """ Looks up ``type`` in ``properties``, falling back to its registered default.

    The default is computed before the lookup, so ``get_default_value`` raises
    ``KeyError`` for an unregistered ``type`` even when ``properties`` holds it.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)
def get_terms(properties, out_log, classname):
    """ Reads and validates the ``terms`` property (list of energy terms).

    Args:
        properties: mapping holding the ``terms`` entry.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        The ``terms`` list found in ``properties``.

    Raises:
        SystemExit: if ``terms`` is missing, empty, not a list, or rejected by ``is_valid_term``.
    """
    terms = properties.get('terms', dict())
    if not (terms and isinstance(terms, list)):
        fu.log(classname + ': No terms provided or incorrect format, exiting', out_log)
        raise SystemExit(classname + ': No terms provided or incorrect format')
    if not is_valid_term(terms):
        fu.log(classname + ': Incorrect terms provided, exiting', out_log)
        raise SystemExit(classname + ': Incorrect terms provided')
    # at this point the key exists and holds a non-empty list, so it is returned directly
    return terms
def get_selection(properties, out_log, classname):
    """ Reads and validates the ``selection`` property.

    Falls back to ``get_default_value('selection')`` when the key is missing.

    Raises:
        SystemExit: if the value is falsy or rejected by ``is_valid_selection``.
    """
    selection = properties.get('selection', get_default_value('selection'))
    if not selection:
        fu.log(classname + ': No selection provided or incorrect format, exiting', out_log)
        raise SystemExit(classname + ': No selection provided or incorrect format')
    if is_valid_selection(selection):
        return selection
    fu.log(classname + ': Incorrect selection provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect selection provided')
def get_image_selection(properties, key, out_log, classname):
    """ Reads and validates the selection stored under ``key``.

    Falls back to ``get_default_value(key)`` when the key is missing from ``properties``.

    Raises:
        SystemExit: if the value is falsy or rejected by ``is_valid_selection``.
    """
    selection = properties.get(key, get_default_value(key))
    if not selection:
        fu.log(classname + ': No selection provided or incorrect format, exiting', out_log)
        raise SystemExit(classname + ': No selection provided or incorrect format')
    if is_valid_selection(selection):
        return selection
    fu.log(classname + ': Incorrect selection provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect selection provided')
def get_selection_index_file(properties, index, key, out_log, classname):
    """ Reads the selection stored under ``key`` and checks it against an index file.

    Every ``[...]`` match found in the file at ``index`` is collected with its
    brackets and spaces removed; the selection must equal one of those names.

    Args:
        properties: mapping holding the selection under ``key``.
        index: path of the index file to scan.
        key: property name; also used for the ``get_default_value`` fallback.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        The selection value.

    Raises:
        SystemExit: if the selection is not among the names found in the index file.
    """
    header = re.compile(r"\[.*\]")
    with open(index, "r") as ndx_file:
        groups = [
            re.sub(r'[\[\] ]', '', found.group())
            for row in ndx_file
            for found in header.finditer(row)
        ]
    chosen = properties.get(key, get_default_value(key))
    if chosen in groups:
        return chosen
    fu.log(classname + ': Incorrect selection provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect selection provided')
def get_pbc(properties, out_log, classname):
    """ Reads and validates the ``pbc`` property.

    Falls back to ``get_default_value('pbc')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_pbc`` rejects the value.
    """
    value = properties.get('pbc', get_default_value('pbc'))
    if is_valid_pbc(value):
        return value
    fu.log(classname + ': Incorrect pbc provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect pbc provided')
def get_center(properties, out_log, classname):
    """ Reads and validates the ``center`` property.

    Falls back to ``get_default_value('center')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_boolean`` rejects the value.
    """
    value = properties.get('center', get_default_value('center'))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ': Incorrect center provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect center provided')
def get_ur(properties, out_log, classname):
    """ Reads and validates the ``ur`` property.

    Falls back to ``get_default_value('ur')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_ur`` rejects the value.
    """
    value = properties.get('ur', get_default_value('ur'))
    if is_valid_ur(value):
        return value
    fu.log(classname + ': Incorrect ur provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect ur provided')
def get_fit(properties, out_log, classname):
    """ Reads and validates the ``fit`` property.

    Falls back to ``get_default_value('fit')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_fit`` rejects the value.
    """
    value = properties.get('fit', get_default_value('fit'))
    if is_valid_fit(value):
        return value
    fu.log(classname + ': Incorrect fit provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect fit provided')
def get_skip(properties, out_log, classname):
    """ Reads and validates the ``skip`` property.

    Falls back to ``get_default_value('skip')`` when the key is missing.

    Args:
        properties: mapping holding the ``skip`` entry.
        out_log: log handle forwarded to ``fu.log``.
        classname: name of the caller, used as prefix of every message.

    Returns:
        The value converted with ``str``.

    Raises:
        SystemExit: if ``is_valid_int`` rejects the value.
    """
    skip = properties.get('skip', get_default_value('skip'))
    if not is_valid_int(skip):
        fu.log(classname + ': Incorrect skip provided, exiting', out_log)
        # the exit message used to name "start" (copy-paste slip); it now matches the log line
        raise SystemExit(classname + ': Incorrect skip provided')
    return str(skip)
def get_start(properties, out_log, classname):
    """ Reads and validates the ``start`` property, returning it as a string.

    Falls back to ``get_default_value('start')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_int`` rejects the value.
    """
    value = properties.get('start', get_default_value('start'))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ': Incorrect start provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect start provided')
def get_end(properties, out_log, classname):
    """ Reads and validates the ``end`` property, returning it as a string.

    Falls back to ``get_default_value('end')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_int`` rejects the value.
    """
    value = properties.get('end', get_default_value('end'))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ': Incorrect end provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect end provided')
def get_dt(properties, out_log, classname):
    """ Reads and validates the ``dt`` property, returning it as a string.

    Falls back to ``get_default_value('dt')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_int`` rejects the value.
    """
    value = properties.get('dt', get_default_value('dt'))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ': Incorrect dt provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect dt provided')
def get_ot_str_ens(properties, out_log, classname):
    """ Reads and validates the ``output_type`` property, returning it as a string.

    The property is read under ``output_type`` while its fallback is registered
    under ``ot_str_ens`` in ``get_default_value``.

    Raises:
        SystemExit: if ``is_valid_ot_str_ens`` rejects the value.
    """
    value = properties.get('output_type', get_default_value('ot_str_ens'))
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ': Incorrect output_type provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect output_type provided')
def get_xvg(properties, out_log, classname):
    """ Reads and validates the ``xvg`` property.

    Falls back to ``get_default_value('xvg')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_xvg_param`` rejects the value.
    """
    value = properties.get('xvg', get_default_value('xvg'))
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ': Incorrect xvg provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect xvg provided')
def get_dista(properties, out_log, classname):
    """ Reads and validates the ``dista`` property.

    Falls back to ``get_default_value('dista')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_boolean`` rejects the value.
    """
    value = properties.get('dista', get_default_value('dista'))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ': Incorrect dista provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect dista provided')
def get_method(properties, out_log, classname):
    """ Reads and validates the ``method`` property.

    Falls back to ``get_default_value('method')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_method_param`` rejects the value.
    """
    value = properties.get('method', get_default_value('method'))
    if is_valid_method_param(value):
        return value
    fu.log(classname + ': Incorrect method provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect method provided')
def get_cutoff(properties, out_log, classname):
    """ Reads and validates the ``cutoff`` property, returning it as a string.

    Falls back to ``get_default_value('cutoff')`` when the key is missing.

    Raises:
        SystemExit: if ``is_valid_float`` rejects the value.
    """
    value = properties.get('cutoff', get_default_value('cutoff'))
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ': Incorrect cutoff provided, exiting', out_log)
    raise SystemExit(classname + ': Incorrect cutoff provided')
def is_valid_boolean(val):
    """ Tells whether ``val`` compares equal to True or to False """
    accepted = (True, False)
    return val in accepted
def is_valid_float(val):
    """ Checks if given value is a real number (float or int).

    Args:
        val: value to check.

    Returns:
        True for ``float`` and ``int`` instances (zero included), False for anything else.
        Non-numeric falsy values (``None``, ``''``, empty containers) used to slip through
        the previous ``val and ...`` guard; they are now rejected. Booleans are rejected
        too, although ``bool`` is a subclass of ``int``.
    """
    if isinstance(val, bool):
        return False
    return isinstance(val, (int, float))
def is_valid_int(val):
    """ Checks if given value is an integer.

    Args:
        val: value to check.

    Returns:
        True for ``int`` instances (zero included), False for anything else.
        Non-numeric falsy values (``None``, ``''``, empty containers) used to slip through
        the previous ``val and ...`` guard; they are now rejected. Booleans are rejected
        too, although ``bool`` is a subclass of ``int``.
    """
    if isinstance(val, bool):
        return False
    return isinstance(val, int)
def is_valid_method_param(met):
    """ Tells whether ``met`` is one of the accepted method names """
    accepted = ('linkage', 'jarvis-patrick', 'monte-carlo', 'diagonalization', 'gromos')
    return met in accepted
def is_valid_structure(ext):
    """ Tells whether ``ext`` (extension without the dot) is an accepted structure format """
    accepted = ('tpr', 'gro', 'g96', 'pdb', 'brk', 'ent')
    return ext in accepted
def is_valid_index(ext):
    """ Tells whether ``ext`` (extension without the dot) is an accepted index format """
    accepted = ('ndx',)
    return ext in accepted
def is_valid_trajectory(ext):
    """ Tells whether ``ext`` (extension without the dot) is an accepted input trajectory format """
    accepted = ('xtc', 'trr', 'cpt', 'gro', 'g96', 'pdb', 'tng')
    return ext in accepted
def is_valid_trajectory_output(ext):
    """ Tells whether ``ext`` (extension without the dot) is an accepted output trajectory format """
    accepted = ('xtc', 'trr', 'gro', 'g96', 'pdb', 'tng')
    return ext in accepted
def is_valid_energy(ext):
    """ Tells whether ``ext`` (extension without the dot) is an accepted energy format """
    accepted = ('edr',)
    return ext in accepted
def is_valid_xvg(ext):
    """ Tells whether ``ext`` (extension without the dot) is the XVG extension """
    accepted = ('xvg',)
    return ext in accepted
def is_valid_zip(ext):
    """ Tells whether ``ext`` (extension without the dot) is the ZIP extension """
    accepted = ('zip',)
    return ext in accepted
def is_valid_xvg_param(ext):
    """ Tells whether ``ext`` is an accepted value for the xvg parameter """
    accepted = ('xmgrace', 'xmgr', 'none')
    return ext in accepted
def is_valid_ot_str_ens(ext):
    """ Tells whether ``ext`` is an accepted output type for a structure ensemble """
    accepted = ('gro', 'g96', 'pdb')
    return ext in accepted
def is_valid_pbc(pbc):
    """ Tells whether ``pbc`` is an accepted value for the pbc parameter """
    accepted = ('none', 'mol', 'res', 'atom', 'nojump', 'cluster', 'whole')
    return pbc in accepted
def is_valid_ur(ur):
    """ Tells whether ``ur`` is an accepted value for the ur parameter """
    accepted = ('rect', 'tric', 'compact')
    return ur in accepted
def is_valid_fit(fit):
    """ Tells whether ``fit`` is an accepted value for the fit parameter """
    accepted = ('none', 'rot+trans', 'rotxy+transxy', 'translation', 'transxy', 'progressive')
    return fit in accepted
def is_valid_term(iterms):
    """ Checks if every given energy term is a known one.

    Args:
        iterms: iterable of term names.

    Returns:
        True when every element of ``iterms`` is a known term (an empty iterable
        therefore yields True), False otherwise.
    """
    cterms = (
        'Angle', 'Proper-Dih.', 'Improper-Dih.', 'LJ-14', 'Coulomb-14', 'LJ-(SR)',
        'Coulomb-(SR)', 'Coul.-recip.', 'Position-Rest.', 'Potential', 'Kinetic-En.',
        'Total-Energy', 'Temperature', 'Pressure', 'Constr.-rmsd', 'Box-X', 'Box-Y',
        'Box-Z', 'Volume', 'Density', 'pV', 'Enthalpy',
        'Vir-XX', 'Vir-XY', 'Vir-XZ', 'Vir-YX', 'Vir-YY', 'Vir-YZ', 'Vir-ZX', 'Vir-ZY', 'Vir-ZZ',
        'Pres-XX', 'Pres-XY', 'Pres-XZ', 'Pres-YX', 'Pres-YY', 'Pres-YZ', 'Pres-ZX', 'Pres-ZY',
        'Pres-ZZ', '#Surf*SurfTen', 'Box-Vel-XX', 'Box-Vel-YY', 'Box-Vel-ZZ',
        'Mu-X', 'Mu-Y', 'Mu-Z', 'T-Protein', 'T-non-Protein', 'Lamb-Protein', 'Lamb-non-Protein',
    )
    # These two names used to be listed only with a stray leading space, which made the
    # real names fail validation. The old spellings stay accepted for backward compatibility.
    legacy_terms = (' Constr.-rmsd', ' Box-Z')
    return all(elem in cterms or elem in legacy_terms for elem in iterms)
def is_valid_selection(ext):
    """ Tells whether ``ext`` is one of the accepted selection group names """
    accepted = (
        'System', 'Protein', 'Protein-H', 'C-alpha', 'Backbone', 'MainChain', 'MainChain+Cb',
        'MainChain+H', 'SideChain', 'SideChain-H', 'Prot-Masses', 'non-Protein', 'Water', 'SOL',
        'non-Water', 'Ion', 'NA', 'CL', 'Water_and_ions', 'DNA', 'RNA', 'Protein_DNA',
        'Protein_RNA', 'Protein_DNA_RNA', 'DNA_RNA',
    )
    return ext in accepted
def copy_instructions_file_to_container(instructions_file, unique_dir):
    """ Copies ``instructions_file`` to ``unique_dir`` with ``shutil.copy2``; returns nothing """
    shutil.copy2(src=instructions_file, dst=unique_dir)
def remove_tmp_files(list, remove_tmp, out_log):
    """ Removes the temporary files generated by the wrapper.

    Args:
        list: paths to hand to ``fu.rm``.
        remove_tmp: nothing is removed or logged when this is falsy.
        out_log: log handle forwarded to ``fu.log``.

    The logged list holds only the entries for which ``fu.rm`` returned a truthy value.
    """
    if not remove_tmp:
        return
    removed_files = []
    for tmp_file in list:
        if fu.rm(tmp_file):
            removed_files.append(tmp_file)
    fu.log('Removed: %s' % str(removed_files), out_log)
def process_output_trjconv_str_ens(tmp_folder, output_file, output_dir, glob_pattern, out_log):
    """ Zips the files matching ``glob_pattern`` in ``tmp_folder`` and copies the archive.

    Args:
        tmp_folder: folder searched with ``Path.glob``.
        output_file: archive path handed to ``fu.zip_list``.
        output_dir: destination of the final ``shutil.copy2`` of ``output_file``.
        glob_pattern: pattern selecting the files to add.
        out_log: log handle forwarded to ``fu.zip_list``.
    """
    # collect every match from the temporary folder, then zip them in one go
    files_list = list(Path(tmp_folder).glob(glob_pattern))
    fu.zip_list(output_file, files_list, out_log)

    shutil.copy2(output_file, output_dir)