Coverage for biobb_analysis/gromacs/common.py: 61%
315 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 12:11 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-28 12:11 +0000
1"""Common functions for package biobb_analysis.gromacs"""
3import re
4import shutil
5from pathlib import Path, PurePath
6from typing import Optional, Union
8from biobb_common.command_wrapper import cmd_wrapper
9from biobb_common.tools import file_utils as fu
def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files by running ``<gmx> check`` on them.

    The command output is redirected to ``check_result.out`` (relative to the
    current working directory) and that file is then scanned line by line.

    Args:
        file_a: First file. Passed with ``-s1`` when its name ends in
            ``.tpr``, otherwise with ``-f``.
        file_b: Second file. Passed with ``-s2`` when its name ends in
            ``.tpr``, otherwise with ``-f2``.
        gmx: Name or path of the GROMACS executable.

    Returns:
        ``False`` as soon as a non-blank line is found that neither starts
        with "Both files read correctly" nor with "comparing"; ``True`` when
        no such line exists.
    """
    print("Comparing GROMACS files:")
    print("FILE_A: %s" % str(Path(file_a).resolve()))
    print("FILE_B: %s" % str(Path(file_b).resolve()))
    check_result = "check_result.out"
    cmd = [
        gmx,
        "check",
        "-s1" if file_a.endswith(".tpr") else "-f",
        file_a,
        "-s2" if file_b.endswith(".tpr") else "-f2",
        file_b,
        # Redirect token built from the same name that is read back below.
        # NOTE(review): presumably CmdWrapper runs the joined command through
        # a shell, otherwise the redirect would not take effect - confirm.
        "> %s" % check_result,
    ]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))
    with open(check_result) as check_file:
        # Count from 1 so the reported number matches what an editor shows.
        for line_num, line in enumerate(check_file, start=1):
            if not line.rstrip():
                continue
            if line.startswith("Both files read correctly"):
                continue
            if not line.startswith("comparing"):
                print("Discrepance found in line %d: %s" % (line_num, line))
                return False
    return True
def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a usable path to it.

    Args:
        path: Path of the energy input file.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged when it is absolute, otherwise ``path`` joined onto
        the current working directory (as ``str``).

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_energy``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_energy(extension):
        message = classname + ": Format %s in energy input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs are anchored to the cwd because the execution itself is
    # launched from a temporary folder.
    pure = PurePath(path)
    if pure.name == path or not pure.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a usable path to it.

    Args:
        path: Path of the structure input file.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged when it is absolute, otherwise ``path`` joined onto
        the current working directory (as ``str``).

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_structure``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in structure input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs are anchored to the cwd because the execution itself is
    # launched from a temporary folder.
    pure = PurePath(path)
    if pure.name == path or not pure.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return a usable path to it.

    Unlike the other input checks, no existence test is performed here.

    Args:
        path: Path of the index file; a falsy value means "no index file".
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``None`` when ``path`` is falsy; ``path`` unchanged when it is
        absolute; otherwise ``path`` joined onto the current working
        directory (as ``str``).

    Raises:
        SystemExit: If the extension is rejected by ``is_valid_index``.
    """
    if not path:
        return None

    extension = PurePath(path).suffix[1:]
    if not is_valid_index(extension):
        message = classname + ": Format %s in index input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs are anchored to the cwd because the execution itself is
    # launched from a temporary folder.
    pure = PurePath(path)
    if pure.name == path or not pure.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a usable path to it.

    Args:
        path: Path of the trajectory input file.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged when it is absolute, otherwise ``path`` joined onto
        the current working directory (as ``str``).

    Raises:
        SystemExit: If the file does not exist or its extension is rejected
            by ``is_valid_trajectory``.
    """
    if not Path(path).exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    extension = PurePath(path).suffix[1:]
    if not is_valid_trajectory(extension):
        message = classname + ": Format %s in trajectory input file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)

    # Relative inputs are anchored to the cwd because the execution itself is
    # launched from a temporary folder.
    pure = PurePath(path)
    if pure.name == path or not pure.is_absolute():
        return str(PurePath(Path.cwd()).joinpath(path))
    return path
def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output file is an XVG.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_xvg``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_xvg(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the output is a structure file.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_structure``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_structure(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the output is a trajectory file.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_trajectory_output``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_trajectory_output(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output is a ZIP file.

    Args:
        path: Path of the output file.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``path`` unchanged.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by ``is_valid_zip``.
    """
    parent_dir = PurePath(path).parent
    if parent_dir and not Path(parent_dir).exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = PurePath(path).suffix[1:]
    if is_valid_zip(extension):
        return path

    message = classname + ": Format %s in output file is not compatible" % extension
    fu.log(message, out_log)
    raise SystemExit(message)
def get_default_value(key):
    """Return the built-in default for the property named ``key``.

    Raises:
        KeyError: If ``key`` has no registered default.
    """
    defaults = {
        # general
        "instructions_file": "instructions.in",
        "binary_path": "gmx",
        # energy / generic selection
        "terms": ["Potential"],
        "selection": "System",
        "xvg": "none",
        # clustering
        "dista": False,
        "method": "linkage",
        "cutoff": 0.1,
        # selections used by the individual tools
        "cluster_selection": "System",
        "fit_selection": "System",
        "center_selection": "System",
        "output_selection": "System",
        # imaging
        "pbc": "mol",
        "center": True,
        "fit": "none",
        "ur": "compact",
        # frame sampling
        "skip": 1,
        "start": 0,
        "end": 0,
        "dt": 0,
        # structure ensemble output type
        "ot_str_ens": "pdb",
    }
    return defaults[key]
def get_binary_path(properties, type):
    """Return ``properties[type]``, falling back to the default for ``type``.

    The default is looked up before ``properties`` is consulted, so a ``type``
    without a registered default raises ``KeyError`` from
    ``get_default_value`` even when ``properties`` contains it.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)
def get_terms(properties, out_log, classname):
    """Return the ``terms`` property after validating it.

    Raises:
        SystemExit: If ``terms`` is missing, empty, not a ``list``, or is
            rejected by ``is_valid_term``.
    """
    requested = properties.get("terms", dict())
    if not (requested and isinstance(requested, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(requested):
        # At this point "terms" is known to be present, so this is the very
        # value a second lookup in properties would return.
        return requested
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")
def get_selection(properties, out_log, classname):
    """Return the ``selection`` property (or its default) after validation.

    Raises:
        SystemExit: If the selection is falsy or is rejected by
            ``is_valid_selection``.
    """
    chosen = properties.get("selection", get_default_value("selection"))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_image_selection(properties, key, out_log, classname):
    """Return the selection stored under ``key`` (or its default), validated.

    Raises:
        SystemExit: If the selection is falsy or is rejected by
            ``is_valid_selection``.
    """
    chosen = properties.get(key, get_default_value(key))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_selection_index_file(properties, index, key, out_log, classname):
    """Return the selection under ``key`` if the index file declares it.

    Every ``[...]`` match found in the index file is collected, with brackets
    and spaces removed, and the requested selection must be one of them.

    Raises:
        SystemExit: If the requested selection is not among the collected
            names.
    """
    header = re.compile(r"\[.*\]")
    groups = []
    with open(index, "r") as ndx_file:
        for row in ndx_file:
            groups.extend(
                re.sub(r"[\[\] ]", "", found.group()) for found in header.finditer(row)
            )
    wanted = properties.get(key, get_default_value(key))
    if wanted in groups:
        return wanted
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
def get_pbc(properties, out_log, classname):
    """Return the ``pbc`` property (or its default) after validation.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_pbc``.
    """
    pbc_value = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(pbc_value):
        return pbc_value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")
def get_center(properties, out_log, classname):
    """Return the ``center`` property (or its default) after validation.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    center_flag = properties.get("center", get_default_value("center"))
    if is_valid_boolean(center_flag):
        return center_flag
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")
def get_ur(properties, out_log, classname):
    """Return the ``ur`` property (or its default) after validation.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ur``.
    """
    ur_value = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(ur_value):
        return ur_value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")
def get_fit(properties, out_log, classname):
    """Return the ``fit`` property (or its default) after validation.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_fit``.
    """
    fit_value = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(fit_value):
        return fit_value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")
def get_skip(properties, out_log, classname):
    """Return the ``skip`` property (or its default) as a string.

    Args:
        properties: Mapping of user-supplied properties.
        out_log: Logger handed to ``fu.log``.
        classname: Prefix used in log and error messages.

    Returns:
        ``str(skip)``.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        # The exit message previously named "start"; it now names the
        # property that actually failed, matching the log line above.
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)
def get_start(properties, out_log, classname):
    """Return the ``start`` property (or its default) as a string.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    start_value = properties.get("start", get_default_value("start"))
    if is_valid_int(start_value):
        return str(start_value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")
def get_end(properties, out_log, classname):
    """Return the ``end`` property (or its default) as a string.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    end_value = properties.get("end", get_default_value("end"))
    if is_valid_int(end_value):
        return str(end_value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")
def get_dt(properties, out_log, classname):
    """Return the ``dt`` property (or its default) as a string.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_int``.
    """
    dt_value = properties.get("dt", get_default_value("dt"))
    if is_valid_int(dt_value):
        return str(dt_value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")
def get_ot_str_ens(properties, out_log, classname):
    """Return the ``output_type`` property as a string.

    When the property is absent, the default registered under
    ``"ot_str_ens"`` is used.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_ot_str_ens``.
    """
    fallback = get_default_value("ot_str_ens")
    chosen_type = properties.get("output_type", fallback)
    if is_valid_ot_str_ens(chosen_type):
        return str(chosen_type)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")
def get_xvg(properties, out_log, classname):
    """Return the ``xvg`` property (or its default) after validation.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_xvg_param``.
    """
    xvg_value = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(xvg_value):
        return xvg_value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")
def get_dista(properties, out_log, classname):
    """Return the ``dista`` property (or its default) after validation.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_boolean``.
    """
    dista_flag = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(dista_flag):
        return dista_flag
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")
def get_method(properties, out_log, classname):
    """Return the ``method`` property (or its default) after validation.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_method_param``.
    """
    method_name = properties.get("method", get_default_value("method"))
    if is_valid_method_param(method_name):
        return method_name
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")
def get_cutoff(properties, out_log, classname):
    """Return the ``cutoff`` property (or its default) as a string.

    Raises:
        SystemExit: If the value is rejected by ``is_valid_float``.
    """
    cutoff_value = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(cutoff_value):
        return str(cutoff_value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")
def is_valid_boolean(val):
    """Tell whether ``val`` compares equal to ``True`` or ``False``.

    Because membership uses equality, ``1`` and ``0`` are accepted as well.
    """
    accepted = (True, False)
    return val in accepted
def is_valid_float(val):
    """Tell whether ``val`` is acceptable as a float.

    Any falsy value (``None``, ``0``, ``""``, ...) is accepted; a truthy value
    must be an instance of ``float`` or ``int``.
    """
    return not val or isinstance(val, (float, int))
def is_valid_int(val):
    """Tell whether ``val`` is acceptable as an int.

    Any falsy value (``None``, ``0``, ``0.0``, ``""``, ...) is accepted; a
    truthy value must be an instance of ``int``.
    """
    return not val or isinstance(val, int)
def is_valid_method_param(met):
    """Tell whether ``met`` is one of the accepted method names."""
    accepted = (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )
    return met in accepted
def is_valid_structure(ext):
    """Tell whether ``ext`` is an accepted structure file extension."""
    accepted = ("tpr", "gro", "g96", "pdb", "brk", "ent")
    return ext in accepted
def is_valid_index(ext):
    """Tell whether ``ext`` is an accepted index file extension."""
    accepted = ("ndx",)
    return ext in accepted
def is_valid_trajectory(ext):
    """Tell whether ``ext`` is an accepted trajectory input extension."""
    accepted = ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")
    return ext in accepted
def is_valid_trajectory_output(ext):
    """Tell whether ``ext`` is an accepted trajectory output extension."""
    accepted = ("xtc", "trr", "gro", "g96", "pdb", "tng")
    return ext in accepted
def is_valid_energy(ext):
    """Tell whether ``ext`` is an accepted energy file extension."""
    accepted = ("edr",)
    return ext in accepted
def is_valid_xvg(ext):
    """Tell whether ``ext`` is the XVG file extension."""
    accepted = ("xvg",)
    return ext in accepted
def is_valid_zip(ext):
    """Tell whether ``ext`` is the ZIP file extension."""
    accepted = ("zip",)
    return ext in accepted
def is_valid_xvg_param(ext):
    """Tell whether ``ext`` is an accepted value for the xvg parameter."""
    accepted = ("xmgrace", "xmgr", "none")
    return ext in accepted
def is_valid_ot_str_ens(ext):
    """Tell whether ``ext`` is an accepted structure-ensemble output type."""
    accepted = ("gro", "g96", "pdb")
    return ext in accepted
def is_valid_pbc(pbc):
    """Tell whether ``pbc`` is an accepted value for the pbc parameter."""
    accepted = ("none", "mol", "res", "atom", "nojump", "cluster", "whole")
    return pbc in accepted
def is_valid_ur(ur):
    """Tell whether ``ur`` is an accepted value for the ur parameter."""
    accepted = ("rect", "tric", "compact")
    return ur in accepted
def is_valid_fit(fit):
    """Tell whether ``fit`` is an accepted value for the fit parameter."""
    accepted = (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )
    return fit in accepted
def is_valid_term(iterms):
    """Tell whether every element of ``iterms`` is a known energy term.

    Args:
        iterms: Iterable of term names.

    Returns:
        ``True`` when all elements are in the list of known terms (an empty
        iterable is therefore valid), ``False`` otherwise.
    """
    # "Constr.-rmsd" and "Box-Z" used to be listed with a stray leading
    # space, which made the correctly spelled names fail validation.
    cterms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
    )
    # Surrounding whitespace is ignored so that callers who adapted to the
    # old space-padded spellings (" Constr.-rmsd", " Box-Z") keep validating.
    return all(
        (elem.strip() if isinstance(elem, str) else elem) in cterms
        for elem in iterms
    )
def is_valid_selection(ext):
    """Tell whether ``ext`` is one of the accepted selection group names."""
    accepted = (
        # whole system and protein subsets
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        # solvent and ions
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        # nucleic acids and combinations
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
        # remaining group names
        "DPPC",
        "DMPC",
        "POPG",
        "POPA",
        "POPC",
        "POPE",
        "DMTAP",
        "POPS",
    )
    return ext in accepted
def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy ``instructions_file`` into ``unique_dir`` using ``shutil.copy2``."""
    shutil.copy2(src=instructions_file, dst=unique_dir)
def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files generated by the wrapper.

    Does nothing when ``remove_tmp`` is falsy. Otherwise ``fu.rm`` is called
    on every entry of ``list`` and the entries for which it returned a truthy
    value are logged.
    """
    if not remove_tmp:
        return
    deleted = []
    for tmp_path in list:
        if fu.rm(tmp_path):
            deleted.append(tmp_path)
    fu.log("Removed: %s" % str(deleted), out_log)
def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the files found in ``tmp_folder`` and copy the archive.

    Files matching ``glob_pattern`` are collected; when none match, the
    pattern ``frame*.pdb`` is tried instead. The collected files are zipped
    into ``output_file`` through ``fu.zip_list`` and the archive is then
    copied to ``output_dir``.
    """
    source_dir = Path(tmp_folder)
    # Fall back to the fixed frame pattern only when the requested pattern
    # matched nothing.
    frames = list(source_dir.glob(glob_pattern)) or list(source_dir.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, frames, out_log)

    shutil.copy2(output_file, output_dir)
def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]:
    """
    Turn a comma- or space-separated string into a list of its items.

    Parameters:
        input_data (str, list, or None): The value to convert.

    Returns:
        list: ``[]`` for None, the same list object when a list is given,
        otherwise the stripped, non-empty pieces of the string. The string is
        split on commas when it contains at least one comma, else on spaces.
    """
    if input_data is None:
        return []
    if isinstance(input_data, list):
        return input_data

    separator = " " if "," not in input_data else ","
    # Strip each piece once and drop the ones that end up empty.
    return [piece for raw in input_data.split(separator) if (piece := raw.strip())]