Coverage for biobb_haddock/haddock/common.py: 81%
144 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-03 15:55 +0000
1"""Common functions for package biobb_haddock.haddock"""
3import shutil
4import logging
5import os
6import jsonpickle
7from pathlib import Path
8from typing import Any, Optional
9import biobb_common.tools.file_utils as fu
10from biobb_common.generic.biobb_object import BiobbObject
11from biobb_common.tools.file_utils import launchlogger
12from .haddock3_config import load, save
# Map HADDOCK3 CFG restraint-file keys to the corresponding biobb workflow
# property names; used by create_cfg to override restraint paths in the
# step configuration with workflow-level values.
haddock_2_wf = {
    'ambig_fname': 'ambig_restraints_table_path',
    'unambig_fname': 'unambig_restraints_table_path',
    'hbond_fname': 'hb_restraints_table_path',
}
class HaddockStepBase(BiobbObject):
    """Base class for HADDOCK3 step modules with shared launch functionality.

    Subclasses (via BiobbObject setup) are expected to provide
    ``haddock_step_name``, ``global_cfg``, ``cfg``, ``binary_path``,
    ``disable_sandbox`` and the usual biobb I/O dictionaries, and may
    optionally implement the ``_handle_config_arguments`` and
    ``_handle_step_output`` hooks consumed by :meth:`launch`.
    """

    def copy_step_output(
            self, filter_funct: callable,
            output_zip_path: str, sele_top: bool = False) -> None:
        """Copy the output files from the run directory to the output zip path.

        Args:
            filter_funct (callable): A function that accepts a Path and returns True
                for the files to be copied.
            output_zip_path (str): The path where the output zip file will be created.
            sele_top (bool): If True, also include the model files referenced by the
                step's ``io.json`` (resolved via jsonpickle)."""
        # Find the directories with the haddock step name
        haddock_output_list = [
            str(path)
            for path in Path(self.run_dir).iterdir()
            if path.is_dir() and str(path).endswith(self.haddock_step_name)
        ]
        # Make the one with the highest step number the first one
        haddock_output_list.sort(reverse=True)
        # Select files with filter_funct
        output_file_list = [
            str(path)
            for path in Path(haddock_output_list[0]).iterdir()
            if path.is_file() and filter_funct(path)
        ]
        if sele_top:
            # io.json records the models produced by the step; follow each
            # relative path back into the run directory.
            with open(haddock_output_list[0]+'/io.json') as json_file:
                content = jsonpickle.decode(json_file.read())
            output = content["output"]
            for file in output:
                rel_path = str(file.rel_path).split('/')
                # Glob with a trailing '*' to also pick up companion files
                # sharing the model's basename (e.g. compressed variants).
                output_file_list.extend(list(Path(self.run_dir+'/'+rel_path[-2]).glob(rel_path[-1]+'*')))
        if len(output_file_list) == 0:
            fu.log("No output files found matching the criteria.", self.out_log, self.global_log)
        else:
            fu.zip_list(output_zip_path, output_file_list, self.out_log)

    @launchlogger
    def launch(self) -> int:
        """Execute the HADDOCK step with common workflow.

        Stages the input workflow data (unzipping it if needed), builds the
        step CFG file, runs the haddock3 binary with ``--extend-run`` inside
        the run directory and copies/zips the results back to the host.

        Returns:
            int: The return code of the executed haddock3 command.
        """
        # Setup Biobb
        if self.check_restart():
            return 0
        self.stage_files()

        if self.stage_io_dict["in"]["input_haddock_wf_data"][-4:] == ".zip":
            # Unzip workflow data to workflow_data_out
            new_input = fu.create_unique_dir(self.stage_io_dict["unique_dir"], '_input_unzipped')
            fu.unzip_list(self.stage_io_dict["in"]["input_haddock_wf_data"], new_input)  # , self.out_log)
            self.stage_io_dict["in"]["input_haddock_wf_data"] = new_input

        self.run_dir = self.stage_io_dict["out"]["output_haddock_wf_data"]
        if self.stage_io_dict["in"]["input_haddock_wf_data"] != self.run_dir:
            # Different I/O folder
            shutil.copytree(self.stage_io_dict["in"]["input_haddock_wf_data"],
                            self.run_dir, dirs_exist_ok=True)
        else:
            # Same I/O folder
            os.rename(self.stage_io_dict["in"]["input_haddock_wf_data"], self.run_dir)
        # Check if there are more than 9 numbered folders and rename them to add leading zeros
        # (keeps lexicographic sorting consistent with haddock3 step numbering)
        input_wf = self.io_dict["in"]["input_haddock_wf_data"]
        numbered_dirs = []
        for item in os.listdir(input_wf):
            item_path = os.path.join(input_wf, item)
            if os.path.isdir(item_path) and item[0].isdigit():
                numbered_dirs.append(item)

        # NOTE(review): renaming only fires at exactly 10/100/1000 dirs --
        # presumably the crossing points where one more digit is needed; confirm.
        if len(numbered_dirs) in [10, 100, 1000] and not self.disable_sandbox:
            for dirname in numbered_dirs:
                os.rename(os.path.join(input_wf, dirname),
                          os.path.join(input_wf, f"0{dirname}"))

        workflow_dict = {"haddock_step_name": self.haddock_step_name}
        workflow_dict.update(self.global_cfg)

        # Optional subclass hook to tweak workflow_dict / staged inputs
        if hasattr(self, '_handle_config_arguments'):
            self._handle_config_arguments()

        # Create workflow configuration
        self.output_cfg_path = create_cfg(
            output_cfg_path=self.create_tmp_file('_haddock.cfg'),
            workflow_dict=workflow_dict,
            input_cfg_path=self.stage_io_dict["in"].get("haddock_config_path"),
            cfg_properties_dict=self.cfg,
            out_log=self.out_log,
            global_log=self.global_log,
        )

        if self.container_path:
            fu.log("Container execution enabled", self.out_log)
            move_to_container_path(self, self.run_dir)

        self.cmd = [self.binary_path, self.output_cfg_path, "--extend-run", os.path.abspath(self.run_dir)]

        # Run Biobb block
        with fu.change_dir(self.run_dir):
            self.run_biobb()

        # Copy files to host
        if hasattr(self, '_handle_step_output'):
            self._handle_step_output()
        if self.io_dict["out"]["output_haddock_wf_data"][-4:] == ".zip":
            zip_wf_output(self)
        else:
            self.copy_to_host()

        # Remove temporal files
        self.remove_tmp_files()

        return self.return_code
def create_cfg(
    output_cfg_path: str,
    workflow_dict: dict[str, Any],
    input_cfg_path: Optional[str] = None,
    cfg_properties_dict: Optional[dict[str, str]] = None,
    out_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None,
) -> str:
    """Create a CFG file using the hierarchy cfg_properties_dict > input_cfg_path > preset_dict.

    Args:
        output_cfg_path (str): Path where the CFG file will be written.
        workflow_dict (dict): Workflow-level properties; must contain 'haddock_step_name'.
        input_cfg_path (str): Optional existing CFG file used as the base configuration.
        cfg_properties_dict (dict): Optional user-supplied properties (highest priority).
        out_log (Logger): Optional local log.
        global_log (Logger): Optional global workflow log.

    Returns:
        str: The path of the written CFG file (same as output_cfg_path).
    """
    cfg_dict: dict[str, Any] = {}

    # Handle input configuration if it exists
    if input_cfg_path:
        input_cfg = load(input_cfg_path)
        # Fix: route through the biobb logger instead of a leftover debug print()
        fu.log(f"Input CFG: {input_cfg}", out_log, global_log)
        cfg_dict = input_cfg.copy()  # Start with entire loaded config as base

    # Apply single step configuration if specified
    haddock_step_name: str = workflow_dict["haddock_step_name"]
    if not haddock_step_name.startswith("haddock3_"):
        # Get preset properties for this step if any
        step_preset = cfg_preset(haddock_step_name)

        # Create or update the step configuration
        if not cfg_dict:
            # No input config, create new structure with single step
            target_key = haddock_step_name
            cfg_dict = {target_key: step_preset or {}}
        else:
            # Update the specific step in the existing config.
            # NOTE(review): assumes the loaded config indexes repeated steps
            # as "<step>.1" -- confirm against haddock3_config.load output.
            target_key = f"{haddock_step_name}.1"
            if target_key not in cfg_dict:
                cfg_dict[target_key] = {}
            # Merge preset values while preserving existing values
            if step_preset:
                for k, v in step_preset.items():
                    if k not in cfg_dict[target_key]:  # Only add if not already defined
                        cfg_dict[target_key][k] = v

        # Apply custom properties to the step (highest priority)
        if cfg_properties_dict:
            for k, v in cfg_properties_dict.items():
                fu.log(f"CFG from properties: {k} = {v}", out_log, global_log)
                cfg_dict[target_key][k] = v
    # Multiple steps: haddock3_run and haddock3_extend
    else:
        if cfg_properties_dict:
            for key, value in cfg_properties_dict.items():
                if isinstance(value, dict):
                    # If the value is a dictionary, update the corresponding section in cfg_dict
                    if key not in cfg_dict:
                        cfg_dict[key] = {}
                    for sub_key, sub_value in value.items():
                        fu.log(f"CFG: {key}.{sub_key} = {sub_value}", out_log, global_log)
                        cfg_dict[key][sub_key] = sub_value
                else:
                    # If the value is not a dictionary, treat it as a top-level property
                    fu.log(f"CFG: {key} = {value}", out_log, global_log)
                    cfg_dict[key] = value

    # Override restraint-file entries with the workflow-level paths (haddock_2_wf mapping)
    for key, value in cfg_dict.items():
        if isinstance(value, dict):
            for sub_key, sub_value in value.items():
                mapped_key = haddock_2_wf.get(sub_key)
                if mapped_key and mapped_key in workflow_dict:
                    sub_value = workflow_dict[mapped_key]
                cfg_dict[key][sub_key] = sub_value

    # Add molecules and run_dir if provided (skip keys already consumed above)
    for key, value in workflow_dict.items():
        if key == 'haddock_step_name' or key in haddock_2_wf.values():
            continue
        fu.log(f"CFG: {key} = {value}", out_log, global_log)
        cfg_dict[key] = value

    # Use haddock save
    save(cfg_dict, output_cfg_path)

    return output_cfg_path
def cfg_preset(haddock_step_name: str) -> dict[str, Any]:
    """Return the preset (default) configuration for a HADDOCK3 step.

    Args:
        haddock_step_name (str): Name of the HADDOCK3 module (e.g. 'topoaa').

    Returns:
        dict: Preset key/value pairs; an empty dict for steps without a preset.
    """
    # Preset table per step; unknown step names fall through to an empty dict.
    presets: dict[str, dict[str, Any]] = {
        "topoaa": {
            "autohis": True,
            "delenph": True,
            "log_level": "quiet",
            "iniseed": 917,
            "ligand_param_fname": "",
            "ligand_top_fname": "",
            "limit": True,
            "tolerance": 0,
        },
        "rigidbody": {"sampling": 20, "tolerance": 20},
        "seletop": {"select": 5},
        "flexref": {"tolerance": 20},
        "emref": {"tolerance": 20},
    }
    # Return a fresh copy so callers may mutate the result safely.
    return dict(presets.get(haddock_step_name, {}))
def move_to_container_path(obj, run_dir=None):
    """Stage the CFG file (and optionally the run directory) into the container sandbox.

    Copies ``obj.output_cfg_path`` into the sandbox directory
    (``stage_io_dict['unique_dir']``) and rewrites ``obj.output_cfg_path``
    to the path it will have inside the container volume.

    Args:
        obj: biobb step object exposing ``output_cfg_path``, ``stage_io_dict``
            and ``container_volume_path``.
        run_dir (str): Optional host path of the HADDOCK run directory to stage.

    Returns:
        Optional[str]: Sandbox path of the staged run directory, or None when
        no run_dir was given.
    """
    sandbox = obj.stage_io_dict.get("unique_dir", "")
    shutil.copy2(obj.output_cfg_path, sandbox)
    # Point the step at the copy as seen from inside the container volume
    obj.output_cfg_path = str(
        Path(obj.container_volume_path).joinpath(
            Path(obj.output_cfg_path).name
        )
    )
    if run_dir:
        staged_run_dir = str(Path(sandbox).joinpath(Path(run_dir).name))
        shutil.copytree(run_dir, staged_run_dir)
        # Fix: the original reassigned the local `run_dir` parameter, which was
        # silently lost; return the staged path so callers can use it.
        return staged_run_dir
    return None
def zip_wf_output(obj):
    """Archive the whole run directory as a .zip at the step's output path."""
    # Strip the '.zip' suffix: make_archive appends the format extension itself.
    archive_base = Path(obj.io_dict["out"]["output_haddock_wf_data"]).with_suffix('')
    fu.log(f"Zipping {obj.run_dir} to {archive_base} ", obj.out_log, obj.global_log)
    shutil.make_archive(str(archive_base), "zip", obj.run_dir)