Coverage for biobb_haddock/haddock/common.py: 81%

144 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-03 15:55 +0000

1"""Common functions for package biobb_haddock.haddock""" 

2 

3import shutil 

4import logging 

5import os 

6import jsonpickle 

7from pathlib import Path 

8from typing import Any, Optional 

9import biobb_common.tools.file_utils as fu 

10from biobb_common.generic.biobb_object import BiobbObject 

11from biobb_common.tools.file_utils import launchlogger 

12from .haddock3_config import load, save 

13 

14haddock_2_wf = { 

15 'ambig_fname': 'ambig_restraints_table_path', 

16 'unambig_fname': 'unambig_restraints_table_path', 

17 'hbond_fname': 'hb_restraints_table_path', 

18} 

19 

20 

21class HaddockStepBase(BiobbObject): 

22 """Base class for HADDOCK3 step modules with shared launch functionality.""" 

23 

24 def copy_step_output( 

25 self, filter_funct: callable, 

26 output_zip_path: str, sele_top: bool = False) -> None: 

27 """Copy the output files from the run directory to the output zip path. 

28 

29 Args: 

30 obj: The object containing the output paths. 

31 run_dir (str): The directory where the output files are located. 

32 filter_funct (callable): A function that accepts a Path and returns True for the files to be copied. 

33 output_zip_path (str): The path where the output zip file will be created.""" 

34 # Find the directories with the haddock step name 

35 haddock_output_list = [ 

36 str(path) 

37 for path in Path(self.run_dir).iterdir() 

38 if path.is_dir() and str(path).endswith(self.haddock_step_name) 

39 ] 

40 # Make the one with the highest step number the first one 

41 haddock_output_list.sort(reverse=True) 

42 # Select files with filter_funct 

43 output_file_list = [ 

44 str(path) 

45 for path in Path(haddock_output_list[0]).iterdir() 

46 if path.is_file() and filter_funct(path) 

47 ] 

48 if sele_top: 

49 with open(haddock_output_list[0]+'/io.json') as json_file: 

50 content = jsonpickle.decode(json_file.read()) 

51 output = content["output"] 

52 for file in output: 

53 rel_path = str(file.rel_path).split('/') 

54 output_file_list.extend(list(Path(self.run_dir+'/'+rel_path[-2]).glob(rel_path[-1]+'*'))) 

55 if len(output_file_list) == 0: 

56 fu.log("No output files found matching the criteria.", self.out_log, self.global_log) 

57 else: 

58 fu.zip_list(output_zip_path, output_file_list, self.out_log) 

59 

60 @launchlogger 

61 def launch(self) -> int: 

62 """Execute the HADDOCK step with common workflow.""" 

63 # Setup Biobb 

64 if self.check_restart(): 

65 return 0 

66 self.stage_files() 

67 

68 if self.stage_io_dict["in"]["input_haddock_wf_data"][-4:] == ".zip": 

69 # Unzip workflow data to workflow_data_out 

70 new_input = fu.create_unique_dir(self.stage_io_dict["unique_dir"], '_input_unzipped') 

71 fu.unzip_list(self.stage_io_dict["in"]["input_haddock_wf_data"], new_input) # , self.out_log) 

72 self.stage_io_dict["in"]["input_haddock_wf_data"] = new_input 

73 

74 self.run_dir = self.stage_io_dict["out"]["output_haddock_wf_data"] 

75 if self.stage_io_dict["in"]["input_haddock_wf_data"] != self.run_dir: 

76 # Different I/O folder 

77 shutil.copytree(self.stage_io_dict["in"]["input_haddock_wf_data"], 

78 self.run_dir, dirs_exist_ok=True) 

79 else: 

80 # Same I/O folder 

81 os.rename(self.stage_io_dict["in"]["input_haddock_wf_data"], self.run_dir) 

82 # Check if there are more than 9 numbered folders and rename them to add leading zeros 

83 input_wf = self.io_dict["in"]["input_haddock_wf_data"] 

84 numbered_dirs = [] 

85 for item in os.listdir(input_wf): 

86 item_path = os.path.join(input_wf, item) 

87 if os.path.isdir(item_path) and item[0].isdigit(): 

88 numbered_dirs.append(item) 

89 

90 if len(numbered_dirs) in [10, 100, 1000] and not self.disable_sandbox: 

91 for dirname in numbered_dirs: 

92 os.rename(os.path.join(input_wf, dirname), 

93 os.path.join(input_wf, f"0{dirname}")) 

94 

95 workflow_dict = {"haddock_step_name": self.haddock_step_name} 

96 workflow_dict.update(self.global_cfg) 

97 

98 if hasattr(self, '_handle_config_arguments'): 

99 self._handle_config_arguments() 

100 

101 # Create workflow configuration 

102 self.output_cfg_path = create_cfg( 

103 output_cfg_path=self.create_tmp_file('_haddock.cfg'), 

104 workflow_dict=workflow_dict, 

105 input_cfg_path=self.stage_io_dict["in"].get("haddock_config_path"), 

106 cfg_properties_dict=self.cfg, 

107 out_log=self.out_log, 

108 global_log=self.global_log, 

109 ) 

110 

111 if self.container_path: 

112 fu.log("Container execution enabled", self.out_log) 

113 move_to_container_path(self, self.run_dir) 

114 

115 self.cmd = [self.binary_path, self.output_cfg_path, "--extend-run", os.path.abspath(self.run_dir)] 

116 

117 # Run Biobb block 

118 with fu.change_dir(self.run_dir): 

119 self.run_biobb() 

120 

121 # Copy files to host 

122 if hasattr(self, '_handle_step_output'): 

123 self._handle_step_output() 

124 if self.io_dict["out"]["output_haddock_wf_data"][-4:] == ".zip": 

125 zip_wf_output(self) 

126 else: 

127 self.copy_to_host() 

128 

129 # Remove temporal files 

130 self.remove_tmp_files() 

131 

132 return self.return_code 

133 

134 

135def create_cfg( 

136 output_cfg_path: str, 

137 workflow_dict: dict[str, Any], 

138 input_cfg_path: Optional[str] = None, 

139 cfg_properties_dict: Optional[dict[str, str]] = None, 

140 out_log: Optional[logging.Logger] = None, 

141 global_log: Optional[logging.Logger] = None, 

142) -> str: 

143 """Creates an CFG file using the following hierarchy cfg_properties_dict > input_cfg_path > preset_dict""" 

144 cfg_dict: dict[str, Any] = {} 

145 

146 # Handle input configuration if it exists 

147 if input_cfg_path: 

148 input_cfg = load(input_cfg_path) 

149 print(f"Input CFG: {input_cfg}") 

150 cfg_dict = input_cfg.copy() # Start with entire loaded config as base 

151 

152 # Apply single step configuration if specified 

153 haddock_step_name: str = workflow_dict["haddock_step_name"] 

154 if not haddock_step_name.startswith("haddock3_"): 

155 # Get preset properties for this step if any 

156 step_preset = cfg_preset(haddock_step_name) 

157 

158 # Create or update the step configuration 

159 if not cfg_dict: 

160 # No input config, create new structure with single step 

161 target_key = haddock_step_name 

162 cfg_dict = {target_key: step_preset or {}} 

163 else: 

164 # Update the specific step in the existing config 

165 target_key = f"{haddock_step_name}.1" 

166 if target_key not in cfg_dict: 

167 cfg_dict[target_key] = {} 

168 # Merge preset values while preserving existing values 

169 if step_preset: 

170 for k, v in step_preset.items(): 

171 if k not in cfg_dict[target_key]: # Only add if not already defined 

172 cfg_dict[target_key][k] = v 

173 

174 # Apply custom properties to the step 

175 if cfg_properties_dict: 

176 for k, v in cfg_properties_dict.items(): 

177 fu.log(f"CFG from properties: {k} = {v}", out_log, global_log) 

178 cfg_dict[target_key][k] = v 

179 # Multiple steps: haddock3_run and haddock3_extend 

180 else: 

181 if cfg_properties_dict: 

182 for key, value in cfg_properties_dict.items(): 

183 if isinstance(value, dict): 

184 # If the value is a dictionary, update the corresponding section in cfg_dict 

185 if key not in cfg_dict: 

186 cfg_dict[key] = {} 

187 for sub_key, sub_value in value.items(): 

188 fu.log(f"CFG: {key}.{sub_key} = {sub_value}", out_log, global_log) 

189 cfg_dict[key][sub_key] = sub_value 

190 else: 

191 # If the value is not a dictionary, treat it as a top-level property 

192 fu.log(f"CFG: {key} = {value}", out_log, global_log) 

193 cfg_dict[key] = value 

194 # Add workflow_dict properties to cfg_dict 

195 for key, value in cfg_dict.items(): 

196 if isinstance(value, dict): 

197 for sub_key, sub_value in value.items(): 

198 mapped_key = haddock_2_wf.get(sub_key) 

199 if mapped_key and mapped_key in workflow_dict: 

200 sub_value = workflow_dict[mapped_key] 

201 cfg_dict[key][sub_key] = sub_value 

202 

203 # Add molecules and run_dir if provided 

204 for key, value in workflow_dict.items(): 

205 if key == 'haddock_step_name' or key in haddock_2_wf.values(): 

206 continue 

207 fu.log(f"CFG: {key} = {value}", out_log, global_log) 

208 cfg_dict[key] = value 

209 

210 # Use haddock save 

211 save(cfg_dict, output_cfg_path) 

212 

213 return output_cfg_path 

214 

215 

216def cfg_preset(haddock_step_name: str) -> dict[str, Any]: 

217 cfg_dict: dict[str, Any] = {} 

218 # cfg_dict["debug"] = True 

219 

220 if haddock_step_name == "topoaa": 

221 cfg_dict["autohis"] = True 

222 cfg_dict["delenph"] = True 

223 cfg_dict["log_level"] = "quiet" 

224 cfg_dict["iniseed"] = 917 

225 cfg_dict["ligand_param_fname"] = "" 

226 cfg_dict["ligand_top_fname"] = "" 

227 cfg_dict["limit"] = True 

228 cfg_dict["tolerance"] = 0 

229 

230 elif haddock_step_name == "rigidbody": 

231 cfg_dict["sampling"] = 20 

232 cfg_dict["tolerance"] = 20 

233 

234 elif haddock_step_name == "seletop": 

235 cfg_dict["select"] = 5 

236 

237 elif haddock_step_name == "flexref": 

238 cfg_dict["tolerance"] = 20 

239 

240 elif haddock_step_name == "emref": 

241 cfg_dict["tolerance"] = 20 

242 

243 return cfg_dict 

244 

245 

246def move_to_container_path(obj, run_dir=None): 

247 """Move configuration and run directory to container path.""" 

248 shutil.copy2(obj.output_cfg_path, obj.stage_io_dict.get("unique_dir", "")) 

249 obj.output_cfg_path = str( 

250 Path(obj.container_volume_path).joinpath( 

251 Path(obj.output_cfg_path).name 

252 ) 

253 ) 

254 if run_dir: 

255 shutil.copytree( 

256 run_dir, 

257 str( 

258 Path(obj.stage_io_dict.get("unique_dir", "")).joinpath( 

259 Path(run_dir).name 

260 ) 

261 ), 

262 ) 

263 run_dir = str(Path(obj.stage_io_dict.get("unique_dir", "")).joinpath(Path(run_dir).name)) 

264 

265 

266def zip_wf_output(obj): 

267 """Zip all the files in the run directory and save it to the output path.""" 

268 dest_path = str(Path(obj.io_dict["out"]["output_haddock_wf_data"]).with_suffix('')) 

269 fu.log(f"Zipping {obj.run_dir} to {dest_path} ", obj.out_log, obj.global_log) 

270 shutil.make_archive(dest_path, "zip", obj.run_dir)