Coverage for biobb_analysis / gromacs / common.py: 60%
334 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 10:53 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 10:53 +0000
1"""Common functions for package biobb_analysis.gromacs"""
3import re
4import shutil
5from pathlib import Path, PurePath
6from typing import Optional, Union
8from biobb_common.command_wrapper import cmd_wrapper
9from biobb_common.tools import file_utils as fu
def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files by running ``<gmx> check`` on them.

    The command line ends with the token ``> check_result.out``; presumably
    ``CmdWrapper`` runs it through a shell so the output is redirected to
    that file (TODO confirm). The file is then read from the current
    working directory and scanned line by line.

    Args:
        file_a: First file. Passed with ``-s1`` when it ends in ``.tpr``,
            otherwise with ``-f``.
        file_b: Second file. Passed with ``-s2`` when it ends in ``.tpr``,
            otherwise with ``-f2``.
        gmx: Name or path of the GROMACS executable.

    Returns:
        ``False`` as soon as a non-blank line is found that starts neither
        with "Both files read correctly" nor with "comparing"; ``True`` when
        no such line exists.
    """
    print("Comparing GROMACS files:")
    for label, file_path in (("FILE_A", file_a), ("FILE_B", file_b)):
        print("%s: %s" % (label, str(Path(file_path).resolve())))

    check_result = "check_result.out"
    flag_a = "-s1" if file_a.endswith(".tpr") else "-f"
    flag_b = "-s2" if file_b.endswith(".tpr") else "-f2"
    cmd = [gmx, "check", flag_a, file_a, flag_b, file_b, "> check_result.out"]
    cmd_wrapper.CmdWrapper(cmd).launch()

    print("Result file: %s" % str(Path(check_result).resolve()))
    harmless_prefixes = ("Both files read correctly", "comparing")
    with open(check_result) as check_file:
        for line_num, line in enumerate(check_file):
            # Blank lines and the two expected report prefixes are not differences.
            if not line.rstrip() or line.startswith(harmless_prefixes):
                continue
            print("Discrepance found in line %d: %s" % (line_num, line))
            return False
    return True
def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return its path.

    Args:
        path: Path of the energy file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path``; a relative path is joined onto the current working directory.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_energy``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if not is_valid_energy(extension):
        message = (
            classname + ": Format %s in energy input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path
def check_input_path(path, out_log, classname):
    """Validate the input structure file and return its path.

    Args:
        path: Path of the structure file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path``; a relative path is joined onto the current working directory.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_structure``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if not is_valid_structure(extension):
        message = (
            classname
            + ": Format %s in structure input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path
def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return its path.

    Only the extension is checked; this function does not test whether the
    file exists.

    Args:
        path: Path of the index file, or a falsy value when none is given.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``None`` when ``path`` is falsy; otherwise ``path``, joined onto the
        current working directory when it is relative.

    Raises:
        SystemExit: If the extension is rejected by ``is_valid_index``.
    """
    if not path:
        return None

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if not is_valid_index(extension):
        message = (
            classname + ": Format %s in index input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path
def check_traj_path(path, out_log, classname):
    """Validate the input trajectory file and return its path.

    Args:
        path: Path of the trajectory file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path``; a relative path is joined onto the current working directory.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_trajectory``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if not is_valid_trajectory(extension):
        message = (
            classname
            + ": Format %s in trajectory input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # if file input has no path, add cwd because execution is launched on tmp folder
    pure_path = PurePath(path)
    if pure_path.name == path or not pure_path.is_absolute():
        path = str(PurePath(Path.cwd()) / path)
    return path
def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is XVG.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_xvg``.
    """
    parent = PurePath(path).parent
    if parent and not Path(parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if is_valid_xvg(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_log_path(path, out_log, classname):
    """Check that the folder of a log-like output file exists.

    No extension check is performed.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist.
    """
    parent = PurePath(path).parent
    if not parent or Path(parent).exists():
        return path
    fu.log(classname + ": Unexisting output folder, exiting", out_log)
    raise SystemExit(classname + ": Unexisting output folder")
def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the extension is a structure format.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_structure``.
    """
    parent = PurePath(path).parent
    if parent and not Path(parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if is_valid_structure(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the extension is a trajectory output format.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_trajectory_output``.
    """
    parent = PurePath(path).parent
    if parent and not Path(parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if is_valid_trajectory_output(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is ZIP.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_zip``.
    """
    parent = PurePath(path).parent
    if parent and not Path(parent).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]  # suffix without the leading dot
    if is_valid_zip(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def get_default_value(key):
    """Return the default value registered for ``key``.

    The table is rebuilt on every call, so mutable defaults such as the
    ``terms`` list are fresh objects each time.

    Args:
        key: Name of the property whose default is wanted.

    Returns:
        The default value associated with ``key``.

    Raises:
        KeyError: If ``key`` has no registered default.
    """
    defaults = dict(
        instructions_file="instructions.in",
        binary_path="gmx",
        terms=["Potential"],
        selection="System",
        xvg="none",
        dista=False,
        method="linkage",
        cutoff=0.1,
        cluster_selection="System",
        fit_selection="System",
        center_selection="System",
        output_selection="System",
        pbc="mol",
        center=True,
        fit="none",
        ur="compact",
        skip=1,
        start=None,
        end=None,
        dt=None,
        dump=None,
        ot_str_ens="pdb",
    )
    return defaults[key]
def get_binary_path(properties, type):
    """Return the property stored under ``type``, or its default when absent.

    The default is looked up before the property is read, so an unknown
    ``type`` raises ``KeyError`` even if ``properties`` contains it.

    Args:
        properties: Mapping of user-supplied properties.
        type: Property key to read (for example ``"binary_path"``).

    Returns:
        The property value, or ``get_default_value(type)`` when missing.
    """
    binary = properties.get(type, get_default_value(type))
    return binary
def get_terms(properties, out_log, classname):
    """Return the ``terms`` property after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        The value stored under ``"terms"``.

    Raises:
        SystemExit: If ``terms`` is missing, empty, not a list, or rejected
            by ``is_valid_term``.
    """
    terms = properties.get("terms", dict())
    if not (terms and isinstance(terms, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(terms):
        return properties.get("terms", "")
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")
def get_selection(properties, out_log, classname):
    """Return the ``selection`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        The validated selection.

    Raises:
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    chosen = properties.get("selection", get_default_value("selection"))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_image_selection(properties, key, out_log, classname):
    """Return the selection stored under ``key`` (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        key: Property key holding the selection.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        The validated selection.

    Raises:
        SystemExit: If the selection is falsy or rejected by
            ``is_valid_selection``.
    """
    chosen = properties.get(key, get_default_value(key))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection under ``key`` if it names a group of the index file.

    Group names are collected from every ``[...]`` match in the index file,
    with brackets and spaces removed.

    Args:
        properties: Mapping of user-supplied properties.
        index: Path of the index file to read.
        key: Property key holding the selection.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        The selection (property value, or the default for ``key``).

    Raises:
        SystemExit: If the selection is not among the group names found.
    """
    header = re.compile(r"\[.*\]")
    with open(index, "r") as ndx_file:
        groups = [
            re.sub(r"[\[\] ]", "", found.group())
            for row in ndx_file
            for found in header.finditer(row)
        ]
    chosen = properties.get(key, get_default_value(key))
    if chosen in groups:
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_pbc(properties, out_log, classname):
    """Return the ``pbc`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_pbc``.
    """
    value = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(value):
        return value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")
def get_center(properties, out_log, classname):
    """Return the ``center`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    value = properties.get("center", get_default_value("center"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")
def get_ur(properties, out_log, classname):
    """Return the ``ur`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ur``.
    """
    value = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(value):
        return value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")
def get_fit(properties, out_log, classname):
    """Return the ``fit`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_fit``.
    """
    value = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(value):
        return value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")
def get_skip(properties, out_log, classname):
    """Return the ``skip`` property (or its default) as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``str(skip)``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        # The exit message previously named "start", contradicting the log line.
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)
def get_start(properties, out_log, classname):
    """Return the ``start`` property (or its default) as a string, or ``None``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``None`` when the value is ``None``, otherwise ``str(start)``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("start", get_default_value("start"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")
def get_end(properties, out_log, classname):
    """Return the ``end`` property (or its default) as a string, or ``None``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``None`` when the value is ``None``, otherwise ``str(end)``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("end", get_default_value("end"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")
def get_dt(properties, out_log, classname):
    """Return the ``dt`` property (or its default) as a string, or ``None``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``None`` when the value is ``None``, otherwise ``str(dt)``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("dt", get_default_value("dt"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")
def get_dump(properties, out_log, classname):
    """Return the ``dump`` property (or its default) as a string, or ``None``.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Returns:
        ``None`` when the value is ``None``, otherwise ``str(dump)``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    value = properties.get("dump", get_default_value("dump"))
    if value is None:
        return None
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dump provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dump provided")
def get_ot_str_ens(properties, out_log, classname):
    """Return the ``output_type`` property as a string after validating it.

    When the property is absent, the default registered under
    ``"ot_str_ens"`` is used.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ot_str_ens``.
    """
    value = properties.get("output_type", get_default_value("ot_str_ens"))
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")
def get_xvg(properties, out_log, classname):
    """Return the ``xvg`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_xvg_param``.
    """
    value = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")
def get_dista(properties, out_log, classname):
    """Return the ``dista`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    value = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")
def get_method(properties, out_log, classname):
    """Return the ``method`` property (or its default) after validating it.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_method_param``.
    """
    value = properties.get("method", get_default_value("method"))
    if is_valid_method_param(value):
        return value
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")
def get_cutoff(properties, out_log, classname):
    """Return the ``cutoff`` property (or its default) as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Name prepended to every log and error message.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_float``.
    """
    value = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")
def is_valid_boolean(val):
    """Return True when ``val`` compares equal to ``True`` or ``False``.

    The check uses equality, so ``1`` and ``0`` are accepted as well.
    """
    return val in (True, False)
def is_valid_float(val):
    """Return True when ``val`` is falsy or an instance of ``float`` or ``int``.

    Falsy values (``None``, ``0``, ``""`` ...) are accepted without a type check.
    """
    if not val:
        return True
    return isinstance(val, (float, int))
def is_valid_int(val):
    """Return True when ``val`` is falsy or an instance of ``int``.

    Falsy values (``None``, ``0``, ``""`` ...) are accepted without a type check.
    """
    if not val:
        return True
    return isinstance(val, int)
def is_valid_method_param(met):
    """Return True when ``met`` is one of the accepted clustering method names."""
    return met in (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )
def is_valid_structure(ext):
    """Return True when ``ext`` is one of the accepted structure file extensions."""
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")
def is_valid_index(ext):
    """Return True when ``ext`` is the accepted index file extension (``ndx``)."""
    return ext in ("ndx",)
def is_valid_trajectory(ext):
    """Return True when ``ext`` is one of the accepted input trajectory extensions."""
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")
def is_valid_trajectory_output(ext):
    """Return True when ``ext`` is one of the accepted output trajectory extensions."""
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")
def is_valid_energy(ext):
    """Return True when ``ext`` is the accepted energy file extension (``edr``)."""
    return ext in ("edr",)
def is_valid_xvg(ext):
    """Return True when ``ext`` is the XVG file extension (``xvg``)."""
    return ext in ("xvg",)
def is_valid_zip(ext):
    """Return True when ``ext`` is the ZIP file extension (``zip``)."""
    return ext in ("zip",)
def is_valid_xvg_param(ext):
    """Return True when ``ext`` is one of the accepted ``xvg`` parameter values."""
    return ext in ("xmgrace", "xmgr", "none")
def is_valid_ot_str_ens(ext):
    """Return True when ``ext`` is an accepted structure-ensemble output type."""
    return ext in ("gro", "g96", "pdb")
def is_valid_pbc(pbc):
    """Return True when ``pbc`` is one of the accepted ``pbc`` parameter values."""
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")
def is_valid_ur(ur):
    """Return True when ``ur`` is one of the accepted ``ur`` parameter values."""
    return ur in ("rect", "tric", "compact")
def is_valid_fit(fit):
    """Return True when ``fit`` is one of the accepted ``fit`` parameter values."""
    return fit in (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )
def is_valid_term(iterms):
    """Return True when every element of ``iterms`` is a known energy term.

    The whitelist used to contain ``" Constr.-rmsd"`` and ``" Box-Z"`` with a
    leading space, so the properly spelled names were rejected. Both
    canonical names are now accepted; the padded spellings are kept so that
    inputs that validated before still validate.

    Args:
        iterms: Iterable of term names to check.

    Returns:
        ``True`` if all elements are in the whitelist (vacuously ``True`` for
        an empty iterable), ``False`` otherwise.
    """
    cterms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
        # Legacy spellings with a leading space, kept for backward compatibility.
        " Constr.-rmsd",
        " Box-Z",
    )
    return all(elem in cterms for elem in iterms)
def is_valid_selection(ext):
    """Return True when ``ext`` is one of the accepted selection group names."""
    return ext in (
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
        "DPPC",
        "DMPC",
        "POPG",
        "POPA",
        "POPC",
        "POPE",
        "DMTAP",
        "POPS",
    )
def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy ``instructions_file`` to ``unique_dir`` with ``shutil.copy2``.

    Args:
        instructions_file: Path of the file to copy.
        unique_dir: Destination passed to ``shutil.copy2``.
    """
    source, destination = instructions_file, unique_dir
    shutil.copy2(source, destination)
def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files generated by the wrapper, if requested.

    Args:
        list: Paths to remove (the name shadows the builtin but is part of
            the public signature).
        remove_tmp: When falsy, nothing is removed or logged.
        out_log: Logger handed to ``fu.log``.
    """
    if not remove_tmp:
        return
    # Keep only the entries for which fu.rm returned a truthy value.
    removed_files = [tmp_file for tmp_file in list if fu.rm(tmp_file)]
    fu.log("Removed: %s" % str(removed_files), out_log)
def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the files matching a pattern in ``tmp_folder`` and copy the archive.

    Args:
        tmp_folder: Folder searched for the files.
        output_file: Path of the zip file passed to ``fu.zip_list``.
        output_dir: Destination the zip file is copied to with ``shutil.copy2``.
        glob_pattern: Glob pattern tried first; when it matches nothing,
            ``frame*.pdb`` is tried instead.
        out_log: Logger handed to ``fu.zip_list``.
    """
    tmp_dir = Path(tmp_folder)
    matches = list(tmp_dir.glob(glob_pattern))
    if not matches:
        matches = list(tmp_dir.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, list(matches), out_log)

    shutil.copy2(output_file, output_dir)
659def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]:
660 """
661 Converts a string to a list, splitting by commas or spaces. If the input is already a list, returns it as is.
662 Returns an empty list if input_data is None.
664 Parameters:
665 input_data (str, list, or None): The string, list, or None value to convert.
667 Returns:
668 list: A list of string elements or an empty list if input_data is None.
669 """
670 if input_data is None:
671 return []
673 if isinstance(input_data, list):
674 # If input is already a list, return it
675 return input_data
677 # If input is a string, determine the delimiter based on presence of commas
678 delimiter = "," if "," in input_data else " "
679 items = input_data.split(delimiter)
681 # Remove whitespace from each item and ignore empty strings
682 processed_items = [item.strip() for item in items if item.strip()]
684 return processed_items