Coverage for biobb_analysis/gromacs/common.py: 60%
326 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 09:14 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 09:14 +0000
1"""Common functions for package biobb_analysis.gromacs"""
3import re
4import shutil
5from pathlib import Path, PurePath
6from typing import Optional, Union
8from biobb_common.command_wrapper import cmd_wrapper
9from biobb_common.tools import file_utils as fu
def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files with ``gmx check`` and report whether they match.

    Parameters:
        file_a (str): First file; passed with ``-s1`` when it ends in ``.tpr``,
            otherwise with ``-f``.
        file_b (str): Second file; passed with ``-s2`` when it ends in ``.tpr``,
            otherwise with ``-f2``.
        gmx (str): GROMACS executable to invoke.

    Returns:
        bool: False as soon as a non-empty line of the result file starts with
        neither "Both files read correctly" nor "comparing"; True otherwise.
    """
    print("Comparing GROMACS files:")
    print(f"FILE_A: {Path(file_a).resolve()}")
    print(f"FILE_B: {Path(file_b).resolve()}")
    check_result = "check_result.out"
    first_flag = "-s1" if file_a.endswith(".tpr") else "-f"
    second_flag = "-s2" if file_b.endswith(".tpr") else "-f2"
    # The redirection is handed over as a plain command element.
    # NOTE(review): this presumably relies on the wrapper running the command
    # through a shell - confirm against cmd_wrapper.
    cmd = [
        gmx,
        "check",
        first_flag,
        file_a,
        second_flag,
        file_b,
        "> check_result.out",
    ]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print(f"Result file: {Path(check_result).resolve()}")
    with open(check_result) as check_file:
        for line_num, line in enumerate(check_file):
            ignorable = (
                not line.rstrip()
                or line.startswith("Both files read correctly")
                or line.startswith("comparing")
            )
            if ignorable:
                continue
            print("Discrepance found in line %d: %s" % (line_num, line))
            return False
    return True
def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a usable path to it.

    Parameters:
        path: Location of the energy input file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path when it is absolute, otherwise a string with the
        current working directory prepended.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_energy``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")
    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        message = (
            classname + f": Format {extension} in energy input file is not compatible"
        )
        fu.log(message, out_log)
        raise SystemExit(message)
    # A bare file name is never absolute, so one absoluteness test covers both
    # cases; relative inputs are anchored to the cwd (the original note says
    # execution is launched on a tmp folder).
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))
def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a usable path to it.

    Parameters:
        path: Location of the structure input file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path when it is absolute, otherwise a string with the
        current working directory prepended.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_structure``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")
    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = (
            classname
            + f": Format {extension} in structure input file is not compatible"
        )
        fu.log(message, out_log)
        raise SystemExit(message)
    # A bare file name is never absolute, so one absoluteness test covers both
    # cases; relative inputs are anchored to the cwd (the original note says
    # execution is launched on a tmp folder).
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))
def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return a usable path to it.

    Unlike the other input checkers in this module, this one does not test
    whether the file exists.

    Parameters:
        path: Location of the index file; any falsy value means "no index".
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        None when no path is given; the given path when it is absolute;
        otherwise a string with the current working directory prepended.

    Raises:
        SystemExit: If the extension is rejected by ``is_valid_index``.
    """
    if not path:
        return None
    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        message = (
            classname + f": Format {extension} in index input file is not compatible"
        )
        fu.log(message, out_log)
        raise SystemExit(message)
    # A bare file name is never absolute, so one absoluteness test covers both
    # cases; relative inputs are anchored to the cwd (the original note says
    # execution is launched on a tmp folder).
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))
def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a usable path to it.

    Parameters:
        path: Location of the trajectory input file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path when it is absolute, otherwise a string with the
        current working directory prepended.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_trajectory``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")
    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        message = (
            classname
            + f": Format {extension} in trajectory input file is not compatible"
        )
        fu.log(message, out_log)
        raise SystemExit(message)
    # A bare file name is never absolute, so one absoluteness test covers both
    # cases; relative inputs are anchored to the cwd (the original note says
    # execution is launched on a tmp folder).
    if PurePath(path).is_absolute():
        return path
    return str(PurePath(Path.cwd()).joinpath(path))
def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output file is an XVG.

    Parameters:
        path: Destination of the output file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or the extension is
            rejected by ``is_valid_xvg``.
    """
    out_dir = Path(path).parent
    if not out_dir.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if is_valid_xvg(extension):
        return path
    message = classname + f": Format {extension} in output file is not compatible"
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_log_path(path, out_log, classname):
    """Check that the folder of a log-like output file exists.

    Parameters:
        path: Destination of the output file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing.
    """
    out_dir = Path(path).parent
    if out_dir.exists():
        return path
    fu.log(classname + ": Unexisting output folder, exiting", out_log)
    raise SystemExit(classname + ": Unexisting output folder")
def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the file is a structure format.

    Parameters:
        path: Destination of the output file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or the extension is
            rejected by ``is_valid_structure``.
    """
    out_dir = Path(path).parent
    if not out_dir.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if is_valid_structure(extension):
        return path
    message = classname + f": Format {extension} in output file is not compatible"
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the file is a trajectory format.

    Parameters:
        path: Destination of the output file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or the extension is
            rejected by ``is_valid_trajectory_output``.
    """
    out_dir = Path(path).parent
    if not out_dir.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if is_valid_trajectory_output(extension):
        return path
    message = classname + f": Format {extension} in output file is not compatible"
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output file is a ZIP.

    Parameters:
        path: Destination of the output file.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The given path, unchanged.

    Raises:
        SystemExit: If the parent folder is missing or the extension is
            rejected by ``is_valid_zip``.
    """
    out_dir = Path(path).parent
    if not out_dir.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")
    extension = PurePath(path).suffix[1:]
    if is_valid_zip(extension):
        return path
    message = classname + f": Format {extension} in output file is not compatible"
    fu.log(message, out_log)
    raise SystemExit(message)
def get_default_value(key):
    """Return the built-in default associated with a property name.

    Parameters:
        key (str): Name of the property.

    Returns:
        The default registered for ``key``.

    Raises:
        KeyError: If ``key`` has no registered default.
    """
    # The table is rebuilt on every call, so mutable defaults such as the
    # "terms" list are never shared between callers.
    defaults = dict(
        instructions_file="instructions.in",
        binary_path="gmx",
        terms=["Potential"],
        selection="System",
        xvg="none",
        dista=False,
        method="linkage",
        cutoff=0.1,
        cluster_selection="System",
        fit_selection="System",
        center_selection="System",
        output_selection="System",
        pbc="mol",
        center=True,
        fit="none",
        ur="compact",
        skip=1,
        start=None,
        end=None,
        dt=None,
        ot_str_ens="pdb",
    )
    return defaults[key]
def get_binary_path(properties, type):
    """Return the binary path stored in the properties, or its default.

    Parameters:
        properties (dict): User-supplied properties.
        type (str): Property name to look up; also used as the key for the
            default value.

    Returns:
        The value found under ``type``, or ``get_default_value(type)``.
    """
    # The default is resolved before the lookup, so an unknown ``type`` raises
    # even when the property is present.
    fallback = get_default_value(type)
    return properties.get(type, fallback)
def get_terms(properties, out_log, classname):
    """Return the energy terms requested in the properties after validation.

    Parameters:
        properties (dict): User-supplied properties; read under "terms".
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The non-empty list stored under "terms".

    Raises:
        SystemExit: If "terms" is missing, empty, not a list, or rejected by
            ``is_valid_term``.
    """
    requested = properties.get("terms", dict())
    if not (requested and isinstance(requested, list)):
        reason = classname + ": No terms provided or incorrect format"
        fu.log(reason + ", exiting", out_log)
        raise SystemExit(reason)
    if not is_valid_term(requested):
        reason = classname + ": Incorrect terms provided"
        fu.log(reason + ", exiting", out_log)
        raise SystemExit(reason)
    return requested
def get_selection(properties, out_log, classname):
    """Return the "selection" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated selection.

    Raises:
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    chosen = properties.get("selection", get_default_value("selection"))
    if not chosen:
        reason = classname + ": No selection provided or incorrect format"
        fu.log(reason + ", exiting", out_log)
        raise SystemExit(reason)
    if is_valid_selection(chosen):
        return chosen
    reason = classname + ": Incorrect selection provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_image_selection(properties, key, out_log, classname):
    """Return the selection stored under ``key`` (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        key (str): Property name holding the selection; also the key of its
            default value.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated selection.

    Raises:
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    chosen = properties.get(key, get_default_value(key))
    if not chosen:
        reason = classname + ": No selection provided or incorrect format"
        fu.log(reason + ", exiting", out_log)
        raise SystemExit(reason)
    if is_valid_selection(chosen):
        return chosen
    reason = classname + ": Incorrect selection provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection under ``key`` if it names a group of the index file.

    Group names are taken from every ``[...]`` match in the index file, with
    brackets and spaces removed.

    Parameters:
        properties (dict): User-supplied properties.
        index: Path of the index file to scan.
        key (str): Property name holding the selection; also the key of its
            default value.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The selection, when it matches one of the group names found.

    Raises:
        SystemExit: If the selection is not among the groups of the file.
    """
    header = re.compile(r"\[.*\]")
    groups = []
    with open(index, "r") as ndx_file:
        for line in ndx_file:
            groups.extend(
                re.sub(r"[\[\] ]", "", found.group())
                for found in header.finditer(line)
            )
    chosen = properties.get(key, get_default_value(key))
    if chosen in groups:
        return chosen
    reason = classname + ": Incorrect selection provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_pbc(properties, out_log, classname):
    """Return the "pbc" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated pbc value.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_pbc``.
    """
    value = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(value):
        return value
    reason = classname + ": Incorrect pbc provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_center(properties, out_log, classname):
    """Return the "center" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated center flag.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    value = properties.get("center", get_default_value("center"))
    if is_valid_boolean(value):
        return value
    reason = classname + ": Incorrect center provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_ur(properties, out_log, classname):
    """Return the "ur" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated ur value.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ur``.
    """
    value = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(value):
        return value
    reason = classname + ": Incorrect ur provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_fit(properties, out_log, classname):
    """Return the "fit" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated fit value.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_fit``.
    """
    value = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(value):
        return value
    reason = classname + ": Incorrect fit provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_skip(properties, out_log, classname):
    """Return the "skip" property (or its default) as a string after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        str: The validated skip value converted with ``str``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        # The exit message previously named "start" (copy-paste slip); it now
        # names the property that actually failed, matching the log line.
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)
def get_start(properties, out_log, classname):
    """Return the "start" property as a string, or None when it is unset.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        None when the value is None, otherwise the value converted with ``str``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("start", get_default_value("start"))
    if value is None:
        return value
    if is_valid_int(value):
        return str(value)
    reason = classname + ": Incorrect start provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_end(properties, out_log, classname):
    """Return the "end" property as a string, or None when it is unset.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        None when the value is None, otherwise the value converted with ``str``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("end", get_default_value("end"))
    if value is None:
        return value
    if is_valid_int(value):
        return str(value)
    reason = classname + ": Incorrect end provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_dt(properties, out_log, classname):
    """Return the "dt" property as a string, or None when it is unset.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        None when the value is None, otherwise the value converted with ``str``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("dt", get_default_value("dt"))
    if value is None:
        return value
    if is_valid_int(value):
        return str(value)
    reason = classname + ": Incorrect dt provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_ot_str_ens(properties, out_log, classname):
    """Return the structure-ensemble output type as a string after validation.

    The value is read from the "output_type" property; its default is stored
    under the "ot_str_ens" key.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        str: The validated output type.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ot_str_ens``.
    """
    value = properties.get("output_type", get_default_value("ot_str_ens"))
    if is_valid_ot_str_ens(value):
        return str(value)
    reason = classname + ": Incorrect output_type provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_xvg(properties, out_log, classname):
    """Return the "xvg" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated xvg value.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_xvg_param``.
    """
    value = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(value):
        return value
    reason = classname + ": Incorrect xvg provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_dista(properties, out_log, classname):
    """Return the "dista" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated dista flag.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    value = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(value):
        return value
    reason = classname + ": Incorrect dista provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_method(properties, out_log, classname):
    """Return the "method" property (or its default) after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        The validated method name.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_method_param``.
    """
    value = properties.get("method", get_default_value("method"))
    if is_valid_method_param(value):
        return value
    reason = classname + ": Incorrect method provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def get_cutoff(properties, out_log, classname):
    """Return the "cutoff" property (or its default) as a string after validation.

    Parameters:
        properties (dict): User-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname (str): Prefix used in log and error messages.

    Returns:
        str: The validated cutoff converted with ``str``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_float``.
    """
    value = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(value):
        return str(value)
    reason = classname + ": Incorrect cutoff provided"
    fu.log(reason + ", exiting", out_log)
    raise SystemExit(reason)
def is_valid_boolean(val):
    """Tell whether the value compares equal to True or False.

    Membership uses equality, so values equal to a boolean (such as 1 or 0)
    are accepted as well.
    """
    return val in (True, False)
def is_valid_float(val):
    """Tell whether the value is a real number (int or float).

    None is still accepted and treated as "unset", as it always was.
    Previously every falsy value passed, so empty strings and empty
    containers were wrongly reported as valid numbers; they are now rejected.

    Parameters:
        val: Value to check.

    Returns:
        bool: True for None, ints and floats; False for anything else.
    """
    return val is None or isinstance(val, (int, float))
def is_valid_int(val):
    """Tell whether the value is an integer.

    None is still accepted and treated as "unset", as it always was.
    Previously every falsy value passed, so "", [] and 0.0 were wrongly
    reported as valid integers; they are now rejected.

    Parameters:
        val: Value to check.

    Returns:
        bool: True for None and ints; False for anything else.
    """
    return val is None or isinstance(val, int)
def is_valid_method_param(met):
    """Tell whether the clustering method name is one of the accepted ones."""
    return met in (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )
def is_valid_structure(ext):
    """Tell whether the extension is an accepted structure format."""
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")
def is_valid_index(ext):
    """Tell whether the extension is an accepted index format."""
    return ext in ("ndx",)
def is_valid_trajectory(ext):
    """Tell whether the extension is an accepted input trajectory format."""
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")
def is_valid_trajectory_output(ext):
    """Tell whether the extension is an accepted output trajectory format."""
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")
def is_valid_energy(ext):
    """Tell whether the extension is an accepted energy format."""
    return ext in ("edr",)
def is_valid_xvg(ext):
    """Tell whether the extension denotes an XVG file."""
    return ext in ("xvg",)
def is_valid_zip(ext):
    """Tell whether the extension denotes a ZIP file."""
    return ext in ("zip",)
def is_valid_xvg_param(ext):
    """Tell whether the value is an accepted setting for the xvg parameter."""
    return ext in ("xmgrace", "xmgr", "none")
def is_valid_ot_str_ens(ext):
    """Tell whether the value is an accepted structure-ensemble output type."""
    return ext in ("gro", "g96", "pdb")
def is_valid_pbc(pbc):
    """Tell whether the value is an accepted setting for the pbc parameter."""
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")
def is_valid_ur(ur):
    """Tell whether the value is an accepted setting for the ur parameter."""
    return ur in ("rect", "tric", "compact")
def is_valid_fit(fit):
    """Tell whether the value is an accepted setting for the fit parameter."""
    accepted = (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )
    return fit in accepted
def is_valid_term(iterms):
    """Tell whether every requested energy term is a known one.

    Parameters:
        iterms: Iterable of term names to check.

    Returns:
        bool: True when all elements are known terms (vacuously True for an
        empty iterable), False otherwise.
    """
    cterms = [
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
        # Legacy spellings: these two names used to be listed only with a
        # stray leading space, which made the real names fail validation.
        # The padded forms stay accepted so existing configurations that
        # worked around the typo keep passing.
        " Constr.-rmsd",
        " Box-Z",
    ]
    return all(elem in cterms for elem in iterms)
def is_valid_selection(ext):
    """Tell whether the value is one of the accepted selection group names."""
    accepted = (
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
        "DPPC",
        "DMPC",
        "POPG",
        "POPA",
        "POPC",
        "POPE",
        "DMTAP",
        "POPS",
    )
    return ext in accepted
def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy the instructions file into the given directory with ``shutil.copy2``.

    Parameters:
        instructions_file: Path of the file to copy.
        unique_dir: Destination passed to ``shutil.copy2``.
    """
    shutil.copy2(src=instructions_file, dst=unique_dir)
def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files produced by the wrapper, when requested.

    Parameters:
        list: Iterable of paths to remove.
        remove_tmp: When falsy, nothing is removed and nothing is logged.
        out_log: Logger handed to ``fu.log``.
    """
    if not remove_tmp:
        return
    removed_files = []
    for tmp_file in list:
        # Only paths for which fu.rm returns a truthy value are reported.
        if fu.rm(tmp_file):
            removed_files.append(tmp_file)
    fu.log(f"Removed: {removed_files}", out_log)
def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the files found in the temporary folder and copy the archive.

    Parameters:
        tmp_folder: Folder searched for the files to pack.
        output_file: Archive path passed to ``fu.zip_list``.
        output_dir: Destination passed to ``shutil.copy2``.
        glob_pattern (str): Pattern used to find the files; when it matches
            nothing, "frame*.pdb" is tried instead.
        out_log: Logger handed to ``fu.zip_list``.
    """
    search_root = Path(tmp_folder)
    files_list = list(search_root.glob(glob_pattern))
    if not files_list:
        files_list = list(search_root.glob("frame*.pdb"))

    # Pack the collected files, then place the archive at its destination.
    fu.zip_list(output_file, files_list, out_log)
    shutil.copy2(output_file, output_dir)
648def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]:
649 """
650 Converts a string to a list, splitting by commas or spaces. If the input is already a list, returns it as is.
651 Returns an empty list if input_data is None.
653 Parameters:
654 input_data (str, list, or None): The string, list, or None value to convert.
656 Returns:
657 list: A list of string elements or an empty list if input_data is None.
658 """
659 if input_data is None:
660 return []
662 if isinstance(input_data, list):
663 # If input is already a list, return it
664 return input_data
666 # If input is a string, determine the delimiter based on presence of commas
667 delimiter = "," if "," in input_data else " "
668 items = input_data.split(delimiter)
670 # Remove whitespace from each item and ignore empty strings
671 processed_items = [item.strip() for item in items if item.strip()]
673 return processed_items