Coverage for biobb_haddock/haddock/common.py: 95%

81 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-07 08:48 +0000

1"""Common functions for package biobb_haddock.haddock""" 

2 

3import logging 

4from typing import Any, Optional 

5 

6import biobb_common.tools.file_utils as fu 

7from .haddock3_config import load, save 

8 

# Maps a restraint-file parameter name used inside a CFG section to the
# workflow_dict key whose value replaces that parameter (see create_cfg).
haddock_2_wf: dict[str, str] = {
    "ambig_fname": "ambig_restraints_table_path",
    "unambig_fname": "unambig_restraints_table_path",
    "hbond_fname": "hb_restraints_table_path",
}

14 

15 

def create_cfg(
    output_cfg_path: str,
    workflow_dict: dict[str, Any],
    input_cfg_path: Optional[str] = None,
    cfg_properties_dict: Optional[dict[str, Any]] = None,
    local_log: Optional[logging.Logger] = None,
    global_log: Optional[logging.Logger] = None,
) -> str:
    """Create a CFG file and return its path.

    Values are resolved with the precedence
    cfg_properties_dict > input_cfg_path > step preset (see cfg_preset).

    Args:
        output_cfg_path (str): Path the resulting configuration is saved to.
        workflow_dict (dict): Workflow-level values. A truthy
            ``haddock_step_name`` selects single-step mode. Keys that appear
            as values of ``haddock_2_wf`` replace the matching parameter in
            every section that already defines it. Every other entry is
            copied to the top level of the configuration.
        input_cfg_path (str): Optional existing configuration used as base.
        cfg_properties_dict (dict): Optional overrides. In single-step mode
            every entry is written into the step section. Otherwise dict
            values are merged into the section of the same name and non-dict
            values are set at the top level.
        local_log (:obj:`logging.Logger`): Local logger forwarded to fu.log.
        global_log (:obj:`logging.Logger`): Global logger forwarded to fu.log.

    Returns:
        str: ``output_cfg_path``, once the configuration has been saved.
    """
    cfg_dict: dict[str, Any] = {}

    # Start from the loaded configuration when one is given
    if input_cfg_path:
        input_cfg = load(input_cfg_path)
        # Report through the supplied loggers, like every other message here,
        # instead of printing to stdout
        fu.log(f"Input CFG: {input_cfg}", local_log, global_log)
        cfg_dict = input_cfg.copy()

    if haddock_step_name := workflow_dict.get("haddock_step_name"):
        # Single-step mode. Without a base configuration the section is named
        # after the step itself; with one, the section "<step>.1" is used.
        target_key = f"{haddock_step_name}.1" if cfg_dict else haddock_step_name
        step_cfg = cfg_dict.setdefault(target_key, {})

        # Presets only fill the gaps: values already present win
        for k, v in cfg_preset(haddock_step_name).items():
            step_cfg.setdefault(k, v)

        # Custom properties override whatever is in the step section
        if cfg_properties_dict:
            for k, v in cfg_properties_dict.items():
                fu.log(f"CFG: {k} = {v}", local_log, global_log)
                step_cfg[k] = v
    elif cfg_properties_dict:
        # Multiple steps: haddock3_run and haddock3_extend
        for key, value in cfg_properties_dict.items():
            if isinstance(value, dict):
                # Dict value: merge it into the section of the same name
                section = cfg_dict.setdefault(key, {})
                for sub_key, sub_value in value.items():
                    fu.log(f"CFG: {key}.{sub_key} = {sub_value}", local_log, global_log)
                    section[sub_key] = sub_value
            else:
                # Any other value is a top-level property
                fu.log(f"CFG: {key} = {value}", local_log, global_log)
                cfg_dict[key] = value

    # Replace mapped parameters that a section already defines with the value
    # supplied in workflow_dict. Only existing keys are reassigned, so the
    # section dict keeps its size while it is being iterated.
    for section in cfg_dict.values():
        if not isinstance(section, dict):
            continue
        for sub_key in section:
            mapped_key = haddock_2_wf.get(sub_key)
            if mapped_key and mapped_key in workflow_dict:
                section[sub_key] = workflow_dict[mapped_key]

    # Copy the remaining workflow values (molecules, run_dir, ...) to the top
    # level; the step name and the mapped keys were consumed above.
    mapped_wf_keys = set(haddock_2_wf.values())
    for key, value in workflow_dict.items():
        if key == "haddock_step_name" or key in mapped_wf_keys:
            continue
        cfg_dict[key] = value

    # Use haddock save
    save(cfg_dict, output_cfg_path)

    return output_cfg_path

93 

94 

def cfg_preset(haddock_step_name: str) -> dict[str, Any]:
    """Return the preset parameters for the given step name.

    Args:
        haddock_step_name (str): Name of the step, e.g. "topoaa".

    Returns:
        dict: A new dictionary with the preset values of the step; empty when
        the step name has no preset.
    """
    if haddock_step_name == "topoaa":
        return {
            "autohis": True,
            "delenph": True,
            "log_level": "quiet",
            "iniseed": 917,
            "ligand_param_fname": "",
            "ligand_top_fname": "",
            "limit": True,
            "tolerance": 0,
        }

    if haddock_step_name == "rigidbody":
        return {"sampling": 20, "tolerance": 20}

    if haddock_step_name == "seletop":
        return {"select": 5}

    # Both refinement steps share the same single preset value
    if haddock_step_name in ("flexref", "emref"):
        return {"tolerance": 20}

    return {}

123 

124 

def unzip_workflow_data(zip_file: str, out_log: Optional[logging.Logger] = None) -> str:
    """Unpack zip_file into a newly created directory and return that directory.

    Args:
        zip_file (str): Input topology zipball file path.
        out_log (:obj:`logging.Logger`): Input log object. When given, the
            zip path and every extracted file name are logged at INFO level.

    Returns:
        str: Path to the extracted directory.

    """
    target_dir = fu.create_unique_dir()
    extracted_files = fu.unzip_list(zip_file, target_dir, out_log)

    # Nothing to report without a logger
    if not out_log:
        return target_dir

    for message in ("Unzipping: ", zip_file, "To: ", *extracted_files):
        out_log.info(message)
    return target_dir