⬅ biobb_analysis/gromacs/common.py source

1 """Common functions for package biobb_analysis.gromacs"""
2  
3 import re
4 import shutil
5 from pathlib import Path, PurePath
6 from typing import Optional, Union
7  
8 from biobb_common.command_wrapper import cmd_wrapper
9 from biobb_common.tools import file_utils as fu
10  
11  
def gmx_check(file_a: str, file_b: str, gmx: str = "gmx") -> bool:
    """Compare two GROMACS files with ``gmx check`` and report whether they match.

    The command output is redirected to ``check_result.out`` in the current
    working directory and then scanned line by line.

    Parameters:
        file_a (str): First file; passed with ``-s1`` when it ends in ``.tpr``,
            otherwise with ``-f``.
        file_b (str): Second file; passed with ``-s2`` when it ends in ``.tpr``,
            otherwise with ``-f2``.
        gmx (str): GROMACS executable to invoke.

    Returns:
        bool: False as soon as a non-blank line is found that starts neither
        with "Both files read correctly" nor with "comparing"; True otherwise.
    """
    print("Comparing GROMACS files:")
    for label, compared_file in (("FILE_A", file_a), ("FILE_B", file_b)):
        print("%s: %s" % (label, str(Path(compared_file).resolve())))

    check_result = "check_result.out"
    first_flag = "-s1" if file_a.endswith(".tpr") else "-f"
    second_flag = "-s2" if file_b.endswith(".tpr") else "-f2"
    # NOTE(review): the redirection is handed over as a plain command element;
    # presumably CmdWrapper runs the joined command through a shell -- confirm.
    cmd = [
        gmx,
        "check",
        first_flag,
        file_a,
        second_flag,
        file_b,
        "> check_result.out",
    ]
    cmd_wrapper.CmdWrapper(cmd).launch()
    print("Result file: %s" % str(Path(check_result).resolve()))

    expected_prefixes = ("Both files read correctly", "comparing")
    with open(check_result) as check_file:
        for line_num, line in enumerate(check_file):
            if not line.rstrip() or line.startswith(expected_prefixes):
                continue
            # line_num is the zero-based index produced by enumerate.
            print("Discrepance found in line %d: %s" % (line_num, line))
            return False
    return True
41  
42  
def check_energy_path(path, out_log, classname):
    """Validate the energy input file and return a path usable from any folder.

    Parameters:
        path: Location of the energy file; it must exist and its extension
            must be accepted by is_valid_energy.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The given path when it is absolute; otherwise the path joined to the
        current working directory, as a string.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected.
    """
    energy_file = Path(path)
    if not energy_file.exists():
        fu.log(classname + ": Unexisting energy input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting energy input file")

    extension = energy_file.suffix[1:]
    if not is_valid_energy(extension):
        message = (
            classname
            + ": Format %s in energy input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # A bare file name is never absolute, so one check covers both cases.
    # Relative inputs get the cwd because execution is launched on tmp folder.
    if not energy_file.is_absolute():
        path = str(Path.cwd().joinpath(path))
    return path
63  
64  
def check_input_path(path, out_log, classname):
    """Validate the structure input file and return a path usable from any folder.

    Parameters:
        path: Location of the structure file; it must exist and its extension
            must be accepted by is_valid_structure.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The given path when it is absolute; otherwise the path joined to the
        current working directory, as a string.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected.
    """
    structure_file = Path(path)
    if not structure_file.exists():
        fu.log(classname + ": Unexisting structure input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting structure input file")

    extension = structure_file.suffix[1:]
    if not is_valid_structure(extension):
        message = (
            classname
            + ": Format %s in structure input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # A bare file name is never absolute, so one check covers both cases.
    # Relative inputs get the cwd because execution is launched on tmp folder.
    if not structure_file.is_absolute():
        path = str(Path.cwd().joinpath(path))
    return path
87  
88  
def check_index_path(path, out_log, classname):
    """Validate the optional index input file and return a usable path.

    Unlike the other input checks, this one does not test that the file exists.

    Parameters:
        path: Location of the index file, or a falsy value when none is given.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        None when path is falsy; the given path when it is absolute; otherwise
        the path joined to the current working directory, as a string.

    Raises:
        SystemExit: If the extension is rejected by is_valid_index.
    """
    if not path:
        return None

    index_file = PurePath(path)
    extension = index_file.suffix[1:]
    if not is_valid_index(extension):
        message = (
            classname
            + ": Format %s in index input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # A bare file name is never absolute, so one check covers both cases.
    # Relative inputs get the cwd because execution is launched on tmp folder.
    if not index_file.is_absolute():
        path = str(PurePath(Path.cwd()).joinpath(path))
    return path
108  
109  
def check_traj_path(path, out_log, classname):
    """Validate the trajectory input file and return a path usable from any folder.

    Parameters:
        path: Location of the trajectory file; it must exist and its extension
            must be accepted by is_valid_trajectory.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The given path when it is absolute; otherwise the path joined to the
        current working directory, as a string.

    Raises:
        SystemExit: If the file does not exist or its extension is rejected.
    """
    trajectory_file = Path(path)
    if not trajectory_file.exists():
        fu.log(classname + ": Unexisting trajectory input file, exiting", out_log)
        raise SystemExit(classname + ": Unexisting trajectory input file")

    extension = trajectory_file.suffix[1:]
    if not is_valid_trajectory(extension):
        message = (
            classname
            + ": Format %s in trajectory input file is not compatible" % extension
        )
        fu.log(message, out_log)
        raise SystemExit(message)

    # A bare file name is never absolute, so one check covers both cases.
    # Relative inputs get the cwd because execution is launched on tmp folder.
    if not trajectory_file.is_absolute():
        path = str(Path.cwd().joinpath(path))
    return path
132  
133  
def check_out_xvg_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is xvg.

    Parameters:
        path: Location of the output file.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The path exactly as received.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by is_valid_xvg.
    """
    target = Path(path)
    # A path object is always truthy, so only the existence test matters.
    if not target.parent.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_xvg(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path
151  
152  
def check_out_pdb_path(path, out_log, classname):
    """Check that the output folder exists and the output is a structure format.

    Parameters:
        path: Location of the output file.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The path exactly as received.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by is_valid_structure.
    """
    target = Path(path)
    # A path object is always truthy, so only the existence test matters.
    if not target.parent.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_structure(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path
170  
171  
def check_out_traj_path(path, out_log, classname):
    """Check that the output folder exists and the output is a trajectory format.

    Parameters:
        path: Location of the output file.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The path exactly as received.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by is_valid_trajectory_output.
    """
    target = Path(path)
    # A path object is always truthy, so only the existence test matters.
    if not target.parent.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_trajectory_output(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path
189  
190  
def check_out_str_ens_path(path, out_log, classname):
    """Check that the output folder exists and the output extension is zip.

    Parameters:
        path: Location of the output file.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The path exactly as received.

    Raises:
        SystemExit: If the parent folder does not exist or the extension is
            rejected by is_valid_zip.
    """
    target = Path(path)
    # A path object is always truthy, so only the existence test matters.
    if not target.parent.exists():
        fu.log(classname + ": Unexisting output folder, exiting", out_log)
        raise SystemExit(classname + ": Unexisting output folder")

    extension = target.suffix[1:]
    if not is_valid_zip(extension):
        message = classname + ": Format %s in output file is not compatible" % extension
        fu.log(message, out_log)
        raise SystemExit(message)
    return path
208  
209  
def get_default_value(key):
    """Return the default value registered for the property named *key*.

    The table is rebuilt on every call, so mutable defaults such as the
    "terms" list are never shared between callers.

    Parameters:
        key (str): Property name.

    Returns:
        The default associated with the property.

    Raises:
        KeyError: If no default is registered for *key*.
    """
    defaults = dict(
        instructions_file="instructions.in",
        binary_path="gmx",
        terms=["Potential"],
        selection="System",
        xvg="none",
        dista=False,
        method="linkage",
        cutoff=0.1,
        cluster_selection="System",
        fit_selection="System",
        center_selection="System",
        output_selection="System",
        pbc="mol",
        center=True,
        fit="none",
        ur="compact",
        skip=1,
        start=0,
        end=0,
        dt=0,
        ot_str_ens="pdb",
    )
    return defaults[key]
238  
239  
def get_binary_path(properties, type):
    """Return the property stored under *type*, or its registered default.

    Parameters:
        properties: Mapping of user supplied properties.
        type (str): Property name to look up (e.g. "binary_path").

    Returns:
        The value found in *properties*, or the default for that name.

    Raises:
        KeyError: If no default is registered for *type*; the default is
            looked up even when *properties* already holds a value.
    """
    fallback = get_default_value(type)
    return properties.get(type, fallback)
243  
244  
def get_terms(properties, out_log, classname):
    """Read and validate the list of energy terms from *properties*.

    Parameters:
        properties: Mapping of user supplied properties; "terms" is read.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The non-empty list stored under "terms".

    Raises:
        SystemExit: If "terms" is missing, empty, not a list, or holds a term
            rejected by is_valid_term.
    """
    terms = properties.get("terms")
    if not (terms and isinstance(terms, list)):
        fu.log(classname + ": No terms provided or incorrect format, exiting", out_log)
        raise SystemExit(classname + ": No terms provided or incorrect format")
    if is_valid_term(terms):
        return terms
    fu.log(classname + ": Incorrect terms provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect terms provided")
255  
256  
def get_selection(properties, out_log, classname):
    """Read and validate the "selection" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The selection, falling back to the registered default when absent.

    Raises:
        SystemExit: If the selection is falsy or rejected by is_valid_selection.
    """
    chosen = properties.get("selection", get_default_value("selection"))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
269  
270  
def get_image_selection(properties, key, out_log, classname):
    """Read and validate the selection stored under *key*.

    Parameters:
        properties: Mapping of user supplied properties.
        key (str): Name of the selection property; it must also have a
            registered default.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The selection, falling back to the registered default when absent.

    Raises:
        SystemExit: If the selection is falsy or rejected by is_valid_selection.
    """
    chosen = properties.get(key, get_default_value(key))
    if not chosen:
        fu.log(
            classname + ": No selection provided or incorrect format, exiting", out_log
        )
        raise SystemExit(classname + ": No selection provided or incorrect format")
    if is_valid_selection(chosen):
        return chosen
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
283  
284  
def get_selection_index_file(properties, index, key, out_log, classname):
    """Read the selection under *key* and check it is a group of the index file.

    Group names are taken from every ``[...]`` match in the index file, with
    brackets and spaces removed.

    Parameters:
        properties: Mapping of user supplied properties.
        index: Path of the index file to scan.
        key (str): Name of the selection property; it must also have a
            registered default.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The selection, falling back to the registered default when absent.

    Raises:
        SystemExit: If the selection is not one of the groups found.
    """
    group_header = re.compile(r"\[.*\]")
    with open(index, "r") as ndx_file:
        groups = [
            re.sub(r"[\[\] ]", "", found.group())
            for ndx_line in ndx_file
            for found in group_header.finditer(ndx_line)
        ]

    sel = properties.get(key, get_default_value(key))
    if sel in groups:
        return sel
    fu.log(classname + ": Incorrect selection provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect selection provided")
298  
299  
def get_pbc(properties, out_log, classname):
    """Read and validate the "pbc" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The pbc value, falling back to the registered default when absent.

    Raises:
        SystemExit: If the value is rejected by is_valid_pbc.
    """
    value = properties.get("pbc", get_default_value("pbc"))
    if is_valid_pbc(value):
        return value
    fu.log(classname + ": Incorrect pbc provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect pbc provided")
307  
308  
def get_center(properties, out_log, classname):
    """Read and validate the "center" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The center flag, falling back to the registered default when absent.

    Raises:
        SystemExit: If the value is rejected by is_valid_boolean.
    """
    value = properties.get("center", get_default_value("center"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect center provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect center provided")
316  
317  
def get_ur(properties, out_log, classname):
    """Read and validate the "ur" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The ur value, falling back to the registered default when absent.

    Raises:
        SystemExit: If the value is rejected by is_valid_ur.
    """
    value = properties.get("ur", get_default_value("ur"))
    if is_valid_ur(value):
        return value
    fu.log(classname + ": Incorrect ur provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect ur provided")
325  
326  
def get_fit(properties, out_log, classname):
    """Read and validate the "fit" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The fit value, falling back to the registered default when absent.

    Raises:
        SystemExit: If the value is rejected by is_valid_fit.
    """
    value = properties.get("fit", get_default_value("fit"))
    if is_valid_fit(value):
        return value
    fu.log(classname + ": Incorrect fit provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect fit provided")
334  
335  
def get_skip(properties, out_log, classname):
    """Read and validate the "skip" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        str: The skip value (or its registered default) converted to a string.

    Raises:
        SystemExit: If the value is rejected by is_valid_int.
    """
    skip = properties.get("skip", get_default_value("skip"))
    if not is_valid_int(skip):
        fu.log(classname + ": Incorrect skip provided, exiting", out_log)
        # The exit message used to name "start" (copy/paste slip); it now
        # names the property that actually failed, matching the log line.
        raise SystemExit(classname + ": Incorrect skip provided")
    return str(skip)
343  
344  
def get_start(properties, out_log, classname):
    """Read and validate the "start" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        str: The start value (or its registered default) converted to a string.

    Raises:
        SystemExit: If the value is rejected by is_valid_int.
    """
    value = properties.get("start", get_default_value("start"))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect start provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect start provided")
352  
353  
def get_end(properties, out_log, classname):
    """Read and validate the "end" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        str: The end value (or its registered default) converted to a string.

    Raises:
        SystemExit: If the value is rejected by is_valid_int.
    """
    value = properties.get("end", get_default_value("end"))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect end provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect end provided")
361  
362  
def get_dt(properties, out_log, classname):
    """Read and validate the "dt" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        str: The dt value (or its registered default) converted to a string.

    Raises:
        SystemExit: If the value is rejected by is_valid_int.
    """
    value = properties.get("dt", get_default_value("dt"))
    if is_valid_int(value):
        return str(value)
    fu.log(classname + ": Incorrect dt provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dt provided")
370  
371  
def get_ot_str_ens(properties, out_log, classname):
    """Read and validate the "output_type" property of a structure ensemble.

    The property is read under "output_type", while its default is the one
    registered as "ot_str_ens".

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        str: The output type converted to a string.

    Raises:
        SystemExit: If the value is rejected by is_valid_ot_str_ens.
    """
    value = properties.get("output_type", get_default_value("ot_str_ens"))
    if is_valid_ot_str_ens(value):
        return str(value)
    fu.log(classname + ": Incorrect output_type provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect output_type provided")
379  
380  
def get_xvg(properties, out_log, classname):
    """Read and validate the "xvg" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The xvg value, falling back to the registered default when absent.

    Raises:
        SystemExit: If the value is rejected by is_valid_xvg_param.
    """
    value = properties.get("xvg", get_default_value("xvg"))
    if is_valid_xvg_param(value):
        return value
    fu.log(classname + ": Incorrect xvg provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect xvg provided")
388  
389  
def get_dista(properties, out_log, classname):
    """Read and validate the "dista" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The dista flag, falling back to the registered default when absent.

    Raises:
        SystemExit: If the value is rejected by is_valid_boolean.
    """
    value = properties.get("dista", get_default_value("dista"))
    if is_valid_boolean(value):
        return value
    fu.log(classname + ": Incorrect dista provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect dista provided")
397  
398  
def get_method(properties, out_log, classname):
    """Read and validate the "method" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        The method value, falling back to the registered default when absent.

    Raises:
        SystemExit: If the value is rejected by is_valid_method_param.
    """
    value = properties.get("method", get_default_value("method"))
    if is_valid_method_param(value):
        return value
    fu.log(classname + ": Incorrect method provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect method provided")
406  
407  
def get_cutoff(properties, out_log, classname):
    """Read and validate the "cutoff" property.

    Parameters:
        properties: Mapping of user supplied properties.
        out_log: Log handle forwarded to fu.log.
        classname: Caller name prefixed to every message.

    Returns:
        str: The cutoff (or its registered default) converted to a string.

    Raises:
        SystemExit: If the value is rejected by is_valid_float.
    """
    value = properties.get("cutoff", get_default_value("cutoff"))
    if is_valid_float(value):
        return str(value)
    fu.log(classname + ": Incorrect cutoff provided, exiting", out_log)
    raise SystemExit(classname + ": Incorrect cutoff provided")
415  
416  
def is_valid_boolean(val):
    """Tell whether *val* compares equal to True or to False.

    The test is an equality test, so values such as 1 and 0 are accepted too.

    Parameters:
        val: Value to check.

    Returns:
        bool: True when *val* equals True or False.
    """
    return val in (True, False)
421  
422  
def is_valid_float(val):
    """Tell whether *val* is a real number (float or int).

    The previous check let every falsy value through, so "", None or [] were
    reported as valid numbers and later turned into text by the callers.
    Booleans are rejected as well: they are int subclasses but not numeric
    settings.

    Parameters:
        val: Value to check.

    Returns:
        bool: True only for float or int instances other than bool.
    """
    if isinstance(val, bool):
        return False
    return isinstance(val, (int, float))
428  
429  
def is_valid_int(val):
    """Tell whether *val* is an integer.

    The previous check let every falsy value through, so "", None, [] and
    even 0.0 were reported as valid integers and later turned into text by
    the callers. Booleans are rejected as well: they are int subclasses but
    not numeric settings.

    Parameters:
        val: Value to check.

    Returns:
        bool: True only for int instances other than bool.
    """
    if isinstance(val, bool):
        return False
    return isinstance(val, int)
435  
436  
def is_valid_method_param(met):
    """Tell whether *met* is one of the accepted method names.

    Parameters:
        met: Method name to check.

    Returns:
        bool: True when *met* is in the accepted list.
    """
    return met in (
        "linkage",
        "jarvis-patrick",
        "monte-carlo",
        "diagonalization",
        "gromos",
    )
441  
442  
def is_valid_structure(ext):
    """Tell whether *ext* is an accepted structure file extension.

    Parameters:
        ext: Extension without the leading dot.

    Returns:
        bool: True for tpr, gro, g96, pdb, brk or ent.
    """
    return ext in ("tpr", "gro", "g96", "pdb", "brk", "ent")
447  
448  
def is_valid_index(ext):
    """Tell whether *ext* is an accepted index file extension.

    Parameters:
        ext: Extension without the leading dot.

    Returns:
        bool: True only for ndx.
    """
    return ext in ("ndx",)
453  
454  
def is_valid_trajectory(ext):
    """Tell whether *ext* is an accepted input trajectory extension.

    Parameters:
        ext: Extension without the leading dot.

    Returns:
        bool: True for xtc, trr, cpt, gro, g96, pdb or tng.
    """
    return ext in ("xtc", "trr", "cpt", "gro", "g96", "pdb", "tng")
459  
460  
def is_valid_trajectory_output(ext):
    """Tell whether *ext* is an accepted output trajectory extension.

    Parameters:
        ext: Extension without the leading dot.

    Returns:
        bool: True for xtc, trr, gro, g96, pdb or tng.
    """
    return ext in ("xtc", "trr", "gro", "g96", "pdb", "tng")
465  
466  
def is_valid_energy(ext):
    """Tell whether *ext* is an accepted energy file extension.

    Parameters:
        ext: Extension without the leading dot.

    Returns:
        bool: True only for edr.
    """
    return ext in ("edr",)
471  
472  
def is_valid_xvg(ext):
    """Tell whether *ext* is the XVG file extension.

    Parameters:
        ext: Extension without the leading dot.

    Returns:
        bool: True only for xvg.
    """
    return ext in ("xvg",)
477  
478  
def is_valid_zip(ext):
    """Tell whether *ext* is the ZIP file extension.

    Parameters:
        ext: Extension without the leading dot.

    Returns:
        bool: True only for zip.
    """
    return ext in ("zip",)
483  
484  
def is_valid_xvg_param(ext):
    """Tell whether *ext* is an accepted value of the xvg parameter.

    Parameters:
        ext: Value to check.

    Returns:
        bool: True for xmgrace, xmgr or none.
    """
    return ext in ("xmgrace", "xmgr", "none")
489  
490  
def is_valid_ot_str_ens(ext):
    """Tell whether *ext* is an accepted output type for a structure ensemble.

    Parameters:
        ext: Output type to check.

    Returns:
        bool: True for gro, g96 or pdb.
    """
    return ext in ("gro", "g96", "pdb")
495  
496  
def is_valid_pbc(pbc):
    """Tell whether *pbc* is an accepted value of the pbc parameter.

    Parameters:
        pbc: Value to check.

    Returns:
        bool: True for none, mol, res, atom, nojump, cluster or whole.
    """
    return pbc in ("none", "mol", "res", "atom", "nojump", "cluster", "whole")
501  
502  
def is_valid_ur(ur):
    """Tell whether *ur* is an accepted value of the ur parameter.

    Parameters:
        ur: Value to check.

    Returns:
        bool: True for rect, tric or compact.
    """
    return ur in ("rect", "tric", "compact")
507  
508  
def is_valid_fit(fit):
    """Tell whether *fit* is an accepted value of the fit parameter.

    Parameters:
        fit: Value to check.

    Returns:
        bool: True when *fit* is in the accepted list.
    """
    accepted = (
        "none",
        "rot+trans",
        "rotxy+transxy",
        "translation",
        "transxy",
        "progressive",
    )
    return fit in accepted
520  
521  
def is_valid_term(iterms):
    """Tell whether every element of *iterms* is a known energy term.

    Two entries of the reference list used to carry a stray leading space
    (" Constr.-rmsd" and " Box-Z"), which made the real term names fail the
    check. The list is now clean; candidates are compared with surrounding
    whitespace ignored, so the padded spellings accepted before still pass.

    Parameters:
        iterms: Iterable of term names to check.

    Returns:
        bool: True when all elements are strings naming a known term (also
        True for an empty iterable); False otherwise.
    """
    cterms = (
        "Angle",
        "Proper-Dih.",
        "Improper-Dih.",
        "LJ-14",
        "Coulomb-14",
        "LJ-(SR)",
        "Coulomb-(SR)",
        "Coul.-recip.",
        "Position-Rest.",
        "Potential",
        "Kinetic-En.",
        "Total-Energy",
        "Temperature",
        "Pressure",
        "Constr.-rmsd",
        "Box-X",
        "Box-Y",
        "Box-Z",
        "Volume",
        "Density",
        "pV",
        "Enthalpy",
        "Vir-XX",
        "Vir-XY",
        "Vir-XZ",
        "Vir-YX",
        "Vir-YY",
        "Vir-YZ",
        "Vir-ZX",
        "Vir-ZY",
        "Vir-ZZ",
        "Pres-XX",
        "Pres-XY",
        "Pres-XZ",
        "Pres-YX",
        "Pres-YY",
        "Pres-YZ",
        "Pres-ZX",
        "Pres-ZY",
        "Pres-ZZ",
        "#Surf*SurfTen",
        "Box-Vel-XX",
        "Box-Vel-YY",
        "Box-Vel-ZZ",
        "Mu-X",
        "Mu-Y",
        "Mu-Z",
        "T-Protein",
        "T-non-Protein",
        "Lamb-Protein",
        "Lamb-non-Protein",
    )
    # Non-string elements can never name a term; checking the type first also
    # keeps .strip() from raising on them.
    return all(isinstance(elem, str) and elem.strip() in cterms for elem in iterms)
578  
579  
def is_valid_selection(ext):
    """Tell whether *ext* is one of the accepted selection group names.

    Parameters:
        ext: Group name to check.

    Returns:
        bool: True when *ext* is in the accepted list.
    """
    accepted = (
        "System",
        "Protein",
        "Protein-H",
        "C-alpha",
        "Backbone",
        "MainChain",
        "MainChain+Cb",
        "MainChain+H",
        "SideChain",
        "SideChain-H",
        "Prot-Masses",
        "non-Protein",
        "Water",
        "SOL",
        "non-Water",
        "Ion",
        "NA",
        "CL",
        "Water_and_ions",
        "DNA",
        "RNA",
        "Protein_DNA",
        "Protein_RNA",
        "Protein_DNA_RNA",
        "DNA_RNA",
    )
    return ext in accepted
610  
611  
def copy_instructions_file_to_container(instructions_file, unique_dir):
    """Copy the instructions file into *unique_dir* with ``shutil.copy2``.

    Parameters:
        instructions_file: Path of the file to copy.
        unique_dir: Destination handed to ``shutil.copy2``.
    """
    source, destination = instructions_file, unique_dir
    shutil.copy2(source, destination)
614  
615  
def remove_tmp_files(list, remove_tmp, out_log):
    """Remove the temporary files generated by the wrapper, when requested.

    Parameters:
        list: Paths to remove, each handed to fu.rm.
        remove_tmp: When falsy, nothing is removed and nothing is logged.
        out_log: Log handle forwarded to fu.log.
    """
    if not remove_tmp:
        return

    removed_files = []
    for tmp_file in list:
        # Only entries for which fu.rm returns a truthy value are reported.
        if fu.rm(tmp_file):
            removed_files.append(tmp_file)
    fu.log("Removed: %s" % str(removed_files), out_log)
622  
623  
def process_output_trjconv_str_ens(
    tmp_folder, output_file, output_dir, glob_pattern, out_log
):
    """Zip the frame files found in *tmp_folder* and copy the zip to *output_dir*.

    Parameters:
        tmp_folder: Folder searched for frame files.
        output_file: Path of the zip file to create.
        output_dir: Destination handed to ``shutil.copy2``.
        glob_pattern: Pattern used to find the frames; when it matches
            nothing, "frame*.pdb" is tried instead.
        out_log: Log handle forwarded to fu.zip_list.
    """
    folder = Path(tmp_folder)
    files_list = list(folder.glob(glob_pattern))
    if not files_list:
        files_list = list(folder.glob("frame*.pdb"))

    # adding files from temporary folder to zip
    fu.zip_list(output_file, files_list, out_log)

    shutil.copy2(output_file, output_dir)
639  
640  
641 def _from_string_to_list(input_data: Optional[Union[str, list[str]]]) -> list[str]:
642 """
643 Converts a string to a list, splitting by commas or spaces. If the input is already a list, returns it as is.
644 Returns an empty list if input_data is None.
645  
646 Parameters:
647 input_data (str, list, or None): The string, list, or None value to convert.
648  
649 Returns:
650 list: A list of string elements or an empty list if input_data is None.
651 """
652 if input_data is None:
653 return []
654  
655 if isinstance(input_data, list):
656 # If input is already a list, return it
657 return input_data
658  
659 # If input is a string, determine the delimiter based on presence of commas
660 delimiter = "," if "," in input_data else " "
661 items = input_data.split(delimiter)
662  
663 # Remove whitespace from each item and ignore empty strings
664 processed_items = [item.strip() for item in items if item.strip()]
665  
666 return processed_items