Coverage for biobb_analysis/gromacs/common.py: 60%
321 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-08 08:07 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-08 08:07 +0000
1"""Common functions for package biobb_analysis.gromacs"""
3import re
4import shutil
5from pathlib import Path, PurePath
6from typing import Optional, Union
8from biobb_common.command_wrapper import cmd_wrapper
9from biobb_common.tools import file_utils as fu
def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files with ``<gmx> check`` and scan its output.

    The command output is redirected to ``check_result.out`` (relative to the
    current working directory) and then read back line by line.

    Args:
        file_a: Path of the first file. A ``.tpr`` suffix selects the ``-s1``
            flag, anything else selects ``-f``.
        file_b: Path of the second file. A ``.tpr`` suffix selects the ``-s2``
            flag, anything else selects ``-f2``.
        gmx: Name or path of the GROMACS executable.

    Returns:
        ``False`` as soon as a non-blank output line is found that neither
        starts with "Both files read correctly" nor with "comparing";
        ``True`` otherwise.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = "check_result.out"
    cmd = [gmx, "check"]
    cmd.append("-s1" if file_a.endswith(".tpr") else "-f")
    cmd.append(file_a)
    cmd.append("-s2" if file_b.endswith(".tpr") else "-f2")
    cmd.append(file_b)
    # NOTE(review): the redirection is passed as a command token, so this
    # presumably relies on CmdWrapper running the joined command through a
    # shell -- confirm against biobb_common.
    cmd.append("> " + check_result)
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        # Count from 1 so the reported number matches what an editor shows.
        for line_num, line in enumerate(check_file, start=1):
            if not line.rstrip():
                continue
            if line.startswith("Both files read correctly"):
                continue
            if not line.startswith("comparing"):
                print("Discrepance found in line %d: %s" % (line_num, line))
                return False
    return True
def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a usable path to it.

    Args:
        path: Location of the energy input file.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``path`` unchanged when it is absolute, otherwise a string with the
        current working directory prepended.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_energy``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        error = classname + ": Format %s in energy input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a usable path to it.

    Args:
        path: Location of the structure input file.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``path`` unchanged when it is absolute, otherwise a string with the
        current working directory prepended.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_structure``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        error = classname + ": Format %s in structure input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return a usable path to it.

    Unlike the other input checkers, this one does not test that the file
    exists; it only checks the extension.

    Args:
        path: Location of the index file; any falsy value means "no index".
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``None`` when ``path`` is falsy; ``path`` unchanged when it is
        absolute; otherwise a string with the current working directory
        prepended.

    Raises:
        SystemExit: If the extension is rejected by ``is_valid_index``.
    """
    if not path:
        return None

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        error = classname + ": Format %s in index input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a usable path to it.

    Args:
        path: Location of the trajectory input file.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``path`` unchanged when it is absolute, otherwise a string with the
        current working directory prepended.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_trajectory``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        error = classname + ": Format %s in trajectory input file is not compatible" % extension
        fu.log(error, out_log)
        raise SystemExit(error)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output file is XVG.

    Args:
        path: Location of the output file.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_xvg``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if is_valid_xvg(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)
def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the output is a structure file.

    Args:
        path: Location of the output file.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_structure``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if is_valid_structure(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)
def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the output is a trajectory file.

    Args:
        path: Location of the output file.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_trajectory_output``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if is_valid_trajectory_output(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)
def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output file is a ZIP.

    Args:
        path: Location of the output file.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_zip``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    # Extension without the leading dot.
    extension = PurePath(path).suffix[1:]
    if is_valid_zip(extension):
        return path

    error = classname + ": Format %s in output file is not compatible" % extension
    fu.log(error, out_log)
    raise SystemExit(error)
def get_default_value(key):
    """Return the built-in default for the property named ``key``.

    The table is rebuilt on every call, so mutable defaults (the ``terms``
    list) are never shared between callers.

    Args:
        key: Name of the property.

    Returns:
        The default value registered for ``key``.

    Raises:
        KeyError: If ``key`` has no registered default.
    """
    defaults = {
        # general
        "instructions_file": "instructions.in",
        "binary_path": "gmx",
        "xvg": "none",
        # energy
        "terms": ["Potential"],
        # selections
        "selection": "System",
        "cluster_selection": "System",
        "fit_selection": "System",
        "center_selection": "System",
        "output_selection": "System",
        # clustering
        "dista": False,
        "method": "linkage",
        "cutoff": 0.1,
        # imaging
        "pbc": "mol",
        "center": True,
        "fit": "none",
        "ur": "compact",
        # frame sampling
        "skip": 1,
        "start": None,
        "end": None,
        "dt": None,
        # structure ensemble output type
        "ot_str_ens": "pdb",
    }
    return defaults[key]
def get_binary_path(properties, type):
    """Return the binary path stored in ``properties`` under ``type``.

    Args:
        properties: Mapping of user-supplied properties.
        type: Property key to read (for example ``"binary_path"``).

    Returns:
        The configured value, or the built-in default for ``type``.

    Raises:
        KeyError: If ``type`` has no built-in default; the default is looked
            up before ``properties`` is consulted.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)
def get_terms(properties, out_log, classname):
    """Return the validated list of energy terms from ``properties``.

    Args:
        properties: Mapping of user-supplied properties; read under "terms".
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The value stored under "terms".

    Raises:
        SystemExit: If "terms" is missing, empty, not a list, or rejected by
            ``is_valid_term``.
    """
    terms = properties.get("terms", {})
    if not (terms and isinstance(terms, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(terms):
        return terms
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")
def get_selection(properties, out_log, classname):
    """Return the validated "selection" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured selection, or the built-in default.

    Raises:
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    fallback = get_default_value("selection")
    chosen = properties.get("selection", fallback)
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_image_selection(properties, key, out_log, classname):
    """Return the validated selection stored under ``key``.

    Args:
        properties: Mapping of user-supplied properties.
        key: Property name to read; must also have a built-in default.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured selection, or the built-in default for ``key``.

    Raises:
        KeyError: If ``key`` has no built-in default.
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    fallback = get_default_value(key)
    chosen = properties.get(key, fallback)
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection under ``key`` after checking it against an index file.

    Every ``[...]`` span found in the index file is collected as a group name,
    with brackets and spaces removed.

    Args:
        properties: Mapping of user-supplied properties.
        index: Path of the index file to scan.
        key: Property name to read; must also have a built-in default.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured selection, or the built-in default for ``key``.

    Raises:
        KeyError: If ``key`` has no built-in default.
        SystemExit: If the selection is not one of the group names found.
    """
    group_header = re.compile(r"\[.*\]")
    group_names = []
    with open(index, "r") as ndx_file:
        for line in ndx_file:
            group_names.extend(
                re.sub(r"[\[\] ]", "", found.group())
                for found in group_header.finditer(line)
            )
    sel = properties.get(key, get_default_value(key))
    if sel in group_names:
        return sel
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_pbc(properties, out_log, classname):
    """Return the validated "pbc" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured value, or the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_pbc``.
    """
    fallback = get_default_value("pbc")
    value = properties.get("pbc", fallback)
    if is_valid_pbc(value):
        return value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")
def get_center(properties, out_log, classname):
    """Return the validated "center" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured value, or the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    fallback = get_default_value("center")
    value = properties.get("center", fallback)
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")
def get_ur(properties, out_log, classname):
    """Return the validated "ur" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured value, or the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ur``.
    """
    fallback = get_default_value("ur")
    value = properties.get("ur", fallback)
    if is_valid_ur(value):
        return value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")
def get_fit(properties, out_log, classname):
    """Return the validated "fit" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured value, or the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_fit``.
    """
    fallback = get_default_value("fit")
    value = properties.get("fit", fallback)
    if is_valid_fit(value):
        return value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")
def get_skip(properties, out_log, classname):
    """Return the validated "skip" property as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``str`` of the configured value, or of the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        # The exit message used to name "start" (copy-paste slip); it now
        # names the property that actually failed.
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)
def get_start(properties, out_log, classname):
    """Return the validated "start" property as a string, or ``None``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``None`` when the value is ``None``, otherwise ``str`` of the value.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("start", get_default_value("start"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")
def get_end(properties, out_log, classname):
    """Return the validated "end" property as a string, or ``None``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``None`` when the value is ``None``, otherwise ``str`` of the value.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("end", get_default_value("end"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")
def get_dt(properties, out_log, classname):
    """Return the validated "dt" property as a string, or ``None``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``None`` when the value is ``None``, otherwise ``str`` of the value.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("dt", get_default_value("dt"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")
def get_ot_str_ens(properties, out_log, classname):
    """Return the validated "output_type" property as a string.

    The property is read under "output_type", while its default is
    registered under "ot_str_ens".

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``str`` of the configured value, or of the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ot_str_ens``.
    """
    fallback = get_default_value("ot_str_ens")
    value = properties.get("output_type", fallback)
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")
def get_xvg(properties, out_log, classname):
    """Return the validated "xvg" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured value, or the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_xvg_param``.
    """
    fallback = get_default_value("xvg")
    value = properties.get("xvg", fallback)
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")
def get_dista(properties, out_log, classname):
    """Return the validated "dista" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured value, or the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    fallback = get_default_value("dista")
    value = properties.get("dista", fallback)
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")
def get_method(properties, out_log, classname):
    """Return the validated "method" property.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        The configured value, or the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_method_param``.
    """
    fallback = get_default_value("method")
    value = properties.get("method", fallback)
    if is_valid_method_param(value):
        return value
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")
def get_cutoff(properties, out_log, classname):
    """Return the validated "cutoff" property as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log`` for error reporting.
        classname: Prefix used in log and exit messages.

    Returns:
        ``str`` of the configured value, or of the built-in default.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_float``.
    """
    fallback = get_default_value("cutoff")
    value = properties.get("cutoff", fallback)
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")
def is_valid_boolean(val):
    """Tell whether ``val`` compares equal to ``True`` or ``False``.

    The check uses equality, so ``1`` and ``0`` are accepted as well.
    """
    return val in (True, False)
def is_valid_float(val):
    """Tell whether ``val`` is acceptable as a float.

    Any falsy value (``None``, ``0``, ``""`` ...) passes; a truthy value
    passes only if it is a ``float`` or an ``int``.
    """
    return not val or isinstance(val, (float, int))
def is_valid_int(val):
    """Tell whether ``val`` is acceptable as an integer.

    Any falsy value (``None``, ``0``, ``""`` ...) passes; a truthy value
    passes only if it is an ``int``.
    """
    return not val or isinstance(val, int)
def is_valid_method_param(met):
    """Tell whether ``met`` is one of the accepted "method" values."""
    return met in (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )
def is_valid_structure(ext):
    """Tell whether ``ext`` (no leading dot) is an accepted structure format."""
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")
def is_valid_index(ext):
    """Tell whether ``ext`` (no leading dot) is an accepted index format."""
    return ext in ("ndx",)
def is_valid_trajectory(ext):
    """Tell whether ``ext`` (no leading dot) is an accepted input trajectory format."""
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")
def is_valid_trajectory_output(ext):
    """Tell whether ``ext`` (no leading dot) is an accepted output trajectory format."""
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")
def is_valid_energy(ext):
    """Tell whether ``ext`` (no leading dot) is an accepted energy format."""
    return ext in ("edr",)
def is_valid_xvg(ext):
    """Tell whether ``ext`` (no leading dot) is the XVG extension."""
    return ext in ("xvg",)
def is_valid_zip(ext):
    """Tell whether ``ext`` (no leading dot) is the ZIP extension."""
    return ext in ("zip",)
def is_valid_xvg_param(ext):
    """Tell whether ``ext`` is one of the accepted "xvg" property values."""
    return ext in ("xmgrace", "xmgr", "none")
def is_valid_ot_str_ens(ext):
    """Tell whether ``ext`` is an accepted structure-ensemble output type."""
    return ext in ("gro", "g96", "pdb")
def is_valid_pbc(pbc):
    """Tell whether ``pbc`` is one of the accepted "pbc" property values."""
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")
def is_valid_ur(ur):
    """Tell whether ``ur`` is one of the accepted "ur" property values."""
    return ur in ("rect", "tric", "compact")
def is_valid_fit(fit):
    """Tell whether ``fit`` is one of the accepted "fit" property values."""
    return fit in (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )
def is_valid_term(iterms):
    """Tell whether every element of ``iterms`` is a known energy term.

    Args:
        iterms: Iterable of term names to check.

    Returns:
        ``True`` when all elements are known terms (vacuously ``True`` for an
        empty iterable), ``False`` otherwise.
    """
    cterms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
    )
    # These two names used to be listed only with a stray leading space, which
    # rejected the real spellings. The old spellings stay accepted so that
    # configurations written against the previous list keep validating.
    legacy_terms = (" Constr.-rmsd", " Box-Z")
    return all(elem in cterms or elem in legacy_terms for elem in iterms)
def is_valid_selection(ext):
    """Tell whether ``ext`` is one of the accepted selection group names."""
    return ext in (
        # whole system and protein subsets
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        # solvent and ions
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        # nucleic acids and combinations
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
        # remaining residue-name groups
        "DPPC",
        "DMPC",
        "POPG",
        "POPA",
        "POPC",
        "POPE",
        "DMTAP",
        "POPS",
    )
def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy ``instructions_file`` into ``unique_dir`` with ``shutil.copy2``.

    Args:
        instructions_file: Path of the file to copy.
        unique_dir: Destination handed to ``shutil.copy2``.
    """
    source, destination = instructions_file, unique_dir
    shutil.copy2(source, destination)
def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files generated by the wrapper.

    Args:
        list: Iterable of paths to remove.
        remove_tmp: When falsy, nothing is removed and nothing is logged.
        out_log: Logger handed to ``fu.log``.
    """
    if not remove_tmp:
        return
    # Only the paths for which fu.rm returns a truthy value are reported.
    removed_files = [tmp_file for tmp_file in list if fu.rm(tmp_file)]
    fu.log("Removed: %s" % str(removed_files), out_log)
def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the files found in ``tmp_folder`` and copy the archive to ``output_dir``.

    Args:
        tmp_folder: Folder searched for the files to archive.
        output_file: Path of the zip file to create.
        output_dir: Destination handed to ``shutil.copy2``.
        glob_pattern: Pattern used to find the files; when it matches nothing,
            "frame*.pdb" is tried instead.
        out_log: Logger handed to ``fu.zip_list``.
    """
    tmp_dir = Path(tmp_folder)
    files_list = list(tmp_dir.glob(glob_pattern)) or list(tmp_dir.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, files_list, out_log)

    shutil.copy2(output_file, output_dir)
def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]:
    """
    Turn a delimited string into a list of trimmed, non-empty items.

    The string is split on commas when it contains at least one comma,
    otherwise on single spaces. A list is handed back untouched and ``None``
    yields an empty list.

    Parameters:
        input_data (str, list, or None): The value to convert.

    Returns:
        list: The resulting items; the very same object when a list was given.
    """
    if input_data is None:
        return []

    if isinstance(input_data, list):
        return input_data

    # A single comma anywhere switches the whole string to comma splitting.
    separator = " " if "," not in input_data else ","
    trimmed = (chunk.strip() for chunk in input_data.split(separator))
    return [chunk for chunk in trimmed if chunk]